Source code for lucidlink.connect

"""LucidLink Connect - external file management.

Provides the ability to attach existing S3 datasets to a LucidLink filespace
as read-only external files (1:1 file-to-S3-object mapping).
"""

from typing import List, Optional

from .connect_models import (
    DataStoreCredentials,
    DataStoreInfo,
    LinkedFilesResult,
    S3DataStoreConfig,
)
from .connect_validation import (
    normalize_data_store_name,
    normalize_s3_data_store_config,
    validate_access_key,
    validate_data_store_count,
    validate_data_store_name,
    validate_link_file_metadata,
    validate_s3_data_store,
    validate_secret_key,
)


class ConnectManager:
    """Manages LucidLink Connect data stores and external file linking.

    Obtained via ``filespace.connect`` after linking to a filespace.
    Not intended to be constructed directly.

    Example:

    .. code-block:: python

        connect = filespace.connect
        connect.add_data_store("my-store", S3DataStoreConfig(
            access_key="AKIA...",
            secret_key="...",
            bucket_name="my-bucket",
            region="us-east-1",
        ))
        connect.link_file("/data/file.csv", "my-store", "path/to/file.csv")
    """

    def __init__(self, native_connect):
        """Initialize ConnectManager with native Connect wrapper.

        Args:
            native_connect: Native PythonConnectWrapper (internal use)
        """
        self._native = native_connect

    # Data Store Management
    def add_data_store(self, name: str, config: S3DataStoreConfig) -> DataStoreInfo:
        """Add a new S3 data store.

        Args:
            name: Unique name for the data store
            config: S3 data store configuration

        Returns:
            DataStoreInfo for the newly created data store

        Raises:
            RuntimeError: If data store creation fails
            ValueError: If name is empty or config is invalid
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        config = normalize_s3_data_store_config(config)
        validate_data_store_name(name)
        validate_s3_data_store(config)
        validate_data_store_count(len(self.list_data_stores()))
        result = self._native.add_data_store(
            name,
            config.access_key,
            config.secret_key,
            config.bucket_name,
            config.region,
            config.endpoint,
            config.url_expiration_minutes,
            config.use_virtual_addressing,
        )
        return DataStoreInfo.from_dict(result)

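    # Illustrative only (not part of the module): a sketch of a typical
    # add_data_store() call. The store name, bucket, and credentials are
    # hypothetical, and passing ``endpoint`` as a constructor keyword is
    # an assumption based on the config fields forwarded to the native
    # call above.
    #
    #   info = connect.add_data_store("archive", S3DataStoreConfig(
    #       access_key="AKIA...",
    #       secret_key="...",
    #       bucket_name="my-archive-bucket",
    #       region="us-east-1",
    #       endpoint="https://s3.example.com",  # S3-compatible storage
    #   ))
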
    def remove_data_store(self, name: str) -> None:
        """Remove a data store by name.

        Args:
            name: Name of the data store to remove

        Raises:
            RuntimeError: If data store not found or removal fails
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        self._native.remove_data_store(name)

    def list_data_stores(self) -> List[DataStoreInfo]:
        """List all registered data stores.

        Returns:
            List of DataStoreInfo objects (secret_key will be empty)

        Raises:
            RuntimeError: If operation fails
        """
        self._ensure_available()
        stores = self._native.list_data_stores()
        return [DataStoreInfo.from_dict(store) for store in stores]

    def get_data_store(self, name: str) -> Optional[DataStoreInfo]:
        """Get a data store by name.

        Args:
            name: Name of the data store

        Returns:
            DataStoreInfo if found (includes decrypted secret_key),
            None otherwise
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        store = self._native.get_data_store(name)
        if store is None:
            return None
        return DataStoreInfo.from_dict(store)

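    # Sketch (not part of the module) contrasting the two lookups: per the
    # docstrings above, list_data_stores() omits secrets while
    # get_data_store() returns the decrypted secret_key. Assumes
    # DataStoreInfo exposes a ``secret_key`` attribute; "archive" is a
    # hypothetical name.
    #
    #   summaries = connect.list_data_stores()   # secret_key empty
    #   full = connect.get_data_store("archive")
    #   if full is not None:
    #       secret = full.secret_key             # decrypted
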
    def rekey_data_store(
        self,
        name: str,
        credentials: DataStoreCredentials | None = None,
        *,
        new_access_key: str | None = None,
        new_secret_key: str | None = None,
    ) -> None:
        """Rotate credentials for a data store.

        Args:
            name: Name of the data store
            credentials: Typed credentials object
            new_access_key: New S3 access key ID
            new_secret_key: New S3 secret access key

        Raises:
            RuntimeError: If data store not found or rekey fails
            ValueError: If credentials are missing or empty
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        if credentials is not None:
            access_key = credentials.access_key
            secret_key = credentials.secret_key
        elif new_access_key is not None and new_secret_key is not None:
            access_key = new_access_key
            secret_key = new_secret_key
        else:
            raise ValueError(
                "Provide either a credentials object or both "
                "new_access_key and new_secret_key"
            )
        validate_access_key(access_key)
        validate_secret_key(secret_key)
        self._native.rekey_data_store(name, access_key, secret_key)

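    # Both calling conventions accepted by rekey_data_store(), with
    # placeholder values; keyword construction of DataStoreCredentials is
    # an assumption based on the attributes read above.
    #
    #   connect.rekey_data_store(
    #       "archive",
    #       DataStoreCredentials(access_key="AKIA...", secret_key="..."),
    #   )
    #   # ...or, equivalently, the keyword-only form:
    #   connect.rekey_data_store(
    #       "archive", new_access_key="AKIA...", new_secret_key="..."
    #   )
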
    # External File Operations
    def list_external_files(
        self, data_store_name: str, limit: int = 100, cursor: str = ""
    ) -> LinkedFilesResult:
        """List external files linked to a data store.

        Args:
            data_store_name: Name of the data store
            limit: Maximum number of results (default: 100)
            cursor: Opaque pagination token from a previous result
                (default: start)

        Returns:
            LinkedFilesResult with file paths, pagination flag, and cursor
        """
        self._ensure_available()
        data_store_name = normalize_data_store_name(data_store_name)
        result = self._native.list_external_files(data_store_name, limit, cursor)
        return LinkedFilesResult(
            file_paths=result["file_paths"],
            file_ids=result["file_ids"],
            has_more=result["has_more"],
            cursor=result["cursor"],
        )

    def count_external_files(self, data_store_name: str) -> int:
        """Count external files linked to a data store (no path resolution).

        Much faster than iterating list_external_files() when only the
        count is needed, as it skips per-file path lookups in the
        metadata store.

        Args:
            data_store_name: Name of the data store

        Returns:
            Number of linked files
        """
        self._ensure_available()
        data_store_name = normalize_data_store_name(data_store_name)
        return self._native.count_external_files(data_store_name)

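    # Pagination sketch (illustrative, hypothetical store name): feed each
    # result's cursor back in until has_more is False. When only the total
    # matters, count_external_files() above is the cheaper call.
    #
    #   cursor = ""
    #   while True:
    #       page = connect.list_external_files("archive", cursor=cursor)
    #       for path in page.file_paths:
    #           process(path)                    # hypothetical handler
    #       if not page.has_more:
    #           break
    #       cursor = page.cursor
    #   total = connect.count_external_files("archive")
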
    def are_data_stores_available(self) -> bool:
        """Check if data stores can be managed on this filespace.

        Verifies both that the filespace version supports external files
        (V9+) and that the feature has been explicitly configured.

        Returns:
            True if data stores are available, False otherwise
        """
        return self._native.are_data_stores_available()

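    # Callers can probe availability up front instead of catching the
    # RuntimeError raised by _ensure_available() below:
    #
    #   if connect.are_data_stores_available():
    #       stores = connect.list_data_stores()
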
    def _ensure_available(self) -> None:
        if not self.are_data_stores_available():
            raise RuntimeError(
                "Connect (external files) is not available. "
                "Ensure the filespace is V9+ and the feature is configured."
            )

    def __repr__(self) -> str:
        return "ConnectManager()"