# Source code for lucidlink.fsspec

"""fsspec integration for LucidLink filesystem.

Provides fsspec.AbstractFileSystem implementation that allows standard Python
data libraries (Pandas, Dask, PyArrow, etc.) to access LucidLink filespaces
using URL protocols like ``lucidlink://workspace/filespace/path/to/file.csv``

**Features:**

- Standard file operations: open, read, write, ls, mkdir, rm
- Native move/rename operations (much faster than copy+delete fallback)
- Directory operations: mkdir, makedirs, rmdir
- Sync control: automatic or manual sync to hub
"""

import logging
import os
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Union

from .daemon import Daemon
from .credentials import ServiceAccountCredentials
from .exceptions import LucidLinkError
from .filespace_cache import FilespaceCache
from .filespace_models import SyncMode
from .url_parser import UrlParser, ParsedUrl, PROTOCOL

logger = logging.getLogger(__name__)


@dataclass(frozen=True)
class LucidLinkOptions:
    """Configuration options for LucidLinkFileSystem.

    This dataclass normalizes options so that equivalent configurations
    produce the same hash, enabling proper fsspec instance caching.
    """

    # Default values as class constants (referenced by metaclass and __init__).
    # Fix: these must be ClassVar. In a dataclass, any annotated class attribute
    # becomes a *field*, so without ClassVar each DEFAULT_* leaked into the
    # generated __init__/__eq__/__hash__ — polluting the constructor signature
    # and the very hash this class exists to keep stable.
    DEFAULT_TOKEN: ClassVar[Optional[str]] = None
    DEFAULT_SANDBOXED: ClassVar[bool] = True
    DEFAULT_PERSIST_FILES: ClassVar[bool] = False
    DEFAULT_ROOT_PATH: ClassVar[Optional[str]] = None
    DEFAULT_SYNC_MODE: ClassVar[SyncMode] = SyncMode.SYNC_ALL
    DEFAULT_WEBSERVICE_URL: ClassVar[Optional[str]] = None

    token: Optional[str] = DEFAULT_TOKEN
    sandboxed: bool = DEFAULT_SANDBOXED
    persist_files: bool = DEFAULT_PERSIST_FILES
    root_path: Optional[str] = DEFAULT_ROOT_PATH  # str instead of Path for hashability
    sync_mode: SyncMode = DEFAULT_SYNC_MODE
    webservice_url: Optional[str] = DEFAULT_WEBSERVICE_URL

    @classmethod
    def from_kwargs(
        cls,
        token: Optional[str] = None,
        sandboxed: Optional[bool] = None,
        persist_files: Optional[bool] = None,
        root_path: Optional[Union[str, Path]] = None,
        sync_mode: Optional[SyncMode] = None,
        webservice_url: Optional[str] = None,
        **kwargs,  # Ignore extra kwargs (fsspec passes some)
    ) -> "LucidLinkOptions":
        """Create options from keyword arguments, applying defaults.

        ``None`` means "use the class default" for every parameter, which is
        why the annotations are Optional even for the boolean options.
        """
        return cls(
            token=token if token is not None else cls.DEFAULT_TOKEN,
            sandboxed=sandboxed if sandboxed is not None else cls.DEFAULT_SANDBOXED,
            persist_files=persist_files if persist_files is not None else cls.DEFAULT_PERSIST_FILES,
            root_path=str(root_path) if root_path is not None else cls.DEFAULT_ROOT_PATH,
            sync_mode=sync_mode if sync_mode is not None else cls.DEFAULT_SYNC_MODE,
            webservice_url=webservice_url if webservice_url is not None else cls.DEFAULT_WEBSERVICE_URL,
        )

    def to_kwargs(self) -> Dict[str, Any]:
        """Convert options to kwargs dict for consistent cache key computation."""
        return {
            "token": self.token,
            "sandboxed": self.sandboxed,
            "persist_files": self.persist_files,
            "root_path": self.root_path,
            "sync_mode": self.sync_mode,
            "webservice_url": self.webservice_url,
        }


try:
    from fsspec.spec import AbstractFileSystem, _Cached

    FSSPEC_AVAILABLE = True

    class _NormalizedCached(_Cached):
        """Metaclass that normalizes kwargs before caching.

        fsspec's _Cached metaclass computes cache keys from raw kwargs BEFORE
        __init__ runs. This means LucidLinkFileSystem(token='A') and
        LucidLinkFileSystem(token='A', sync_mode=SYNC_ALL) produce different
        cache keys, even though SYNC_ALL is the default.

        Since only one LucidLink daemon can run per process, different cache
        keys would cause daemon crashes. This metaclass normalizes kwargs by
        round-tripping through LucidLinkOptions before the cache key is computed.
        """

        def __call__(cls, *args, **kwargs):
            # Normalize by round-tripping through LucidLinkOptions
            # Preserve non-LucidLink kwargs (e.g., skip_instance_cache from fsspec)
            lucid_kwargs = LucidLinkOptions.from_kwargs(**kwargs).to_kwargs()
            other_kwargs = {
                k: v for k, v in kwargs.items()
                if k not in lucid_kwargs
            }
            # Clear and rebuild with consistent key order (tokenize is order-sensitive)
            kwargs.clear()
            kwargs.update(lucid_kwargs)
            kwargs.update(other_kwargs)
            return super().__call__(*args, **kwargs)

except ImportError:
    FSSPEC_AVAILABLE = False
    _NormalizedCached = None

[docs] class AbstractFileSystem: """Dummy base class when fsspec is not available.""" pass
class LucidLinkFileSystem(
    AbstractFileSystem,
    metaclass=_NormalizedCached if FSSPEC_AVAILABLE else type,
):
    """fsspec filesystem implementation for LucidLink.

    Enables standard Python data libraries to access LucidLink filespaces
    using URL protocols like::

        lucidlink://workspace/filespace/path/to/file.csv

    Usage with Pandas::

        df = pd.read_csv('lucidlink://my-workspace/production-data/data.csv',
                         storage_options={'token': 'sa_live:...'})

    Usage with Dask::

        df = dd.read_parquet('lucidlink://workspace/filespace/dataset/*.parquet',
                             storage_options={'token': 'sa_live:...'})

    Usage with PyArrow::

        table = pq.read_table('lucidlink://workspace/filespace/data.parquet',
                              filesystem=LucidLinkFileSystem(token='sa_live:...'))

    Direct usage::

        fs = LucidLinkFileSystem(token='sa_live:...')
        with fs.open('lucidlink://workspace/filespace/file.txt', 'r') as f:
            content = f.read()
    """

    protocol = PROTOCOL

    # Note: cachable = True is inherited from AbstractFileSystem and is required.
    # Only one daemon can run per process, so fsspec must reuse filesystem instances.

    def __init__(
        self,
        token: Optional[str] = LucidLinkOptions.DEFAULT_TOKEN,
        sandboxed: bool = LucidLinkOptions.DEFAULT_SANDBOXED,
        persist_files: bool = LucidLinkOptions.DEFAULT_PERSIST_FILES,
        root_path: Optional[Union[str, Path]] = LucidLinkOptions.DEFAULT_ROOT_PATH,
        sync_mode: SyncMode = LucidLinkOptions.DEFAULT_SYNC_MODE,
        webservice_url: Optional[str] = LucidLinkOptions.DEFAULT_WEBSERVICE_URL,
        **kwargs,
    ):
        """Initialize LucidLink filesystem.

        Args:
            token: Service account token (sa_live:...)
            sandboxed: Use temporary directory (``True``) or persistent
                ``.lucid`` folder (``False``)
            persist_files: Keep files after daemon stops (only for physical mode)
            root_path: Custom root path for storage (optional)
            sync_mode: Sync mode - ``SYNC_ALL`` (default) syncs on close,
                ``SYNC_NONE`` skips sync
            webservice_url: WebService URL override (for testing against local
                environments)
            **kwargs: Additional fsspec parameters
        """
        self._require_fsspec()
        super().__init__(**kwargs)

        # Normalize options for consistent hashing/caching
        self._options = LucidLinkOptions.from_kwargs(
            token=token,
            sandboxed=sandboxed,
            persist_files=persist_files,
            root_path=root_path,
            sync_mode=sync_mode,
            webservice_url=webservice_url,
        )

        self._daemon: Optional[Daemon] = None
        self._url_parser = UrlParser()
        self._cache = FilespaceCache()

        # Eagerly start the daemon when credentials are already available.
        if self._options.token:
            self._initialize_daemon()

    @property
    def options(self) -> LucidLinkOptions:
        """Read-only access to filesystem configuration options."""
        return self._options

    def _require_fsspec(self) -> None:
        """Verify fsspec is available."""
        if not FSSPEC_AVAILABLE:
            raise ImportError(
                "fsspec is required for LucidLinkFileSystem. "
                "Install with: pip install fsspec"
            )

    def _initialize_daemon(self) -> None:
        """Initialize and start the LucidLink daemon, then authenticate."""
        # Local import to avoid a circular import with the package __init__.
        from . import create_daemon

        if self._daemon is not None:
            return

        config = {}
        if self._options.webservice_url:
            config["webservice.url"] = self._options.webservice_url

        self._daemon = create_daemon(
            config=config or None,
            sandboxed=self._options.sandboxed,
            persist_files=self._options.persist_files,
            root_path=self._options.root_path,
        )
        self._daemon.start()

        if self._options.token:
            credentials = ServiceAccountCredentials(token=self._options.token)
            workspace = self._daemon.authenticate(credentials)
            self._cache.set_workspace(workspace)

    def _ensure_initialized(self) -> None:
        """Ensure daemon is initialized, raising error if token not provided.

        Raises:
            LucidLinkError: If no token was configured.
        """
        if self._daemon is None:
            if self._options.token is None:
                raise LucidLinkError(
                    "No token provided. Initialize with token or set via "
                    "storage_options={'token': 'sa_live:...'}"
                )
            self._initialize_daemon()

    def _get_filespace_and_path(self, url: str):
        """Get filespace and file path for a URL.

        Args:
            url: LucidLink URL

        Returns:
            Tuple of (Filespace, file_path)
        """
        self._ensure_initialized()
        parsed = self._url_parser.parse(url)
        filespace = self._cache.get_or_connect(parsed)
        return filespace, parsed.path

    def _open(
        self,
        path: str,
        mode: str = "rb",
        block_size: Optional[int] = None,
        **kwargs,
    ):
        """Open a file and return a file-like object.

        Args:
            path: LucidLink URL
            mode: File mode ('r', 'rb', 'w', 'wb', etc.)
            block_size: Buffer size (uses default if None)
            **kwargs: Additional parameters

        Returns:
            File-like object (LucidFileStream or buffered/text wrapper)
        """
        filespace, file_path = self._get_filespace_and_path(path)

        buffering = block_size if block_size is not None else -1
        # Forward only the kwargs the underlying open() understands.
        valid_kwargs = {
            k: v for k, v in kwargs.items()
            if k in ["encoding", "errors", "newline"]
        }
        return filespace.fs.open(
            file_path,
            mode=mode,
            buffering=buffering,
            **valid_kwargs,
        )
[docs] def ls( self, path: str, detail: bool = True, **kwargs ) -> Union[List[str], List[Dict[str, Any]]]: """List directory contents. Args: path: LucidLink URL detail: Return detailed info (True) or just names (False) **kwargs: Additional parameters Returns: List of full paths (detail=False) or detailed info dicts (detail=True) """ filespace, file_path = self._get_filespace_and_path(path) entries = filespace.fs.read_dir(file_path) base_path = path.rstrip("/") if not detail: return [f"{base_path}/{entry.name}" for entry in entries] return [ { "name": f"{base_path}/{entry.name}", "size": entry.size, "type": "directory" if entry.is_dir() else "file", "mtime": entry.mtime, "created": entry.ctime, } for entry in entries ]
[docs] def info(self, path: str, **kwargs) -> Dict[str, Any]: """Get file or directory information. Args: path: LucidLink URL **kwargs: Additional parameters Returns: Dictionary with file/directory metadata """ filespace, file_path = self._get_filespace_and_path(path) if not file_path or file_path == "/": return { "name": path, "size": 0, "type": "directory", "mtime": 0, "created": 0, } entry = filespace.fs.get_entry(file_path) return { "name": entry.name, "size": entry.size, "type": "directory" if entry.is_dir() else "file", "mtime": entry.mtime, "created": entry.ctime, }
[docs] def exists(self, path: str, **kwargs) -> bool: """Check if path exists. Raises: PermissionError: If token doesn't have access to the workspace or filespace """ try: self.info(path) return True except PermissionError: raise except Exception: return False
[docs] def isdir(self, path: str) -> bool: """Check if path is a directory. Raises: PermissionError: If token doesn't have access to the workspace or filespace """ try: return self.info(path)["type"] == "directory" except PermissionError: raise except Exception: return False
[docs] def isfile(self, path: str) -> bool: """Check if path is a file. Raises: PermissionError: If token doesn't have access to the workspace or filespace """ try: return self.info(path)["type"] == "file" except PermissionError: raise except Exception: return False
[docs] def mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None: """Create a directory. Args: path: LucidLink URL create_parents: Create parent directories if needed **kwargs: Additional parameters """ filespace, file_path = self._get_filespace_and_path(path) filespace.fs.create_dir(file_path)
[docs] def makedirs(self, path: str, exist_ok: bool = False) -> None: """Create directory recursively. Args: path: LucidLink URL exist_ok: Don't raise error if directory exists """ filespace, file_path = self._get_filespace_and_path(path) if not file_path or file_path == "/": return if exist_ok and self._directory_exists_quietly(filespace, file_path): return self._create_directory_path(filespace, file_path, exist_ok)
def _directory_exists_quietly(self, filespace, file_path: str) -> bool: """Check if directory exists without raising exceptions.""" try: entry = filespace.fs.get_entry(file_path) return entry.is_dir() except Exception: return False def _create_directory_path( self, filespace, file_path: str, exist_ok: bool ) -> None: """Create all directories in a path.""" parts = [p for p in file_path.split("/") if p] current = "" for part in parts: current += "/" + part if not filespace.fs.dir_exists(current): try: filespace.fs.create_dir(current) except Exception: if not exist_ok: raise
[docs] def rm( self, path: str, recursive: bool = False, maxdepth: Optional[int] = None, ) -> None: """Remove file or directory. Args: path: LucidLink URL recursive: Remove directory recursively maxdepth: Maximum recursion depth (ignored, always full recursion) """ filespace, file_path = self._get_filespace_and_path(path) if self.isdir(path): filespace.fs.delete_dir(file_path, recursive=recursive) else: filespace.fs.delete(file_path)
[docs] def rmdir(self, path: str) -> None: """Remove an empty directory. Args: path: LucidLink URL """ filespace, file_path = self._get_filespace_and_path(path) filespace.fs.delete_dir(file_path, recursive=False)
[docs] def mv( self, path1: str, path2: str, recursive: bool = False, maxdepth: Optional[int] = None, **kwargs, ) -> None: """Move/rename a file or directory. Uses native rename operation which is much faster than copy+delete. Args: path1: Source LucidLink URL path2: Destination LucidLink URL recursive: Ignored (move is always recursive for directories) maxdepth: Ignored **kwargs: Additional parameters (ignored) Raises: ValueError: If paths are in different filespaces """ parsed1, parsed2 = self._url_parser.validate_same_filespace(path1, path2) filespace = self._cache.get_or_connect(parsed1) filespace.fs.move(parsed1.path, parsed2.path)
[docs] def rename(self, path1: str, path2: str, **kwargs) -> None: """Rename a file or directory (alias for mv). Args: path1: Source LucidLink URL path2: Destination LucidLink URL **kwargs: Additional parameters (passed to mv) """ self.mv(path1, path2, **kwargs)
[docs] def cat( self, path: str, start: Optional[int] = None, end: Optional[int] = None, **kwargs, ) -> bytes: """Read entire file or byte range. Args: path: LucidLink URL start: Start byte position (optional) end: End byte position (optional) **kwargs: Additional parameters Returns: File contents as bytes """ with self._open(path, "rb") as f: if start is not None: f.seek(start) if end is not None: size = end - (start or 0) return f.read(size) return f.read()
[docs] def cat_file( self, path: str, start: Optional[int] = None, end: Optional[int] = None, **kwargs, ) -> bytes: """Alias for cat() for compatibility.""" return self.cat(path, start, end, **kwargs)
[docs] def get( self, rpath: str, lpath: str, recursive: bool = False, **kwargs ) -> None: """Download file from LucidLink to local filesystem. Args: rpath: Remote LucidLink URL lpath: Local filesystem path recursive: Download directory recursively **kwargs: Additional parameters """ if self.isdir(rpath): self._download_directory(rpath, lpath, recursive) else: self._download_file(rpath, lpath)
def _download_directory( self, rpath: str, lpath: str, recursive: bool ) -> None: """Download a directory recursively.""" if not recursive: raise ValueError("Cannot download directory without recursive=True") os.makedirs(lpath, exist_ok=True) for item in self.ls(rpath, detail=True): item_name = os.path.basename(item["name"]) remote_item = f"{rpath}/{item_name}" local_item = os.path.join(lpath, item_name) if item["type"] == "directory": self._download_directory(remote_item, local_item, recursive=True) else: self._download_file(remote_item, local_item) def _download_file(self, rpath: str, lpath: str) -> None: """Download a single file.""" with self._open(rpath, "rb") as src: with open(lpath, "wb") as dst: dst.write(src.read())
[docs] def put( self, lpath: str, rpath: str, recursive: bool = False, **kwargs ) -> None: """Upload file from local filesystem to LucidLink. Args: lpath: Local filesystem path rpath: Remote LucidLink URL recursive: Upload directory recursively **kwargs: Additional parameters """ if os.path.isdir(lpath): self._upload_directory(lpath, rpath, recursive) else: self._upload_file(lpath, rpath)
def _upload_directory(self, lpath: str, rpath: str, recursive: bool) -> None: """Upload a directory recursively.""" if not recursive: raise ValueError("Cannot upload directory without recursive=True") self.makedirs(rpath, exist_ok=True) for item_name in os.listdir(lpath): local_item = os.path.join(lpath, item_name) remote_item = f"{rpath}/{item_name}" if os.path.isdir(local_item): self._upload_directory(local_item, remote_item, recursive=True) else: self._upload_file(local_item, remote_item) def _upload_file(self, lpath: str, rpath: str) -> None: """Upload a single file.""" with open(lpath, "rb") as src: with self._open(rpath, "wb") as dst: dst.write(src.read())
[docs] def sync_all(self) -> None: """Synchronize all pending changes to the hub. Calls ``sync_all()`` on all connected filespaces to ensure all metadata and data changes are propagated to LucidHub. """ for filespace in self._cache.values(): try: filespace.sync_all() except Exception: pass
    def close(self) -> None:
        """Close all connections and stop daemon."""
        # Flush pending changes first, while filespaces are still connected.
        if self._options.sync_mode == SyncMode.SYNC_ALL:
            self.sync_all()

        # Sandboxed storage is deleted below; with SYNC_NONE the user may lose
        # unsynced changes, so warn before proceeding.
        if self._options.sandboxed and self._options.sync_mode == SyncMode.SYNC_NONE:
            logger.warning(
                "Closing LucidLinkFileSystem in sandboxed mode with SYNC_NONE: "
                "cache will be deleted but changes may not have been synced to hub"
            )

        self._cache.unlink_all()

        # Capture the temp dir path BEFORE stopping the daemon — the daemon
        # object is the only way to reach it, and it is cleared below.
        temp_dir = self._get_temp_dir_for_cleanup()

        if self._daemon is not None:
            try:
                self._daemon.stop()
            except Exception:
                # Best-effort stop: shutdown must proceed even if stop fails.
                pass
            self._daemon = None

        self._cache.clear()
        self._cleanup_temp_dir(temp_dir)

        # Remove this instance from fsspec's cache so a new filesystem can be created
        self.clear_instance_cache()
def _get_temp_dir_for_cleanup(self) -> Optional[Path]: """Get temp directory path before daemon stops (if sandboxed).""" if not self._options.sandboxed or self._daemon is None: return None storage = getattr(self._daemon, "_storage", None) if storage is None: return None return getattr(storage, "_temp_dir", None) def _cleanup_temp_dir(self, temp_dir: Optional[Path]) -> None: """Delete temp directory if it exists.""" if temp_dir is not None and temp_dir.exists(): try: shutil.rmtree(temp_dir) except Exception as e: logger.debug(f"Failed to delete temp directory {temp_dir}: {e}") def __del__(self): """Cleanup daemon on destruction.""" self.close() def __enter__(self): """Context manager entry.""" return self def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.close() return False