# Source code for lucidlink.filesystem

"""Filesystem operations on a linked LucidLink filespace.

Provides file, directory, metadata, and locking operations.
Accessed via filespace.fs after linking to a filespace.
"""

import io
from typing import Dict, List, Optional, Union

from .exceptions import FilespaceError
from .file_modes import parse_mode, is_text_mode, ensure_binary_mode, get_buffered_wrapper_type
from .filesystem_models import DirEntry, FilespaceSize, FilespaceStatistics
from .stream import LucidFileStream

# Number of bytes requested from the native layer per read call while
# Filesystem.read_file() pulls a whole file into memory.
READ_CHUNK_SIZE = 30 * 1024 * 1024  # 30 MiB


class FileHandle:
    """
    Context manager for file handles (legacy API).

    Wraps a native handle ID obtained from ``filesystem._native.open`` and
    guarantees that the handle is released when the ``with`` block exits.
    The native handle is only opened on ``__enter__``; constructing the
    object performs no I/O. A handle that has been closed may be entered
    again, which opens a fresh native handle.

    Example:

    .. code-block:: python

        with fs.open_legacy("/path/to/file.txt", "r") as fh:
            data = fh.read()
        # Handle automatically closed
    """

    # Largest request passed to a single native read call when the caller
    # asks for the whole file (size=-1); larger files are read in several
    # requests of this size.
    _MAX_READ_REQUEST = 1024 * 1024 * 1024  # 1 GiB

    def __init__(self, filesystem: "Filesystem", path: str, mode: str):
        """
        Store the target of the handle without opening it.

        Args:
            filesystem: Owning Filesystem; its ``_native`` object performs the I/O
            path: File path to open on ``__enter__``
            mode: Open mode forwarded unchanged to the native layer
        """
        self._filesystem = filesystem
        self._path = path
        self._mode = mode
        self._handle_id: Optional[int] = None
        self._closed = False

    def __enter__(self) -> "FileHandle":
        """
        Open the native handle.

        Raises:
            FilespaceError: If the handle is already open or the open fails
        """
        if self._handle_id is not None:
            raise FilespaceError("File handle already open")
        try:
            self._handle_id = self._filesystem._native.open(self._path, self._mode)
        except Exception as e:
            raise FilespaceError(f"Failed to open file '{self._path}': {e}") from e
        # A previous close() leaves _closed set; clear it so a re-entered
        # handle is usable and is released again on exit instead of leaking.
        self._closed = False
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Close the handle; exceptions from the ``with`` body are not suppressed."""
        self.close()

    def _ensure_usable(self, action: str) -> None:
        """Raise FilespaceError unless the handle is currently open.

        ``action`` completes the message "Cannot <action> closed file".
        """
        # Check the closed flag first: close() also clears _handle_id, so
        # testing the ID first would hide the more specific message.
        if self._closed:
            raise FilespaceError(f"Cannot {action} closed file")
        if self._handle_id is None:
            raise FilespaceError("File handle is not open")

    def read(self, size: int = -1, offset: int = 0) -> bytes:
        """
        Read data from the file.

        Args:
            size: Number of bytes to read (-1 = read from offset to end of file)
            offset: Byte offset to start reading from

        Returns:
            Bytes read from the file

        Raises:
            FilespaceError: If the handle is not open or the native read fails
        """
        self._ensure_usable("read from")
        native = self._filesystem._native
        try:
            if size != -1:
                return native.read(self._handle_id, size, offset)

            # Whole-file read: keep requesting until the native layer returns
            # no data, advancing by what was actually received so short
            # reads are handled too.
            pieces = []
            position = offset
            while True:
                piece = native.read(self._handle_id, self._MAX_READ_REQUEST, position)
                if not piece:
                    break
                pieces.append(piece)
                position += len(piece)
            return b"".join(pieces)
        except Exception as e:
            raise FilespaceError(f"Failed to read from file '{self._path}': {e}") from e

    def write(self, data: bytes, offset: int = 0) -> None:
        """
        Write data to the file.

        Args:
            data: Bytes to write to the file
            offset: Byte offset to start writing at

        Raises:
            FilespaceError: If the handle is not open or the native write fails
            TypeError: If data is not a bytes object
        """
        self._ensure_usable("write to")
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        try:
            self._filesystem._native.write(self._handle_id, data, offset)
        except Exception as e:
            raise FilespaceError(f"Failed to write to file '{self._path}': {e}") from e

    def close(self) -> None:
        """
        Close the file handle. Safe to call multiple times.

        Raises:
            FilespaceError: If the native close fails (the handle is still
                marked closed and will not be retried)
        """
        if self._closed or self._handle_id is None:
            return
        handle_id = self._handle_id
        # Update state before calling into the native layer so the handle is
        # considered closed whether or not the native close succeeds.
        self._closed = True
        self._handle_id = None
        try:
            self._filesystem._native.close(handle_id)
        except Exception as e:
            raise FilespaceError(f"Failed to close file '{self._path}': {e}") from e

    @property
    def closed(self) -> bool:
        """Check if the file handle is closed."""
        return self._closed

    @property
    def path(self) -> str:
        """Get the file path."""
        return self._path

    @property
    def mode(self) -> str:
        """Get the file open mode."""
        return self._mode

    def __repr__(self) -> str:
        status = "closed" if self._closed else "open"
        return f"FileHandle(path='{self._path}', mode='{self._mode}', {status})"
class Filesystem:
    """
    Filesystem operations on a linked LucidLink filespace.

    Provides file, directory, metadata, and locking operations. Every method
    delegates to the native filesystem object supplied at construction.
    Obtained via the filespace.fs property after linking.

    Example:

    .. code-block:: python

        filespace = workspace.link_filespace(name="production-data")
        entries = filespace.fs.read_dir("/")
        filespace.fs.create_dir("/new-folder")
        with filespace.fs.open("/file.txt", "wb") as f:
            f.write(b"data")
    """

    def __init__(self, native_fs):
        """
        Initialize filesystem wrapper.

        Args:
            native_fs: Native filesystem wrapper (internal use)

        Note:
            This constructor is called internally by Filespace.
            Users should not construct Filesystem objects directly.
        """
        self._native = native_fs

    # Directory Operations

    def read_dir(self, path: str) -> List[DirEntry]:
        """
        List directory contents.

        Args:
            path: Directory path to list

        Returns:
            List of DirEntry objects with file entry information.

        Raises:
            NotADirectoryError: If path is not a directory
            FileNotFoundError: If directory doesn't exist
            PermissionError: If no read permission
        """
        return [_to_dir_entry(e) for e in self._native.read_dir(path)]

    def create_dir(self, path: str) -> None:
        """
        Create a directory.

        Creates parent directories if they don't exist (like mkdir -p).

        Args:
            path: Directory path to create

        Raises:
            FileExistsError: If directory already exists
            PermissionError: If no create permission
        """
        self._native.create_dir(path)

    def delete_dir(self, path: str, recursive: bool = False) -> None:
        """
        Delete a directory.

        Args:
            path: Directory path to delete
            recursive: If True, delete non-empty directories (default: False)

        Raises:
            FileNotFoundError: If directory doesn't exist
            NotADirectoryError: If path is not a directory
            OSError: If directory is not empty and recursive=False
            PermissionError: If no delete permission
        """
        self._native.delete_dir(path, recursive)

    def dir_exists(self, path: str) -> bool:
        """
        Check if a directory exists.

        Args:
            path: Directory path to check

        Returns:
            True if directory exists, False otherwise
        """
        return self._native.dir_exists(path)

    # File Operations

    def open(
        self,
        path: str,
        mode: str = "rb",
        buffering: int = -1,
        encoding: Optional[str] = None,
        errors: Optional[str] = None,
        newline: Optional[str] = None,
        lock_type: str = ""
    ) -> Union[LucidFileStream, io.BufferedReader, io.BufferedWriter, io.TextIOWrapper]:
        """
        Open a file with streaming support.

        Builds a LucidFileStream and, unless buffering is 0, wraps it in the
        standard ``io`` buffered (and, for text modes, text) layers so the
        result works with standard Python libraries and third-party
        packages (Pandas, LangChain, PyTorch, etc.).

        Args:
            path: File path to open
            mode: Open mode (default: 'rb'):
                - 'r': Read text (default encoding: utf-8)
                - 'w': Write text (create/truncate, default encoding: utf-8)
                - 'a': Append text (default encoding: utf-8)
                - 'r+': Read/write text (default encoding: utf-8)
                - 'w+': Write/read text (create/truncate, default encoding: utf-8)
                - 'rb': Read binary
                - 'wb': Write binary (create/truncate)
                - 'ab': Append binary
                - 'r+b': Read/write binary
                - 'w+b': Write/read binary (create/truncate)
                - 'a+b': Append/read binary
                - 'rt': Read text (explicit, same as 'r')
                - 'wt': Write text (explicit, same as 'w')
            buffering: Buffer size:
                - -1 (default): Use io.DEFAULT_BUFFER_SIZE
                - 0: Unbuffered (binary modes only); returns the raw stream
                - 1: Line buffered (text mode only)
                - >1: Buffer size in bytes
            encoding: Text encoding (e.g., 'utf-8', 'latin-1'). Only used
                in text modes; defaults to 'utf-8' when omitted.
            errors: Error handling ('strict', 'ignore', 'replace');
                defaults to 'strict'
            newline: Newline handling (None, '', '\\n', '\\r', '\\r\\n')
            lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write).
                Lock is held for lifetime of file handle and released on close.

        Returns:
            File stream object supporting read, write, seek, tell operations.
            Compatible with context managers (with statement).

        Example:

        .. code-block:: python

            # Binary mode
            with filespace.fs.open("/file.dat", "rb") as f:
                data = f.read()

            # Text mode
            with filespace.fs.open("/file.txt", "rt", encoding="utf-8") as f:
                for line in f:
                    print(line.strip())

            # With Pandas
            with filespace.fs.open("/data.csv", "rb") as f:
                df = pd.read_csv(f)

            # Writing
            with filespace.fs.open("/output.txt", "wb") as f:
                f.write(b"Hello, LucidLink!")

            # With exclusive locking (SQLite-style)
            with filespace.fs.open("/db.sqlite", "r+b", lock_type="exclusive") as f:
                data = f.read()

        Raises:
            FileNotFoundError: If file doesn't exist (read mode)
            PermissionError: If no access permission
            ValueError: If mode is invalid, or buffering=0 with a text mode
        """
        text_mode = is_text_mode(mode)

        # Reject the impossible combination before a file (and possibly a
        # lock) is acquired, so nothing has to be cleaned up.
        if buffering == 0 and text_mode:
            raise ValueError("can't have unbuffered text I/O")

        # Default encoding for text mode
        if text_mode and encoding is None:
            encoding = "utf-8"

        # The raw stream is always binary; text handling is layered on top
        binary_mode = ensure_binary_mode(mode) if text_mode else mode
        raw = LucidFileStream(self._native, path, binary_mode, lock_type)

        # Unbuffered binary mode hands the raw stream straight to the caller
        if buffering == 0:
            return raw

        try:
            if buffering < 0:
                buffering = io.DEFAULT_BUFFER_SIZE

            # Pick the buffered wrapper matching the mode's read/write capability
            wrapper_type = get_buffered_wrapper_type(mode)
            buffered: io.BufferedIOBase
            if wrapper_type == "random":
                buffered = io.BufferedRandom(raw, buffering)
            elif wrapper_type == "writer":
                buffered = io.BufferedWriter(raw, buffering)
            else:
                buffered = io.BufferedReader(raw, buffering)

            if text_mode:
                return io.TextIOWrapper(
                    buffered,
                    encoding=encoding,
                    errors=errors or "strict",
                    newline=newline,
                )
            return buffered
        except Exception:
            # Wrapper construction failed (e.g. unknown encoding): release
            # the already-open raw stream instead of leaking it.
            raw.close()
            raise

    def open_legacy(self, path: str, mode: str = "r") -> FileHandle:
        """
        Open a file using legacy FileHandle API (deprecated).

        This method is preserved for backward compatibility. New code should
        use open() instead. The returned FileHandle opens the file only when
        it is entered with a ``with`` statement.

        Args:
            path: File path to open
            mode: Open mode ('r', 'w', 'a')

        Returns:
            FileHandle context manager

        Raises:
            FileNotFoundError: If file doesn't exist (read mode)
            PermissionError: If no access permission
        """
        return FileHandle(self, path, mode)

    def create(self, path: str) -> int:
        """
        Create a new file and return a handle ID.

        Use open() with mode="w" for most cases. This is a low-level API.

        Args:
            path: File path to create

        Returns:
            File handle ID as returned by the native layer

        Raises:
            FileExistsError: If file already exists
            PermissionError: If no create permission
        """
        # NOTE(review): this class exposes no public close for the returned
        # handle ID; confirm how callers are expected to release it.
        return self._native.create(path)

    def delete(self, path: str) -> None:
        """
        Delete a file.

        Args:
            path: File path to delete

        Raises:
            FileNotFoundError: If file doesn't exist
            IsADirectoryError: If path is a directory
            PermissionError: If no delete permission
        """
        self._native.delete(path)

    def move(self, src: str, dst: str) -> None:
        """
        Move/rename a file or directory.

        Args:
            src: Source path
            dst: Destination path

        Raises:
            FileNotFoundError: If source doesn't exist
            FileExistsError: If destination already exists
            PermissionError: If no move permission
        """
        self._native.move(src, dst)

    def file_exists(self, path: str) -> bool:
        """
        Check if a file exists.

        Args:
            path: File path to check

        Returns:
            True if file exists, False otherwise
        """
        return self._native.file_exists(path)

    def truncate(self, path: str, size: int) -> None:
        """
        Truncate or extend file to specified size.

        Args:
            path: File path to truncate
            size: New file size in bytes

        Raises:
            FileNotFoundError: If file doesn't exist
            PermissionError: If no write permission
            RuntimeError: If truncation fails
        """
        self._native.set_end_of_file(path, size)

    # Metadata Operations

    def get_entry(self, path: str) -> DirEntry:
        """
        Get metadata for a file or directory.

        Args:
            path: Path to get metadata for

        Returns:
            DirEntry with entry information

        Raises:
            FileNotFoundError: If path doesn't exist
        """
        return _to_dir_entry(self._native.get_entry(path))

    def get_size(self) -> FilespaceSize:
        """
        Get filespace size information.

        Returns:
            FilespaceSize with entries, data, storage, and external file info

        Raises:
            RuntimeError: If operation fails
        """
        d = self._native.get_size()
        return FilespaceSize(
            entries=d["entries"],
            data=d["data"],
            storage=d["storage"],
            external_files_size=d["external_files_size"],
            external_files_count=d["external_files_count"],
        )

    def get_statistics(self) -> FilespaceStatistics:
        """
        Get filespace statistics.

        Flattens the native result, which nests counts under "entries" and
        sizes under "size", into a FilespaceStatistics object.

        Returns:
            FilespaceStatistics with file/directory counts and size info

        Raises:
            RuntimeError: If operation fails
        """
        d = self._native.get_statistics()
        counts = d["entries"]
        sizes = d["size"]
        return FilespaceStatistics(
            file_count=counts["files"],
            directory_count=counts["dirs"],
            symlink_count=counts["symlinks"],
            entries_size=sizes["entries"],
            data_size=sizes["data"],
            storage_size=sizes["storage"],
            external_files_size=sizes["external_files_size"],
            external_files_count=sizes["external_files_count"],
        )

    # Convenience Methods

    def read_file(self, path: str) -> bytes:
        """
        Read entire file contents (convenience method).

        Reads READ_CHUNK_SIZE bytes per native call until the native layer
        returns no data, then joins the chunks. The whole file is held in
        memory; use open() to stream large files.

        Args:
            path: File path to read

        Returns:
            File contents as bytes

        Raises:
            FileNotFoundError: If file doesn't exist
            PermissionError: If no read permission
        """
        handle_id = self._native.open(path, "r")
        try:
            # Collect chunks and join once; repeated bytes += copies the
            # accumulated data on every iteration.
            chunks = []
            offset = 0
            while True:
                chunk = self._native.read(handle_id, READ_CHUNK_SIZE, offset)
                if not chunk:
                    break
                chunks.append(chunk)
                # Advance by what was actually returned so short reads work
                offset += len(chunk)
            return b"".join(chunks)
        finally:
            self._native.close(handle_id)

    def write_file(self, path: str, data: bytes) -> None:
        """
        Write data to file, creating it if needed (convenience method).

        Writes data at offset 0 and then sets the end of file to len(data),
        so any previous longer content is cut off.

        Args:
            path: File path to write
            data: Bytes to write

        Raises:
            PermissionError: If no permission to write file
            TypeError: If data is not bytes
        """
        # Validate before opening: opening with "w" may already create or
        # modify the file, which must not happen for invalid input.
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError("Data must be bytes")

        handle_id = self._native.open(path, "w")
        try:
            self._native.write(handle_id, data, 0)
            self._native.set_end_of_file_by_handle(handle_id, len(data))
        finally:
            self._native.close(handle_id)

    def list_dir(self, path: str) -> List[str]:
        """
        List directory contents, returning just filenames (convenience method).

        Args:
            path: Directory path to list

        Returns:
            List of filenames (not full paths)

        Raises:
            NotADirectoryError: If path is not a directory
            FileNotFoundError: If directory doesn't exist
        """
        return [entry.name for entry in self.read_dir(path)]

    # Byte Range Locking Operations

    def lock_byte_range(
        self,
        handle_id: int,
        offset: int,
        length: int,
        lock_type: str = "exclusive",
        blocking: bool = True
    ) -> bool:
        """
        Lock a byte range of an open file.

        Acquires a lock on the specified byte range. This provides cross-daemon
        file locking that is coordinated through the LucidHub, ensuring proper
        mutual exclusion across all clients accessing the filespace.

        Args:
            handle_id: File handle ID from a low-level open operation
            offset: Start offset of the byte range to lock (0 for whole file)
            length: Length of the byte range to lock (use file size for whole file)
            lock_type: Lock type (default: "exclusive"):
                - "exclusive": Exclusive lock (no other locks allowed)
                - "shared" or "read": Shared read lock (multiple readers allowed)
                - "write": Protected write lock
            blocking: If True (default), wait until lock is available.
                If False, return immediately if lock unavailable.

        Returns:
            True if lock was acquired, False if non-blocking and lock unavailable

        Raises:
            RuntimeError: If handle is invalid or lock operation fails
            ValueError: If lock_type is invalid

        Example:

        .. code-block:: python

            handle_id = filespace.fs._native.open("/data.db", "r+b")
            try:
                if filespace.fs.lock_byte_range(handle_id, 0, 1, "exclusive"):
                    # Perform exclusive operations
                    pass
            finally:
                filespace.fs.unlock_byte_range(handle_id, 0, 1)
                filespace.fs._native.close(handle_id)
        """
        return self._native.lock_byte_range(handle_id, offset, length, lock_type, blocking)

    def unlock_byte_range(self, handle_id: int, offset: int, length: int) -> None:
        """
        Unlock a byte range of an open file.

        Releases a previously acquired lock on the specified byte range.

        Args:
            handle_id: File handle ID from a low-level open operation
            offset: Start offset of the byte range to unlock
            length: Length of the byte range to unlock

        Raises:
            RuntimeError: If handle is invalid or unlock fails
        """
        self._native.unlock_byte_range(handle_id, offset, length)

    def unlock_all_byte_ranges(self, handle_id: int) -> None:
        """
        Unlock all byte ranges on an open file.

        Releases all previously acquired locks on the file.

        Args:
            handle_id: File handle ID from a low-level open operation

        Raises:
            RuntimeError: If handle is invalid or unlock fails
        """
        self._native.unlock_all_byte_ranges(handle_id)

    def __repr__(self) -> str:
        return f"Filesystem(native={self._native})"
def _to_dir_entry(d: dict) -> DirEntry:
    """Build a DirEntry from the dict the native layer returns for one entry.

    Most fields keep their native key; the native "creation_time" and
    "update_time" keys are exposed as ``ctime`` and ``mtime``.
    """
    # DirEntry field name -> key in the native dict, in lookup order
    native_key_for = {
        "name": "name",
        "size": "size",
        "type": "type",
        "file_id": "file_id",
        "file_id_external": "file_id_external",
        "ctime": "creation_time",
        "mtime": "update_time",
    }
    fields = {field: d[key] for field, key in native_key_for.items()}
    return DirEntry(**fields)