Source code for lucidlink.daemon

"""
LucidLink Python Library - Daemon Management

High-level Pythonic wrapper for the LucidLink daemon.
Provides context manager support for daemon lifecycle management.
"""

import threading
import weakref
from typing import Dict, List, Optional
from pathlib import Path
import shutil

from .credentials import ServiceAccountCredentials
from .exceptions import DaemonError
from .filespace_models import DaemonStatus, FilespaceInfo
from .storage import StorageConfig, StorageMode
from .workspace import Workspace

# Load the compiled C++ extension that ships inside this package
# (package-relative import). A failure is re-raised with build guidance.
try:
    from . import lucidlink_native
except ImportError as import_error:
    _import_hint = (
        "Failed to import lucidlink_native module. "
        "Make sure the C++ extension module is built and in your Python path."
    )
    raise ImportError(_import_hint) from import_error


# Process-wide daemon bookkeeping.
# The native layer keeps global C++ state, so at most one daemon may be
# active per process.
_daemon_lock = threading.Lock()  # serializes access to _active_daemon
_active_daemon: Optional[weakref.ref] = None  # weak ref to the started Daemon, if any


class Daemon:
    """
    High-level Python wrapper for the LucidLink daemon.

    The Daemon class manages the lifecycle of the LucidLink daemon process,
    handles authentication, and provides access to workspaces and filespaces.

    Only one daemon can be started per process at a time because the native
    layer keeps global C++ state. Instances also work as context managers:
    entering the ``with`` block calls :meth:`start`, leaving it calls
    :meth:`stop`.

    Example:
        .. code-block:: python

            from lucidlink import Daemon, ServiceAccountCredentials

            # Create credentials
            credentials = ServiceAccountCredentials(token="sa_live:your_key")

            # Start daemon and authenticate
            daemon = Daemon()
            daemon.start()
            workspace = daemon.authenticate(credentials)

            # List and link to filespace
            filespaces = workspace.list_filespaces()
            filespace = workspace.link_filespace(name="production-data")

            # Use filesystem
            entries = filespace.fs.read_dir("/")

            # Cleanup
            daemon.stop()
    """

    def __init__(
        self,
        config: Optional[Dict[str, str]] = None,
        storage: Optional[StorageConfig] = None
    ):
        """
        Initialize the daemon with configuration and storage settings.

        Args:
            config: Optional configuration dictionary handed to the native
                daemon. The dictionary is copied, so the caller's object is
                never mutated. Common keys:

                - "fs.cache.location": Cache directory path. This wrapper
                  never sets it; when absent the daemon is left to construct
                  it on its own.
                - "fs.cache.size": Cache size in MB (as string)
                - "network.timeout": Network operation timeout

                "rootPath" and "configPath" default to the storage root path
                and are only filled in when the caller did not supply them.
                If ``None``, uses default configuration.
            storage: Storage configuration for daemon files.
                If ``None``, defaults to ``SANDBOXED`` mode.
                See ``StorageConfig`` for details on ``PHYSICAL`` and
                ``SANDBOXED`` modes.

        Raises:
            DaemonError: If the native daemon cannot be constructed
            ConfigurationError: If configuration is invalid (not raised
                directly here; presumably originates from the storage
                configuration - verify against ``StorageConfig``)

        Example:
            .. code-block:: python

                # Default (sandboxed mode)
                daemon = Daemon()

                # Physical mode with cleanup
                from lucidlink.storage import StorageConfig, StorageMode
                daemon = Daemon(storage=StorageConfig(mode=StorageMode.PHYSICAL))

                # Physical mode with persistence
                daemon = Daemon(storage=StorageConfig(
                    mode=StorageMode.PHYSICAL,
                    persist_on_exit=True
                ))
        """
        # Default to sandboxed mode for safety
        self._storage = storage or StorageConfig(mode=StorageMode.SANDBOXED)

        # Work on a copy so the caller's dictionary is left untouched
        internal_config = config.copy() if config else {}
        root = self._storage.get_root_path()

        # Set paths based on storage configuration
        # Pass .lucid directory path with native separators (backslashes on Windows)
        # The daemon will extract .lucid and create per-filespace UUID subdirectories
        root_str = str(root)
        # setdefault: explicit caller-supplied values win over the storage root
        internal_config.setdefault("rootPath", root_str)
        internal_config.setdefault("configPath", root_str)
        # Note: DO NOT set fs.cache.location - let daemon construct it automatically

        try:
            self._native = lucidlink_native.Daemon(internal_config)
        except Exception as e:
            raise DaemonError(f"Failed to initialize daemon: {e}") from e

        self._started = False
        self._active_workspace = None

    def _stop_active_workspace(self) -> None:
        """
        Stop and forget the active workspace, if there is one.

        This is best-effort: a failure while stopping the workspace is logged
        as a warning and otherwise suppressed so that the caller (``stop()``
        or ``authenticate()``) can carry on. The reference is cleared either
        way.
        """
        if self._active_workspace is None:
            return

        try:
            self._active_workspace.stop()
        except Exception:
            # Deliberately non-fatal, but leave a trace instead of hiding it.
            import logging

            logging.getLogger(__name__).warning(
                "Failed to stop the active workspace cleanly; continuing",
                exc_info=True,
            )
        self._active_workspace = None

    def start(self) -> None:
        """
        Start the daemon services.

        Must be called before authentication or filespace linking.
        Safe to call multiple times - subsequent calls are no-ops.

        Note:
            Only one daemon can be active per process due to C++ global state.
            Attempting to start a second daemon while one is already running
            will raise DaemonError.

        Raises:
            DaemonError: If daemon start fails or another daemon is already active
        """
        global _active_daemon

        if self._started:
            return  # Already started (cheap check without taking the lock)

        with _daemon_lock:
            # Re-check under the lock: another thread may have finished
            # starting this very instance while we were waiting. Without this
            # the singleton check below would see ourselves as "another
            # daemon" and raise spuriously.
            if self._started:
                return

            # Check if another daemon is already active
            existing = _active_daemon() if _active_daemon is not None else None
            if existing is not None and existing._started:
                raise DaemonError(
                    "Another daemon is already active. "
                    "Only one daemon can run per process due to C++ global state. "
                    "Call stop() on the existing daemon first."
                )

            try:
                self._native.start()
            except Exception as e:
                raise DaemonError(f"Failed to start daemon: {e}") from e

            self._started = True
            # Weak reference: the registry must not keep the daemon alive
            _active_daemon = weakref.ref(self)

    def stop(self) -> None:
        """
        Stop the daemon and cleanup resources.

        Stops the active workspace first (best-effort), then the native
        daemon. Afterwards the storage root directory is removed when the
        storage configuration reports that cleanup is wanted
        (``should_cleanup()``):

        - ``SANDBOXED`` mode: Always cleans up temp directory
        - ``PHYSICAL`` mode: Cleans up if ``persist_on_exit=False``

        Safe to call multiple times - subsequent calls are no-ops.

        Raises:
            DaemonError: If daemon stop fails
        """
        global _active_daemon

        if not self._started:
            return  # Already stopped

        try:
            self._stop_active_workspace()
            self._native.stop()
            self._started = False

            # Clear the singleton reference, but only if it points at us
            with _daemon_lock:
                if _active_daemon is not None and _active_daemon() is self:
                    _active_daemon = None

            # Clean up operational files if not persisting
            if self._storage.should_cleanup():
                root = self._storage.get_root_path()
                # root is the .lucid directory itself
                if root.exists():
                    try:
                        shutil.rmtree(root, ignore_errors=True)
                    except Exception:
                        # Ignore cleanup errors - daemon is already stopped
                        pass
        except Exception as e:
            raise DaemonError(f"Failed to stop daemon: {e}") from e

    def is_running(self) -> bool:
        """
        Check if the daemon is currently running.

        Returns:
            True if daemon is started, False otherwise
        """
        return self._started

    def authenticate(self, credentials: ServiceAccountCredentials) -> Workspace:
        """
        Authenticate to LucidLink using service account credentials.

        Must call start() before authenticate(). Any previously active
        workspace is stopped (best-effort) before the new authentication is
        attempted.

        Args:
            credentials: ServiceAccountCredentials object containing the
                service account token

        Returns:
            Workspace object providing access to filespace operations

        Raises:
            DaemonError: If the daemon is not started, or if authentication
                fails for any reason. Every underlying exception is wrapped
                in DaemonError; the original is available as ``__cause__``.

        Example:
            .. code-block:: python

                from lucidlink import Daemon, ServiceAccountCredentials

                credentials = ServiceAccountCredentials(token="sa_live:your_key")
                daemon = Daemon()
                daemon.start()
                workspace = daemon.authenticate(credentials)
                print(workspace.name)
        """
        if not self._started:
            raise DaemonError("Daemon must be started before authentication")

        try:
            self._stop_active_workspace()

            # Call native authenticate method - returns dict with workspace context
            workspace_context = self._native.authenticate(credentials.token)

            # Create Workspace object from context
            workspace = Workspace(
                native_daemon=self._native,
                workspace_id=workspace_context["workspace_id"],
                workspace_name=workspace_context["workspace_name"]
            )
            self._active_workspace = workspace
            return workspace
        except Exception as e:
            raise DaemonError(f"Authentication failed: {e}") from e

    # Helper methods called by Workspace class

    def list_filespaces(self, workspace_id: str) -> List[FilespaceInfo]:
        """
        List all filespaces in a workspace.

        This is called internally by Workspace.list_filespaces().
        Users should call workspace.list_filespaces() instead.

        Args:
            workspace_id: Workspace ID

        Returns:
            List of FilespaceInfo objects built from the "id", "name" and
            "created" entries of each native result

        Raises:
            DaemonError: If the daemon is not started or the request fails
        """
        if not self._started:
            raise DaemonError("Daemon must be started")

        try:
            return [
                FilespaceInfo(id=d["id"], name=d["name"], created=d["created"])
                for d in self._native.list_filespaces(workspace_id)
            ]
        except Exception as e:
            raise DaemonError(f"Failed to list filespaces: {e}") from e

    def link_filespace(
        self,
        workspace_id: str,
        filespace_name: str,
        filespace_id: str,
        root_path: str
    ):
        """
        Link to a filespace (by name or ID).

        This is called internally by Workspace.link_filespace().
        Users should call workspace.link_filespace() instead.

        Args:
            workspace_id: Workspace ID
            filespace_name: Filespace name (empty if using ID)
            filespace_id: Filespace ID (empty if using name)
            root_path: Mount point path

        Returns:
            Whatever the native ``link_filespace`` call returns
            (the native filesystem wrapper)

        Raises:
            DaemonError: If the daemon is not started or linking fails
        """
        if not self._started:
            raise DaemonError("Daemon must be started")

        try:
            return self._native.link_filespace(
                workspace_id=workspace_id,
                filespace_name=filespace_name,
                filespace_id=filespace_id,
                root_path=root_path
            )
        except Exception as e:
            raise DaemonError(f"Failed to link filespace: {e}") from e

    def unlink_filespace(self) -> None:
        """
        Unlink from the currently linked filespace.

        This is called internally by Filespace.unlink().
        Users should call filespace.unlink() instead.

        Unlike the other helpers, this does not check whether the daemon is
        started; the request is passed straight to the native daemon.

        Raises:
            DaemonError: If unlinking fails
        """
        try:
            self._native.unlink_filespace()
        except Exception as e:
            raise DaemonError(f"Failed to unlink filespace: {e}") from e

    def get_status(self) -> DaemonStatus:
        """
        Get daemon status information.

        Each boolean flag is True only when the native status mapping holds
        the exact string "true" for the corresponding key ("running",
        "authenticated", "linked"); missing keys count as False.

        Returns:
            DaemonStatus with is_running, is_authenticated, is_linked, root_path
            (root_path is the storage root path as a string)

        Raises:
            DaemonError: If getting status fails
        """
        try:
            native_status = self._native.get_status()
            return DaemonStatus(
                is_running=native_status.get("running") == "true",
                is_authenticated=native_status.get("authenticated") == "true",
                is_linked=native_status.get("linked") == "true",
                root_path=str(self._storage.get_root_path()),
            )
        except Exception as e:
            raise DaemonError(f"Failed to get daemon status: {e}") from e

    def __enter__(self) -> "Daemon":
        """
        Enter context manager - starts the daemon.

        Returns:
            Self for use in 'with' statement
        """
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """
        Exit context manager - stops the daemon.

        Returns None, so an exception raised inside the ``with`` block is
        never suppressed.

        Args:
            exc_type: Exception type (if exception occurred)
            exc_val: Exception value (if exception occurred)
            exc_tb: Exception traceback (if exception occurred)
        """
        self.stop()

    def __repr__(self) -> str:
        """String representation of the Daemon object."""
        status = "running" if self._started else "stopped"
        return f"Daemon({status})"