"""fsspec integration for LucidLink filesystem.
Provides fsspec.AbstractFileSystem implementation that allows standard Python
data libraries (Pandas, Dask, PyArrow, etc.) to access LucidLink filespaces
using URL protocols like ``lucidlink://workspace/filespace/path/to/file.csv``
**Features:**
- Standard file operations: open, read, write, ls, mkdir, rm
- Native move/rename operations (much faster than copy+delete fallback)
- Directory operations: mkdir, makedirs, rmdir
- Sync control: automatic or manual sync to hub
"""
import logging
import os
import shutil
from dataclasses import dataclass
from pathlib import Path
from typing import Any, ClassVar, Dict, List, Optional, Union

from .credentials import ServiceAccountCredentials
from .daemon import Daemon
from .exceptions import LucidLinkError
from .filespace_cache import FilespaceCache
from .filespace_models import SyncMode
from .url_parser import UrlParser, ParsedUrl, PROTOCOL
# Module-level logger, per the standard ``logging.getLogger(__name__)`` convention.
logger = logging.getLogger(__name__)
@dataclass(frozen=True)
class LucidLinkOptions:
    """Configuration options for LucidLinkFileSystem.

    This dataclass normalizes options so that equivalent configurations
    produce the same hash, enabling proper fsspec instance caching.
    """

    # Default values as class constants (referenced by metaclass and __init__).
    # Annotated as ClassVar so the dataclass machinery does NOT treat them as
    # instance fields: without ClassVar they would become __init__ parameters
    # and participate in __eq__/__hash__, undermining cache-key normalization.
    DEFAULT_TOKEN: ClassVar[Optional[str]] = None
    DEFAULT_SANDBOXED: ClassVar[bool] = True
    DEFAULT_PERSIST_FILES: ClassVar[bool] = False
    DEFAULT_ROOT_PATH: ClassVar[Optional[str]] = None
    DEFAULT_SYNC_MODE: ClassVar[SyncMode] = SyncMode.SYNC_ALL
    DEFAULT_WEBSERVICE_URL: ClassVar[Optional[str]] = None

    token: Optional[str] = DEFAULT_TOKEN
    sandboxed: bool = DEFAULT_SANDBOXED
    persist_files: bool = DEFAULT_PERSIST_FILES
    root_path: Optional[str] = DEFAULT_ROOT_PATH  # str instead of Path for hashability
    sync_mode: SyncMode = DEFAULT_SYNC_MODE
    webservice_url: Optional[str] = DEFAULT_WEBSERVICE_URL

    @classmethod
    def from_kwargs(
        cls,
        token: Optional[str] = None,
        sandboxed: Optional[bool] = None,
        persist_files: Optional[bool] = None,
        root_path: Optional[Union[str, Path]] = None,
        sync_mode: Optional[SyncMode] = None,
        webservice_url: Optional[str] = None,
        **kwargs,  # Ignore extra kwargs (fsspec passes some)
    ) -> "LucidLinkOptions":
        """Create options from keyword arguments, applying defaults.

        ``None`` means "not provided" and falls back to the class default;
        ``root_path`` is coerced to ``str`` so instances stay hashable.
        """
        return cls(
            token=token if token is not None else cls.DEFAULT_TOKEN,
            sandboxed=sandboxed if sandboxed is not None else cls.DEFAULT_SANDBOXED,
            persist_files=persist_files if persist_files is not None else cls.DEFAULT_PERSIST_FILES,
            root_path=str(root_path) if root_path is not None else cls.DEFAULT_ROOT_PATH,
            sync_mode=sync_mode if sync_mode is not None else cls.DEFAULT_SYNC_MODE,
            webservice_url=webservice_url if webservice_url is not None else cls.DEFAULT_WEBSERVICE_URL,
        )

    def to_kwargs(self) -> Dict[str, Any]:
        """Convert options to kwargs dict for consistent cache key computation."""
        return {
            "token": self.token,
            "sandboxed": self.sandboxed,
            "persist_files": self.persist_files,
            "root_path": self.root_path,
            "sync_mode": self.sync_mode,
            "webservice_url": self.webservice_url,
        }
try:
from fsspec.spec import AbstractFileSystem, _Cached
FSSPEC_AVAILABLE = True
class _NormalizedCached(_Cached):
"""Metaclass that normalizes kwargs before caching.
fsspec's _Cached metaclass computes cache keys from raw kwargs BEFORE
__init__ runs. This means LucidLinkFileSystem(token='A') and
LucidLinkFileSystem(token='A', sync_mode=SYNC_ALL) produce different
cache keys, even though SYNC_ALL is the default.
Since only one LucidLink daemon can run per process, different cache
keys would cause daemon crashes. This metaclass normalizes kwargs by
round-tripping through LucidLinkOptions before the cache key is computed.
"""
def __call__(cls, *args, **kwargs):
# Normalize by round-tripping through LucidLinkOptions
# Preserve non-LucidLink kwargs (e.g., skip_instance_cache from fsspec)
lucid_kwargs = LucidLinkOptions.from_kwargs(**kwargs).to_kwargs()
other_kwargs = {
k: v for k, v in kwargs.items()
if k not in lucid_kwargs
}
# Clear and rebuild with consistent key order (tokenize is order-sensitive)
kwargs.clear()
kwargs.update(lucid_kwargs)
kwargs.update(other_kwargs)
return super().__call__(*args, **kwargs)
except ImportError:
FSSPEC_AVAILABLE = False
_NormalizedCached = None
[docs]
class AbstractFileSystem:
"""Dummy base class when fsspec is not available."""
pass
class LucidLinkFileSystem(
    AbstractFileSystem,
    metaclass=_NormalizedCached if FSSPEC_AVAILABLE else type
):
    """fsspec filesystem implementation for LucidLink.

    Enables standard Python data libraries to access LucidLink filespaces using
    URL protocols like: lucidlink://workspace/filespace/path/to/file.csv

    Usage with Pandas::

        df = pd.read_csv('lucidlink://my-workspace/production-data/data.csv',
                         storage_options={'token': 'sa_live:...'})

    Usage with Dask::

        df = dd.read_parquet('lucidlink://workspace/filespace/dataset/*.parquet',
                             storage_options={'token': 'sa_live:...'})

    Usage with PyArrow::

        table = pq.read_table('lucidlink://workspace/filespace/data.parquet',
                              filesystem=LucidLinkFileSystem(token='sa_live:...'))

    Direct usage::

        fs = LucidLinkFileSystem(token='sa_live:...')
        with fs.open('lucidlink://workspace/filespace/file.txt', 'r') as f:
            content = f.read()
    """

    protocol = PROTOCOL
    # Note: cachable = True is inherited from AbstractFileSystem and is required.
    # Only one daemon can run per process, so fsspec must reuse filesystem instances.
def __init__(
self,
token: Optional[str] = LucidLinkOptions.DEFAULT_TOKEN,
sandboxed: bool = LucidLinkOptions.DEFAULT_SANDBOXED,
persist_files: bool = LucidLinkOptions.DEFAULT_PERSIST_FILES,
root_path: Optional[Union[str, Path]] = LucidLinkOptions.DEFAULT_ROOT_PATH,
sync_mode: SyncMode = LucidLinkOptions.DEFAULT_SYNC_MODE,
webservice_url: Optional[str] = LucidLinkOptions.DEFAULT_WEBSERVICE_URL,
**kwargs,
):
"""Initialize LucidLink filesystem.
Args:
token: Service account token (sa_live:...)
sandboxed: Use temporary directory (``True``) or persistent ``.lucid`` folder (``False``)
persist_files: Keep files after daemon stops (only for physical mode)
root_path: Custom root path for storage (optional)
sync_mode: Sync mode - ``SYNC_ALL`` (default) syncs on close, ``SYNC_NONE`` skips sync
webservice_url: WebService URL override (for testing against local environments)
**kwargs: Additional fsspec parameters
"""
self._require_fsspec()
super().__init__(**kwargs)
# Normalize options for consistent hashing/caching
self._options = LucidLinkOptions.from_kwargs(
token=token,
sandboxed=sandboxed,
persist_files=persist_files,
root_path=root_path,
sync_mode=sync_mode,
webservice_url=webservice_url,
)
self._daemon: Optional[Daemon] = None
self._url_parser = UrlParser()
self._cache = FilespaceCache()
if self._options.token:
self._initialize_daemon()
@property
def options(self) -> LucidLinkOptions:
"""Read-only access to filesystem configuration options."""
return self._options
def _require_fsspec(self) -> None:
"""Verify fsspec is available."""
if not FSSPEC_AVAILABLE:
raise ImportError(
"fsspec is required for LucidLinkFileSystem. "
"Install with: pip install fsspec"
)
def _initialize_daemon(self) -> None:
"""Initialize and start the LucidLink daemon."""
from . import create_daemon
if self._daemon is not None:
return
config = {}
if self._options.webservice_url:
config["webservice.url"] = self._options.webservice_url
self._daemon = create_daemon(
config=config or None,
sandboxed=self._options.sandboxed,
persist_files=self._options.persist_files,
root_path=self._options.root_path,
)
self._daemon.start()
if self._options.token:
credentials = ServiceAccountCredentials(token=self._options.token)
workspace = self._daemon.authenticate(credentials)
self._cache.set_workspace(workspace)
def _ensure_initialized(self) -> None:
"""Ensure daemon is initialized, raising error if token not provided."""
if self._daemon is None:
if self._options.token is None:
raise LucidLinkError(
"No token provided. Initialize with token or set via "
"storage_options={'token': 'sa_live:...'}"
)
self._initialize_daemon()
def _get_filespace_and_path(self, url: str):
"""Get filespace and file path for a URL.
Args:
url: LucidLink URL
Returns:
Tuple of (Filespace, file_path)
"""
self._ensure_initialized()
parsed = self._url_parser.parse(url)
filespace = self._cache.get_or_connect(parsed)
return filespace, parsed.path
def _open(
self,
path: str,
mode: str = "rb",
block_size: Optional[int] = None,
**kwargs,
):
"""Open a file and return a file-like object.
Args:
path: LucidLink URL
mode: File mode ('r', 'rb', 'w', 'wb', etc.)
block_size: Buffer size (uses default if None)
**kwargs: Additional parameters
Returns:
File-like object (LucidFileStream or buffered/text wrapper)
"""
filespace, file_path = self._get_filespace_and_path(path)
buffering = block_size if block_size is not None else -1
valid_kwargs = {
k: v
for k, v in kwargs.items()
if k in ["encoding", "errors", "newline"]
}
return filespace.fs.open(
file_path,
mode=mode,
buffering=buffering,
**valid_kwargs,
)
[docs]
def ls(
self, path: str, detail: bool = True, **kwargs
) -> Union[List[str], List[Dict[str, Any]]]:
"""List directory contents.
Args:
path: LucidLink URL
detail: Return detailed info (True) or just names (False)
**kwargs: Additional parameters
Returns:
List of full paths (detail=False) or detailed info dicts (detail=True)
"""
filespace, file_path = self._get_filespace_and_path(path)
entries = filespace.fs.read_dir(file_path)
base_path = path.rstrip("/")
if not detail:
return [f"{base_path}/{entry.name}" for entry in entries]
return [
{
"name": f"{base_path}/{entry.name}",
"size": entry.size,
"type": "directory" if entry.is_dir() else "file",
"mtime": entry.mtime,
"created": entry.ctime,
}
for entry in entries
]
[docs]
def info(self, path: str, **kwargs) -> Dict[str, Any]:
"""Get file or directory information.
Args:
path: LucidLink URL
**kwargs: Additional parameters
Returns:
Dictionary with file/directory metadata
"""
filespace, file_path = self._get_filespace_and_path(path)
if not file_path or file_path == "/":
return {
"name": path,
"size": 0,
"type": "directory",
"mtime": 0,
"created": 0,
}
entry = filespace.fs.get_entry(file_path)
return {
"name": entry.name,
"size": entry.size,
"type": "directory" if entry.is_dir() else "file",
"mtime": entry.mtime,
"created": entry.ctime,
}
[docs]
def exists(self, path: str, **kwargs) -> bool:
"""Check if path exists.
Raises:
PermissionError: If token doesn't have access to the workspace or filespace
"""
try:
self.info(path)
return True
except PermissionError:
raise
except Exception:
return False
[docs]
def isdir(self, path: str) -> bool:
"""Check if path is a directory.
Raises:
PermissionError: If token doesn't have access to the workspace or filespace
"""
try:
return self.info(path)["type"] == "directory"
except PermissionError:
raise
except Exception:
return False
[docs]
def isfile(self, path: str) -> bool:
"""Check if path is a file.
Raises:
PermissionError: If token doesn't have access to the workspace or filespace
"""
try:
return self.info(path)["type"] == "file"
except PermissionError:
raise
except Exception:
return False
[docs]
def mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None:
"""Create a directory.
Args:
path: LucidLink URL
create_parents: Create parent directories if needed
**kwargs: Additional parameters
"""
filespace, file_path = self._get_filespace_and_path(path)
filespace.fs.create_dir(file_path)
[docs]
def makedirs(self, path: str, exist_ok: bool = False) -> None:
"""Create directory recursively.
Args:
path: LucidLink URL
exist_ok: Don't raise error if directory exists
"""
filespace, file_path = self._get_filespace_and_path(path)
if not file_path or file_path == "/":
return
if exist_ok and self._directory_exists_quietly(filespace, file_path):
return
self._create_directory_path(filespace, file_path, exist_ok)
def _directory_exists_quietly(self, filespace, file_path: str) -> bool:
"""Check if directory exists without raising exceptions."""
try:
entry = filespace.fs.get_entry(file_path)
return entry.is_dir()
except Exception:
return False
def _create_directory_path(
self, filespace, file_path: str, exist_ok: bool
) -> None:
"""Create all directories in a path."""
parts = [p for p in file_path.split("/") if p]
current = ""
for part in parts:
current += "/" + part
if not filespace.fs.dir_exists(current):
try:
filespace.fs.create_dir(current)
except Exception:
if not exist_ok:
raise
[docs]
def rm(
self,
path: str,
recursive: bool = False,
maxdepth: Optional[int] = None,
) -> None:
"""Remove file or directory.
Args:
path: LucidLink URL
recursive: Remove directory recursively
maxdepth: Maximum recursion depth (ignored, always full recursion)
"""
filespace, file_path = self._get_filespace_and_path(path)
if self.isdir(path):
filespace.fs.delete_dir(file_path, recursive=recursive)
else:
filespace.fs.delete(file_path)
[docs]
def rmdir(self, path: str) -> None:
"""Remove an empty directory.
Args:
path: LucidLink URL
"""
filespace, file_path = self._get_filespace_and_path(path)
filespace.fs.delete_dir(file_path, recursive=False)
[docs]
def mv(
self,
path1: str,
path2: str,
recursive: bool = False,
maxdepth: Optional[int] = None,
**kwargs,
) -> None:
"""Move/rename a file or directory.
Uses native rename operation which is much faster than copy+delete.
Args:
path1: Source LucidLink URL
path2: Destination LucidLink URL
recursive: Ignored (move is always recursive for directories)
maxdepth: Ignored
**kwargs: Additional parameters (ignored)
Raises:
ValueError: If paths are in different filespaces
"""
parsed1, parsed2 = self._url_parser.validate_same_filespace(path1, path2)
filespace = self._cache.get_or_connect(parsed1)
filespace.fs.move(parsed1.path, parsed2.path)
[docs]
def rename(self, path1: str, path2: str, **kwargs) -> None:
"""Rename a file or directory (alias for mv).
Args:
path1: Source LucidLink URL
path2: Destination LucidLink URL
**kwargs: Additional parameters (passed to mv)
"""
self.mv(path1, path2, **kwargs)
[docs]
def cat(
self,
path: str,
start: Optional[int] = None,
end: Optional[int] = None,
**kwargs,
) -> bytes:
"""Read entire file or byte range.
Args:
path: LucidLink URL
start: Start byte position (optional)
end: End byte position (optional)
**kwargs: Additional parameters
Returns:
File contents as bytes
"""
with self._open(path, "rb") as f:
if start is not None:
f.seek(start)
if end is not None:
size = end - (start or 0)
return f.read(size)
return f.read()
[docs]
def cat_file(
self,
path: str,
start: Optional[int] = None,
end: Optional[int] = None,
**kwargs,
) -> bytes:
"""Alias for cat() for compatibility."""
return self.cat(path, start, end, **kwargs)
[docs]
def get(
self, rpath: str, lpath: str, recursive: bool = False, **kwargs
) -> None:
"""Download file from LucidLink to local filesystem.
Args:
rpath: Remote LucidLink URL
lpath: Local filesystem path
recursive: Download directory recursively
**kwargs: Additional parameters
"""
if self.isdir(rpath):
self._download_directory(rpath, lpath, recursive)
else:
self._download_file(rpath, lpath)
def _download_directory(
self, rpath: str, lpath: str, recursive: bool
) -> None:
"""Download a directory recursively."""
if not recursive:
raise ValueError("Cannot download directory without recursive=True")
os.makedirs(lpath, exist_ok=True)
for item in self.ls(rpath, detail=True):
item_name = os.path.basename(item["name"])
remote_item = f"{rpath}/{item_name}"
local_item = os.path.join(lpath, item_name)
if item["type"] == "directory":
self._download_directory(remote_item, local_item, recursive=True)
else:
self._download_file(remote_item, local_item)
def _download_file(self, rpath: str, lpath: str) -> None:
"""Download a single file."""
with self._open(rpath, "rb") as src:
with open(lpath, "wb") as dst:
dst.write(src.read())
[docs]
def put(
self, lpath: str, rpath: str, recursive: bool = False, **kwargs
) -> None:
"""Upload file from local filesystem to LucidLink.
Args:
lpath: Local filesystem path
rpath: Remote LucidLink URL
recursive: Upload directory recursively
**kwargs: Additional parameters
"""
if os.path.isdir(lpath):
self._upload_directory(lpath, rpath, recursive)
else:
self._upload_file(lpath, rpath)
def _upload_directory(self, lpath: str, rpath: str, recursive: bool) -> None:
"""Upload a directory recursively."""
if not recursive:
raise ValueError("Cannot upload directory without recursive=True")
self.makedirs(rpath, exist_ok=True)
for item_name in os.listdir(lpath):
local_item = os.path.join(lpath, item_name)
remote_item = f"{rpath}/{item_name}"
if os.path.isdir(local_item):
self._upload_directory(local_item, remote_item, recursive=True)
else:
self._upload_file(local_item, remote_item)
def _upload_file(self, lpath: str, rpath: str) -> None:
"""Upload a single file."""
with open(lpath, "rb") as src:
with self._open(rpath, "wb") as dst:
dst.write(src.read())
[docs]
def sync_all(self) -> None:
"""Synchronize all pending changes to the hub.
Calls ``sync_all()`` on all connected filespaces to ensure all metadata
and data changes are propagated to LucidHub.
"""
for filespace in self._cache.values():
try:
filespace.sync_all()
except Exception:
pass
[docs]
def close(self) -> None:
"""Close all connections and stop daemon."""
if self._options.sync_mode == SyncMode.SYNC_ALL:
self.sync_all()
if self._options.sandboxed and self._options.sync_mode == SyncMode.SYNC_NONE:
logger.warning(
"Closing LucidLinkFileSystem in sandboxed mode with SYNC_NONE: "
"cache will be deleted but changes may not have been synced to hub"
)
self._cache.unlink_all()
temp_dir = self._get_temp_dir_for_cleanup()
if self._daemon is not None:
try:
self._daemon.stop()
except Exception:
pass
self._daemon = None
self._cache.clear()
self._cleanup_temp_dir(temp_dir)
# Remove this instance from fsspec's cache so a new filesystem can be created
self.clear_instance_cache()
def _get_temp_dir_for_cleanup(self) -> Optional[Path]:
"""Get temp directory path before daemon stops (if sandboxed)."""
if not self._options.sandboxed or self._daemon is None:
return None
storage = getattr(self._daemon, "_storage", None)
if storage is None:
return None
return getattr(storage, "_temp_dir", None)
def _cleanup_temp_dir(self, temp_dir: Optional[Path]) -> None:
"""Delete temp directory if it exists."""
if temp_dir is not None and temp_dir.exists():
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.debug(f"Failed to delete temp directory {temp_dir}: {e}")
def __del__(self):
"""Cleanup daemon on destruction."""
self.close()
def __enter__(self):
"""Context manager entry."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit."""
self.close()
return False