"""Filesystem operations on a linked LucidLink filespace.
Provides file, directory, metadata, and locking operations.
Accessed via filespace.fs after linking to a filespace.
"""
import io
from typing import Dict, List, Optional, Union
from .exceptions import FilespaceError
from .file_modes import parse_mode, is_text_mode, ensure_binary_mode, get_buffered_wrapper_type
from .filesystem_models import DirEntry, FilespaceSize, FilespaceStatistics
from .stream import LucidFileStream
READ_CHUNK_SIZE = 30 * 1024 * 1024 # 30MB
[docs]
class FileHandle:
"""
Context manager for file handles (legacy API).
Provides Pythonic file operations with automatic handle cleanup.
Use with 'with' statement to ensure handles are properly closed.
Example:
.. code-block:: python
with fs.open_legacy("/path/to/file.txt", "r") as fh:
data = fh.read()
# Handle automatically closed
"""
def __init__(self, filesystem: "Filesystem", path: str, mode: str):
self._filesystem = filesystem
self._path = path
self._mode = mode
self._handle_id: Optional[int] = None
self._closed = False
def __enter__(self) -> "FileHandle":
if self._handle_id is not None:
raise FilespaceError("File handle already open")
try:
self._handle_id = self._filesystem._native.open(self._path, self._mode)
return self
except Exception as e:
raise FilespaceError(f"Failed to open file '{self._path}': {e}") from e
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
self.close()
[docs]
def read(self, size: int = -1, offset: int = 0) -> bytes:
"""
Read data from the file.
Args:
size: Number of bytes to read (-1 = read entire file)
offset: Byte offset to start reading from
Returns:
Bytes read from the file
"""
if self._handle_id is None:
raise FilespaceError("File handle is not open")
if self._closed:
raise FilespaceError("Cannot read from closed file")
try:
if size == -1:
size = 1024 * 1024 * 1024 # 1GB max per read
return self._filesystem._native.read(self._handle_id, size, offset)
except Exception as e:
raise FilespaceError(f"Failed to read from file '{self._path}': {e}") from e
[docs]
def write(self, data: bytes, offset: int = 0) -> None:
"""
Write data to the file.
Args:
data: Bytes to write to the file
offset: Byte offset to start writing at
"""
if self._handle_id is None:
raise FilespaceError("File handle is not open")
if self._closed:
raise FilespaceError("Cannot write to closed file")
if not isinstance(data, bytes):
raise TypeError("Data must be bytes")
try:
self._filesystem._native.write(self._handle_id, data, offset)
except Exception as e:
raise FilespaceError(f"Failed to write to file '{self._path}': {e}") from e
[docs]
def close(self) -> None:
"""Close the file handle. Safe to call multiple times."""
if self._closed or self._handle_id is None:
return
try:
self._filesystem._native.close(self._handle_id)
self._closed = True
self._handle_id = None
except Exception as e:
self._closed = True
self._handle_id = None
raise FilespaceError(f"Failed to close file '{self._path}': {e}") from e
@property
def closed(self) -> bool:
"""Check if the file handle is closed."""
return self._closed
@property
def path(self) -> str:
"""Get the file path."""
return self._path
@property
def mode(self) -> str:
"""Get the file open mode."""
return self._mode
def __repr__(self) -> str:
status = "closed" if self._closed else "open"
return f"FileHandle(path='{self._path}', mode='{self._mode}', {status})"
[docs]
class Filesystem:
"""
Filesystem operations on a linked LucidLink filespace.
Provides file, directory, metadata, and locking operations.
Obtained via the filespace.fs property after linking.
Example:
.. code-block:: python
filespace = workspace.link_filespace(name="production-data")
entries = filespace.fs.read_dir("/")
filespace.fs.create_dir("/new-folder")
with filespace.fs.open("/file.txt", "wb") as f:
f.write(b"data")
"""
def __init__(self, native_fs):
"""
Initialize filesystem wrapper.
Args:
native_fs: Native filesystem wrapper (internal use)
Note: This constructor is called internally by Filespace.
Users should not construct Filesystem objects directly.
"""
self._native = native_fs
# Directory Operations
[docs]
def read_dir(self, path: str) -> List[DirEntry]:
"""
List directory contents.
Args:
path: Directory path to list
Returns:
List of DirEntry objects with file entry information.
Raises:
NotADirectoryError: If path is not a directory
FileNotFoundError: If directory doesn't exist
PermissionError: If no read permission
"""
return [_to_dir_entry(e) for e in self._native.read_dir(path)]
[docs]
def create_dir(self, path: str) -> None:
"""
Create a directory.
Creates parent directories if they don't exist (like mkdir -p).
Args:
path: Directory path to create
Raises:
FileExistsError: If directory already exists
PermissionError: If no create permission
"""
self._native.create_dir(path)
[docs]
def delete_dir(self, path: str, recursive: bool = False) -> None:
"""
Delete a directory.
Args:
path: Directory path to delete
recursive: If True, delete non-empty directories (default: False)
Raises:
FileNotFoundError: If directory doesn't exist
NotADirectoryError: If path is not a directory
OSError: If directory is not empty and recursive=False
PermissionError: If no delete permission
"""
self._native.delete_dir(path, recursive)
[docs]
def dir_exists(self, path: str) -> bool:
"""
Check if a directory exists.
Args:
path: Directory path to check
Returns:
True if directory exists, False otherwise
"""
return self._native.dir_exists(path)
# File Operations
[docs]
def open(
self,
path: str,
mode: str = "rb",
buffering: int = -1,
encoding: Optional[str] = None,
errors: Optional[str] = None,
newline: Optional[str] = None,
lock_type: str = ""
) -> Union[LucidFileStream, io.BufferedReader, io.BufferedWriter, io.TextIOWrapper]:
"""
Open a file with streaming support.
Returns an io.RawIOBase-compatible stream that works with standard Python
libraries and third-party packages (Pandas, LangChain, PyTorch, etc.).
Args:
path: File path to open
mode: Open mode (default: 'rb'):
- 'r': Read text (default encoding: utf-8)
- 'w': Write text (create/truncate, default encoding: utf-8)
- 'a': Append text (default encoding: utf-8)
- 'r+': Read/write text (default encoding: utf-8)
- 'w+': Write/read text (create/truncate, default encoding: utf-8)
- 'rb': Read binary
- 'wb': Write binary (create/truncate)
- 'ab': Append binary
- 'r+b': Read/write binary
- 'w+b': Write/read binary (create/truncate)
- 'a+b': Append/read binary
- 'rt': Read text (explicit, same as 'r')
- 'wt': Write text (explicit, same as 'w')
buffering: Buffer size:
- -1 (default): Use system default (8192 bytes)
- 0: Unbuffered (binary modes only)
- 1: Line buffered (text mode only)
- >1: Buffer size in bytes
encoding: Text encoding (e.g., 'utf-8', 'latin-1')
Required for text modes ('r', 'w' without 'b')
errors: Error handling ('strict', 'ignore', 'replace')
newline: Newline handling (None, '', '\\n', '\\r', '\\r\\n')
lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write).
Lock is held for lifetime of file handle and released on close.
Returns:
File stream object supporting read, write, seek, tell operations.
Compatible with context managers (with statement).
Example:
.. code-block:: python
# Binary mode
with filespace.fs.open("/file.dat", "rb") as f:
data = f.read()
# Text mode
with filespace.fs.open("/file.txt", "rt", encoding="utf-8") as f:
for line in f:
print(line.strip())
# With Pandas
with filespace.fs.open("/data.csv", "rb") as f:
df = pd.read_csv(f)
# Writing
with filespace.fs.open("/output.txt", "wb") as f:
f.write(b"Hello, LucidLink!")
# With exclusive locking (SQLite-style)
with filespace.fs.open("/db.sqlite", "r+b", lock_type="exclusive") as f:
data = f.read()
Raises:
FileNotFoundError: If file doesn't exist (read mode)
PermissionError: If no access permission
ValueError: If mode is invalid
"""
text_mode = is_text_mode(mode)
# Default encoding for text mode
if text_mode and encoding is None:
encoding = "utf-8"
# Ensure binary mode for raw stream
binary_mode = ensure_binary_mode(mode) if text_mode else mode
# Create raw stream
raw = LucidFileStream(self._native, path, binary_mode, lock_type)
# Handle unbuffered binary mode
if buffering == 0:
if text_mode:
raise ValueError("can't have unbuffered text I/O")
return raw
# Determine buffer size
if buffering < 0:
buffering = io.DEFAULT_BUFFER_SIZE
# Create buffered stream using shared utility
wrapper_type = get_buffered_wrapper_type(mode)
buffered: io.BufferedIOBase
if wrapper_type == "random":
buffered = io.BufferedRandom(raw, buffering)
elif wrapper_type == "writer":
buffered = io.BufferedWriter(raw, buffering)
else:
buffered = io.BufferedReader(raw, buffering)
# Wrap in TextIOWrapper if text mode
if text_mode:
return io.TextIOWrapper(
buffered,
encoding=encoding,
errors=errors or "strict",
newline=newline,
)
return buffered
[docs]
def open_legacy(self, path: str, mode: str = "r") -> FileHandle:
"""
Open a file using legacy FileHandle API (deprecated).
This method is preserved for backward compatibility. New code should
use open() instead, which returns a more capable io.RawIOBase stream.
Args:
path: File path to open
mode: Open mode ('r', 'w', 'a')
Returns:
FileHandle context manager
Raises:
FileNotFoundError: If file doesn't exist (read mode)
PermissionError: If no access permission
"""
return FileHandle(self, path, mode)
[docs]
def create(self, path: str) -> int:
"""
Create a new file and return a handle ID.
Use open() with mode="w" for most cases. This is a low-level API.
Args:
path: File path to create
Returns:
File handle ID (use with read/write/close methods)
Raises:
FileExistsError: If file already exists
PermissionError: If no create permission
"""
return self._native.create(path)
[docs]
def delete(self, path: str) -> None:
"""
Delete a file.
Args:
path: File path to delete
Raises:
FileNotFoundError: If file doesn't exist
IsADirectoryError: If path is a directory
PermissionError: If no delete permission
"""
self._native.delete(path)
[docs]
def move(self, src: str, dst: str) -> None:
"""
Move/rename a file or directory.
Args:
src: Source path
dst: Destination path
Raises:
FileNotFoundError: If source doesn't exist
FileExistsError: If destination already exists
PermissionError: If no move permission
"""
self._native.move(src, dst)
[docs]
def file_exists(self, path: str) -> bool:
"""
Check if a file exists.
Args:
path: File path to check
Returns:
True if file exists, False otherwise
"""
return self._native.file_exists(path)
[docs]
def truncate(self, path: str, size: int) -> None:
"""
Truncate or extend file to specified size.
Args:
path: File path to truncate
size: New file size in bytes
Raises:
FileNotFoundError: If file doesn't exist
PermissionError: If no write permission
RuntimeError: If truncation fails
"""
self._native.set_end_of_file(path, size)
# Metadata Operations
[docs]
def get_entry(self, path: str) -> DirEntry:
"""
Get metadata for a file or directory.
Args:
path: Path to get metadata for
Returns:
DirEntry with entry information
Raises:
FileNotFoundError: If path doesn't exist
"""
return _to_dir_entry(self._native.get_entry(path))
[docs]
def get_size(self) -> FilespaceSize:
"""
Get filespace size information.
Returns:
FilespaceSize with entries, data, storage, and external file info
Raises:
RuntimeError: If operation fails
"""
d = self._native.get_size()
return FilespaceSize(
entries=d["entries"],
data=d["data"],
storage=d["storage"],
external_files_size=d["external_files_size"],
external_files_count=d["external_files_count"],
)
[docs]
def get_statistics(self) -> FilespaceStatistics:
"""
Get filespace statistics.
Returns:
FilespaceStatistics with file/directory counts and size info
Raises:
RuntimeError: If operation fails
"""
d = self._native.get_statistics()
return FilespaceStatistics(
file_count=d["entries"]["files"],
directory_count=d["entries"]["dirs"],
symlink_count=d["entries"]["symlinks"],
entries_size=d["size"]["entries"],
data_size=d["size"]["data"],
storage_size=d["size"]["storage"],
external_files_size=d["size"]["external_files_size"],
external_files_count=d["size"]["external_files_count"],
)
# Convenience Methods
[docs]
def read_file(self, path: str) -> bytes:
"""
Read entire file contents (convenience method).
Args:
path: File path to read
Returns:
File contents as bytes
Raises:
FileNotFoundError: If file doesn't exist
PermissionError: If no read permission
"""
handle_id = self._native.open(path, "r")
try:
data = b""
chunk_size = READ_CHUNK_SIZE
while True:
chunk = self._native.read(handle_id, chunk_size, len(data))
if not chunk:
break
data += chunk
return data
finally:
self._native.close(handle_id)
[docs]
def write_file(self, path: str, data: bytes) -> None:
"""
Write data to file, creating it if needed (convenience method).
Args:
path: File path to write
data: Bytes to write
Raises:
PermissionError: If no permission to write file
TypeError: If data is not bytes
"""
handle_id = self._native.open(path, "w")
try:
self._native.write(handle_id, data, 0)
self._native.set_end_of_file_by_handle(handle_id, len(data))
finally:
self._native.close(handle_id)
[docs]
def list_dir(self, path: str) -> List[str]:
"""
List directory contents, returning just filenames (convenience method).
Args:
path: Directory path to list
Returns:
List of filenames (not full paths)
Raises:
NotADirectoryError: If path is not a directory
FileNotFoundError: If directory doesn't exist
"""
entries = self.read_dir(path)
return [entry.name for entry in entries]
# Byte Range Locking Operations
[docs]
def lock_byte_range(
self,
handle_id: int,
offset: int,
length: int,
lock_type: str = "exclusive",
blocking: bool = True
) -> bool:
"""
Lock a byte range of an open file.
Acquires a lock on the specified byte range. This provides cross-daemon
file locking that is coordinated through the LucidHub, ensuring proper
mutual exclusion across all clients accessing the filespace.
Args:
handle_id: File handle ID from a low-level open operation
offset: Start offset of the byte range to lock (0 for whole file)
length: Length of the byte range to lock (use file size for whole file)
lock_type: Lock type (default: "exclusive"):
- "exclusive": Exclusive lock (no other locks allowed)
- "shared" or "read": Shared read lock (multiple readers allowed)
- "write": Protected write lock
blocking: If True (default), wait until lock is available.
If False, return immediately if lock unavailable.
Returns:
True if lock was acquired, False if non-blocking and lock unavailable
Raises:
RuntimeError: If handle is invalid or lock operation fails
ValueError: If lock_type is invalid
Example:
.. code-block:: python
handle_id = filespace.fs._native.open("/data.db", "r+b")
try:
if filespace.fs.lock_byte_range(handle_id, 0, 1, "exclusive"):
# Perform exclusive operations
pass
finally:
filespace.fs.unlock_byte_range(handle_id, 0, 1)
filespace.fs._native.close(handle_id)
"""
return self._native.lock_byte_range(handle_id, offset, length, lock_type, blocking)
[docs]
def unlock_byte_range(self, handle_id: int, offset: int, length: int) -> None:
"""
Unlock a byte range of an open file.
Releases a previously acquired lock on the specified byte range.
Args:
handle_id: File handle ID from a low-level open operation
offset: Start offset of the byte range to unlock
length: Length of the byte range to unlock
Raises:
RuntimeError: If handle is invalid or unlock fails
"""
self._native.unlock_byte_range(handle_id, offset, length)
[docs]
def unlock_all_byte_ranges(self, handle_id: int) -> None:
"""
Unlock all byte ranges on an open file.
Releases all previously acquired locks on the file.
Args:
handle_id: File handle ID from a low-level open operation
Raises:
RuntimeError: If handle is invalid or unlock fails
"""
self._native.unlock_all_byte_ranges(handle_id)
def __repr__(self) -> str:
return f"Filesystem(native={self._native})"
def _to_dir_entry(d: dict) -> DirEntry:
"""Convert a native dict to a DirEntry."""
return DirEntry(
name=d["name"],
size=d["size"],
type=d["type"],
file_id=d["file_id"],
file_id_external=d["file_id_external"],
ctime=d["creation_time"],
mtime=d["update_time"],
)