"""LucidLink Connect - external file management.
Provides the ability to attach existing S3 datasets to a LucidLink filespace
as read-only external files (1:1 file-to-S3-object mapping).
"""
from typing import List, Optional
from .connect_models import (
DataStoreCredentials,
DataStoreInfo,
LinkedFilesResult,
S3DataStoreConfig,
)
from .connect_validation import (
normalize_data_store_name,
normalize_s3_data_store_config,
validate_access_key,
validate_data_store_count,
validate_data_store_name,
validate_link_file_metadata,
validate_s3_data_store,
validate_secret_key,
)
class ConnectManager:
    """Manages LucidLink Connect data stores and external file linking.

    Obtained via ``filespace.connect`` after linking to a filespace.
    Not intended to be constructed directly.

    Example:
        .. code-block:: python

            connect = filespace.connect
            connect.add_data_store("my-store", S3DataStoreConfig(
                access_key="AKIA...",
                secret_key="...",
                bucket_name="my-bucket",
                region="us-east-1",
            ))
            connect.link_file("/data/file.csv", "my-store", "path/to/file.csv")
    """

    def __init__(self, native_connect):
        """Initialize ConnectManager with native Connect wrapper.

        Args:
            native_connect: Native PythonConnectWrapper (internal use)
        """
        self._native = native_connect

    # ------------------------------------------------------------------
    # Data Store Management
    # ------------------------------------------------------------------

    def add_data_store(self, name: str, config: S3DataStoreConfig) -> DataStoreInfo:
        """Add a new S3 data store.

        Args:
            name: Unique name for the data store
            config: S3 data store configuration

        Returns:
            DataStoreInfo for the newly created data store

        Raises:
            RuntimeError: If data store creation fails
            ValueError: If name is empty or config is invalid
        """
        self._ensure_available()
        # Normalize before validating so validation sees canonical values.
        name = normalize_data_store_name(name)
        config = normalize_s3_data_store_config(config)
        validate_data_store_name(name)
        validate_s3_data_store(config)
        # Enforce the per-filespace store limit against the current count.
        validate_data_store_count(len(self.list_data_stores()))
        result = self._native.add_data_store(
            name,
            config.access_key,
            config.secret_key,
            config.bucket_name,
            config.region,
            config.endpoint,
            config.url_expiration_minutes,
            config.use_virtual_addressing,
        )
        return DataStoreInfo.from_dict(result)

    def remove_data_store(self, name: str) -> None:
        """Remove a data store by name.

        Args:
            name: Name of the data store to remove

        Raises:
            RuntimeError: If data store not found or removal fails
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        self._native.remove_data_store(name)

    def list_data_stores(self) -> List[DataStoreInfo]:
        """List all registered data stores.

        Returns:
            List of DataStoreInfo objects (secret_key will be empty)

        Raises:
            RuntimeError: If operation fails
        """
        self._ensure_available()
        stores = self._native.list_data_stores()
        return [DataStoreInfo.from_dict(store) for store in stores]

    def get_data_store(self, name: str) -> Optional[DataStoreInfo]:
        """Get a data store by name.

        Args:
            name: Name of the data store

        Returns:
            DataStoreInfo if found (includes decrypted secret_key), None otherwise
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        store = self._native.get_data_store(name)
        if store is None:
            return None
        return DataStoreInfo.from_dict(store)

    def rekey_data_store(
        self,
        name: str,
        credentials: DataStoreCredentials | None = None,
        *,
        new_access_key: str | None = None,
        new_secret_key: str | None = None,
    ) -> None:
        """Rotate credentials for a data store.

        Credentials may be supplied either as a typed ``credentials`` object
        or as the ``new_access_key``/``new_secret_key`` keyword pair. If both
        are given, ``credentials`` takes precedence.

        Args:
            name: Name of the data store
            credentials: Typed credentials object
            new_access_key: New S3 access key ID
            new_secret_key: New S3 secret access key

        Raises:
            RuntimeError: If data store not found or rekey fails
            ValueError: If credentials are missing or empty
        """
        self._ensure_available()
        name = normalize_data_store_name(name)
        if credentials is not None:
            access_key = credentials.access_key
            secret_key = credentials.secret_key
        elif new_access_key is not None and new_secret_key is not None:
            access_key = new_access_key
            secret_key = new_secret_key
        else:
            raise ValueError(
                "Provide either a credentials object or both new_access_key and new_secret_key"
            )
        validate_access_key(access_key)
        validate_secret_key(secret_key)
        self._native.rekey_data_store(name, access_key, secret_key)

    # ------------------------------------------------------------------
    # External File Operations
    # ------------------------------------------------------------------

    def link_file(
        self,
        file_path: str,
        data_store_name: str,
        object_id: str,
        size: int | None = None,
        checksum: str = "",
    ) -> None:
        """Link an S3 object as an external file in the filespace.

        Creates a read-only file entry that maps to an S3 object via
        presigned URL.

        Args:
            file_path: Path in the filespace for the new file
            data_store_name: Name of the data store containing the object
            object_id: S3 object key
            size: Optional object size in bytes. When provided together with
                checksum, skips the S3 HeadObject call (much faster for bulk ops).
            checksum: Optional object ETag/checksum. When provided together with
                size, skips the S3 HeadObject call.

        Raises:
            RuntimeError: If data store not found or link fails
            ValueError: If only one of size/checksum is provided
        """
        self._ensure_available()
        data_store_name = normalize_data_store_name(data_store_name)
        # Rejects the partial case where only one of size/checksum is given.
        validate_link_file_metadata(size, checksum)
        self._native.link_file(file_path, data_store_name, object_id, size, checksum)

    def unlink_file(self, file_path: str) -> None:
        """Remove an external file link from the filespace.

        Args:
            file_path: Path of the external file to remove

        Raises:
            RuntimeError: If file not found or unlink fails
        """
        self._ensure_available()
        self._native.unlink_file(file_path)

    def list_external_files(
        self, data_store_name: str, limit: int = 100, cursor: str = ""
    ) -> LinkedFilesResult:
        """List external files linked to a data store.

        Args:
            data_store_name: Name of the data store
            limit: Maximum number of results (default: 100)
            cursor: Opaque pagination token from a previous result (default: start)

        Returns:
            LinkedFilesResult with file paths, pagination flag, and cursor
        """
        self._ensure_available()
        data_store_name = normalize_data_store_name(data_store_name)
        result = self._native.list_external_files(data_store_name, limit, cursor)
        return LinkedFilesResult(
            file_paths=result["file_paths"],
            file_ids=result["file_ids"],
            has_more=result["has_more"],
            cursor=result["cursor"],
        )

    def count_external_files(self, data_store_name: str) -> int:
        """Count external files linked to a data store (no path resolution).

        Much faster than iterating list_external_files() when only the count
        is needed, as it skips per-file path lookups in the metadata store.

        Args:
            data_store_name: Name of the data store

        Returns:
            Number of linked files
        """
        self._ensure_available()
        data_store_name = normalize_data_store_name(data_store_name)
        return self._native.count_external_files(data_store_name)

    def are_data_stores_available(self) -> bool:
        """Check if data stores can be managed on this filespace.

        Verifies both that the filespace version supports external files (V9+)
        and that the feature has been explicitly configured.

        Returns:
            True if data stores are available, False otherwise
        """
        return self._native.are_data_stores_available()

    def _ensure_available(self) -> None:
        # Guard shared by every public operation: fail fast with a clear
        # message instead of surfacing an opaque native-layer error.
        if not self.are_data_stores_available():
            raise RuntimeError(
                "Connect (external files) is not available. "
                "Ensure the filespace is V9+ and the feature is configured."
            )

    def __repr__(self) -> str:
        return "ConnectManager()"