"""
LucidLink Python Library - Daemon Management
High-level Pythonic wrapper for the LucidLink daemon.
Provides context manager support for daemon lifecycle management.
"""
import threading
import weakref
from typing import Dict, List, Optional
from pathlib import Path
import shutil
from .credentials import ServiceAccountCredentials
from .exceptions import DaemonError
from .filespace_models import DaemonStatus, FilespaceInfo
from .storage import StorageConfig, StorageMode
from .workspace import Workspace
# Import the native C++ extension module (relative import within package)
try:
from . import lucidlink_native
except ImportError as e:
raise ImportError(
"Failed to import lucidlink_native module. "
"Make sure the C++ extension module is built and in your Python path."
) from e
# Module-level singleton tracking
# Only one daemon can be active per process due to C++ global state
_active_daemon: Optional[weakref.ref] = None
_daemon_lock = threading.Lock()
[docs]
class Daemon:
"""
High-level Python wrapper for the LucidLink daemon.
The Daemon class manages the lifecycle of the LucidLink daemon process,
handles authentication, and provides access to workspaces and filespaces.
Example:
.. code-block:: python
from lucidlink import Daemon, ServiceAccountCredentials
# Create credentials
credentials = ServiceAccountCredentials(token="sa_live:your_key")
# Start daemon and authenticate
daemon = Daemon(config)
daemon.start()
workspace = daemon.authenticate(credentials)
# List and link to filespace
filespaces = workspace.list_filespaces()
filespace = workspace.link_filespace(name="production-data")
# Use filesystem
entries = filespace.fs.read_dir("/")
# Cleanup
daemon.stop()
"""
def __init__(
self,
config: Optional[Dict[str, str]] = None,
storage: Optional[StorageConfig] = None
):
"""
Initialize the daemon with configuration and storage settings.
Args:
config: Optional configuration dictionary. Common keys:
- "fs.cache.location": Cache directory path (overridden by storage)
- "fs.cache.size": Cache size in MB (as string)
- "network.timeout": Network operation timeout
If ``None``, uses default configuration.
storage: Storage configuration for daemon files.
If ``None``, defaults to ``SANDBOXED`` mode (temp directory, always cleaned up).
See ``StorageConfig`` for details on ``PHYSICAL`` and ``SANDBOXED`` modes.
Raises:
ConfigurationError: If configuration is invalid
DaemonError: If daemon initialization fails
Example:
.. code-block:: python
# Default (sandboxed mode)
daemon = Daemon()
# Physical mode with cleanup
from lucidlink.storage import StorageConfig, StorageMode
daemon = Daemon(storage=StorageConfig(mode=StorageMode.PHYSICAL))
# Physical mode with persistence
daemon = Daemon(storage=StorageConfig(
mode=StorageMode.PHYSICAL,
persist_on_exit=True
))
"""
# Default to sandboxed mode for safety
self._storage = storage or StorageConfig(mode=StorageMode.SANDBOXED)
# Build internal config from storage settings
internal_config = config.copy() if config else {}
root = self._storage.get_root_path()
# Set paths based on storage configuration
# Pass .lucid directory path with native separators (backslashes on Windows)
# The daemon will extract .lucid and create per-filespace UUID subdirectories
root_str = str(root)
internal_config.setdefault("rootPath", root_str)
internal_config.setdefault("configPath", root_str)
# Note: DO NOT set fs.cache.location - let daemon construct it automatically
try:
self._native = lucidlink_native.Daemon(internal_config)
except Exception as e:
raise DaemonError(f"Failed to initialize daemon: {e}") from e
self._started = False
self._active_workspace = None
def _stop_active_workspace(self) -> None:
"""Stop and clear the active workspace (syncs + unlinks filespace)."""
if self._active_workspace is not None:
try:
self._active_workspace.stop()
except Exception:
pass
self._active_workspace = None
[docs]
def start(self) -> None:
"""
Start the daemon services.
Must be called before authentication or filespace linking.
Safe to call multiple times - subsequent calls are no-ops.
Note: Only one daemon can be active per process due to C++ global state.
Attempting to start a second daemon while one is already running will
raise DaemonError.
Raises:
DaemonError: If daemon start fails or another daemon is already active
"""
global _active_daemon
if self._started:
return # Already started
with _daemon_lock:
# Check if another daemon is already active
if _active_daemon is not None:
existing = _active_daemon()
if existing is not None and existing._started:
raise DaemonError(
"Another daemon is already active. "
"Only one daemon can run per process due to C++ global state. "
"Call stop() on the existing daemon first."
)
try:
self._native.start()
self._started = True
_active_daemon = weakref.ref(self)
except Exception as e:
raise DaemonError(f"Failed to start daemon: {e}") from e
[docs]
def stop(self) -> None:
"""
Stop the daemon and cleanup resources.
Automatically unlinks any linked filespaces before stopping.
Cleans up operational files based on storage configuration:
- ``SANDBOXED`` mode: Always cleans up temp directory
- ``PHYSICAL`` mode: Cleans up if ``persist_on_exit=False``
Safe to call multiple times - subsequent calls are no-ops.
Raises:
DaemonError: If daemon stop fails
"""
global _active_daemon
if not self._started:
return # Already stopped
try:
self._stop_active_workspace()
self._native.stop()
self._started = False
# Clear the singleton reference
with _daemon_lock:
if _active_daemon is not None:
existing = _active_daemon()
if existing is self:
_active_daemon = None
# Clean up physical mode files if not persisting
if self._storage.should_cleanup():
root = self._storage.get_root_path()
# root is the .lucid directory itself
if root.exists():
try:
shutil.rmtree(root, ignore_errors=True)
except Exception:
# Ignore cleanup errors - daemon is already stopped
pass
except Exception as e:
raise DaemonError(f"Failed to stop daemon: {e}") from e
[docs]
def is_running(self) -> bool:
"""
Check if the daemon is currently running.
Returns:
True if daemon is started, False otherwise
"""
return self._started
[docs]
def authenticate(self, credentials: ServiceAccountCredentials) -> Workspace:
"""
Authenticate to LucidLink using service account credentials.
Must call start() before authenticate().
Args:
credentials: ServiceAccountCredentials object containing the service account token
Returns:
Workspace object providing access to filespace operations
Raises:
DaemonError: If daemon is not started
AuthenticationError: If authentication fails
ValueError: If credentials are invalid
Example:
.. code-block:: python
from lucidlink import Daemon, ServiceAccountCredentials
credentials = ServiceAccountCredentials(token="sa_live:your_key")
daemon = Daemon()
daemon.start()
workspace = daemon.authenticate(credentials)
print(workspace.name)
"""
if not self._started:
raise DaemonError("Daemon must be started before authentication")
try:
self._stop_active_workspace()
# Call native authenticate method - returns dict with workspace context
workspace_context = self._native.authenticate(credentials.token)
# Create Workspace object from context
workspace = Workspace(
native_daemon=self._native,
workspace_id=workspace_context["workspace_id"],
workspace_name=workspace_context["workspace_name"]
)
self._active_workspace = workspace
return workspace
except Exception as e:
raise DaemonError(f"Authentication failed: {e}") from e
# Helper methods called by Workspace class
def list_filespaces(self, workspace_id: str) -> List[FilespaceInfo]:
"""
List all filespaces in a workspace.
This is called internally by Workspace.list_filespaces().
Users should call workspace.list_filespaces() instead.
Args:
workspace_id: Workspace ID
Returns:
List of FilespaceInfo objects
Raises:
DaemonError: If request fails
"""
if not self._started:
raise DaemonError("Daemon must be started")
try:
return [
FilespaceInfo(id=d["id"], name=d["name"], created=d["created"])
for d in self._native.list_filespaces(workspace_id)
]
except Exception as e:
raise DaemonError(f"Failed to list filespaces: {e}") from e
def link_filespace(
self,
workspace_id: str,
filespace_name: str,
filespace_id: str,
root_path: str
):
"""
Link to a filespace (by name or ID).
This is called internally by Workspace.link_filespace().
Users should call workspace.link_filespace() instead.
Args:
workspace_id: Workspace ID
filespace_name: Filespace name (empty if using ID)
filespace_id: Filespace ID (empty if using name)
root_path: Mount point path
Returns:
Native filesystem wrapper
Raises:
DaemonError: If linking fails
"""
if not self._started:
raise DaemonError("Daemon must be started")
try:
return self._native.link_filespace(
workspace_id=workspace_id,
filespace_name=filespace_name,
filespace_id=filespace_id,
root_path=root_path
)
except Exception as e:
raise DaemonError(f"Failed to link filespace: {e}") from e
def unlink_filespace(self) -> None:
"""
Unlink from the currently linked filespace.
This is called internally by Filespace.unlink().
Users should call filespace.unlink() instead.
Raises:
DaemonError: If unlinking fails
"""
try:
self._native.unlink_filespace()
except Exception as e:
raise DaemonError(f"Failed to unlink filespace: {e}") from e
def get_status(self) -> DaemonStatus:
"""
Get daemon status information.
Returns:
DaemonStatus with is_running, is_authenticated, is_linked, root_path
Raises:
DaemonError: If getting status fails
"""
try:
native_status = self._native.get_status()
return DaemonStatus(
is_running=native_status.get("running") == "true",
is_authenticated=native_status.get("authenticated") == "true",
is_linked=native_status.get("linked") == "true",
root_path=str(self._storage.get_root_path()),
)
except Exception as e:
raise DaemonError(f"Failed to get daemon status: {e}") from e
def __enter__(self) -> "Daemon":
"""
Enter context manager - starts the daemon.
Returns:
Self for use in 'with' statement
"""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"""
Exit context manager - stops the daemon.
Args:
exc_type: Exception type (if exception occurred)
exc_val: Exception value (if exception occurred)
exc_tb: Exception traceback (if exception occurred)
"""
self.stop()
def __repr__(self) -> str:
"""String representation of the Daemon object."""
status = "running" if self._started else "stopped"
return f"Daemon({status})"