# Source code for lucidlink.stream

"""Streaming file interface for LucidLink files.

Provides io.RawIOBase-compatible file streams for seamless integration with
Python's standard library and third-party libraries (Pandas, LangChain, PyTorch, etc.).
"""

import io
from typing import Optional, Any

from .file_modes import (
    normalize_mode_for_cpp,
    parse_mode,
    validate_lock_type,
    get_buffered_wrapper_type,
    VALID_LOCK_TYPES,
)


class LucidFileStream(io.RawIOBase):
    """Streaming file access for LucidLink files.

    Implements Python's io.RawIOBase interface for efficient, streaming
    access to files in LucidLink filespaces. Compatible with standard Python
    I/O operations and third-party libraries that expect file-like objects.

    This class provides:

    - Standard file operations: read, write, seek, tell, truncate
    - Context manager support (with statement)
    - Iterator support (for line in file)
    - Optional file locking (on-open locking)
    - Compatibility with: Pandas, LangChain, PyTorch, Hugging Face, etc.

    Args:
        fs_wrapper: PythonFileSystemWrapper instance (from lucidlink_native)
        path: File path within the filespace
        mode: File open mode ('r', 'rb', 'w', 'wb', 'a', 'ab', 'r+', 'r+b')
        lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write)

    Example:
        .. code-block:: python

            with filespace.fs.open("/data/file.txt", "rb") as f:
                data = f.read()

            with filespace.fs.open("/data/output.csv", "wb") as f:
                f.write(b"column1,column2\\n")
                f.write(b"value1,value2\\n")

            # With exclusive locking for SQLite-style access
            with filespace.fs.open("/data/db.sqlite", "r+b", lock_type="exclusive") as f:
                data = f.read()
    """

    def __init__(self, fs_wrapper: Any, path: str, mode: str = "rb", lock_type: str = ""):
        """Initialize a streaming file handle.

        Args:
            fs_wrapper: PythonFileSystemWrapper from lucidlink_native module
            path: Path to file within filespace
            mode: File open mode (default: 'rb')
            lock_type: Lock type - "" (no lock), "shared" (read),
                "exclusive" (write). Lock is held for lifetime of file handle
                and released on close.
        """
        super().__init__()
        self._fs = fs_wrapper
        self._path = path
        self._mode = mode
        self._lock_type = lock_type
        self._pos = 0
        self._size: Optional[int] = None
        self._handle: Optional[int] = None
        self._closed = False
        self._bytes_written = 0  # Track if we've written anything

        # Validate mode and lock_type using shared utilities
        self._parsed_mode = parse_mode(mode)
        validate_lock_type(lock_type)

        # Open immediately when:
        # - Write mode: needed because io.BufferedWriter may not call __enter__
        # - Lock requested: lock is acquired on open, so must open now
        if self._parsed_mode.is_write or lock_type:
            self._open_handle()

    def readable(self) -> bool:
        """Return whether the stream supports reading.

        Returns:
            True if stream was opened for reading
        """
        return self._parsed_mode.readable

    def writable(self) -> bool:
        """Return whether the stream supports writing.

        Returns:
            True if stream was opened for writing
        """
        return self._parsed_mode.writable

    def seekable(self) -> bool:
        """Return whether the stream supports random access.

        Returns:
            True (LucidLink files always support seeking)
        """
        return True

    def read(self, size: int = -1) -> bytes:
        """Read up to size bytes from the stream.

        Args:
            size: Number of bytes to read, or -1 to read entire file

        Returns:
            Bytes read from file

        Raises:
            ValueError: If stream is closed
            io.UnsupportedOperation: If stream not opened for reading
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.readable():
            raise io.UnsupportedOperation("not readable")

        # Ensure handle is open
        self._open_handle()

        # Handle read entire file
        if size == -1:
            file_size = self._get_size()
            size = max(0, file_size - self._pos)

        if size <= 0:
            return b""

        # Read from current position
        data = self._fs.read(self._handle, size, self._pos)
        self._pos += len(data)
        return bytes(data)

    def readinto(self, b: bytearray) -> Optional[int]:
        """Read bytes into a pre-allocated buffer.

        Args:
            b: Buffer to read into

        Returns:
            Number of bytes read (0 at EOF)
        """
        data = self.read(len(b))
        n = len(data)
        b[:n] = data
        # Return 0 (not None) at EOF: io.RawIOBase reserves None for
        # "no data available right now" on non-blocking streams, and
        # io.BufferedReader relies on a 0 return to detect end-of-file.
        return n

    def write(self, b: bytes) -> int:
        """Write bytes to the stream.

        Args:
            b: Bytes to write

        Returns:
            Number of bytes written

        Raises:
            ValueError: If stream is closed
            io.UnsupportedOperation: If stream not opened for writing
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.writable():
            raise io.UnsupportedOperation("not writable")

        # Ensure handle is open
        self._open_handle()

        # Write at current position
        self._fs.write(self._handle, bytes(b), self._pos)
        bytes_written = len(b)
        self._pos += bytes_written
        self._bytes_written += bytes_written

        # Update cached size if we extended the file
        if self._size is not None and self._pos > self._size:
            self._size = self._pos

        return bytes_written

    def seek(self, offset: int, whence: int = 0) -> int:
        """Change stream position.

        Args:
            offset: Offset in bytes
            whence: Reference point (0=start, 1=current, 2=end)

        Returns:
            New absolute position
        """
        if whence == 0:
            # SEEK_SET - absolute position
            self._pos = offset
        elif whence == 1:
            # SEEK_CUR - relative to current position
            self._pos += offset
        elif whence == 2:
            # SEEK_END - relative to end of file
            self._pos = self._get_size() + offset
        else:
            raise ValueError(f"Invalid whence: {whence}")

        # Clamp position to be non-negative (seeking past EOF is allowed;
        # a subsequent write will extend the file)
        self._pos = max(0, self._pos)
        return self._pos

    def tell(self) -> int:
        """Return current stream position.

        Returns:
            Current position in bytes
        """
        return self._pos

    def truncate(self, size: Optional[int] = None) -> int:
        """Resize the stream to the given size.

        Args:
            size: New size in bytes, or current position if None

        Returns:
            New size

        Raises:
            ValueError: If stream is closed
            io.UnsupportedOperation: If stream not opened for writing
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.writable():
            raise io.UnsupportedOperation("not writable")

        # Ensure handle is open
        self._open_handle()

        if size is None:
            size = self._pos

        self._fs.set_end_of_file_by_handle(self._handle, size)

        # Update cached size
        if self._size is not None:
            self._size = size

        return size

    def close(self) -> None:
        """Close the stream and release resources."""
        if not self._closed:
            if self._handle is not None:
                try:
                    self._fs.close(self._handle)
                except Exception:
                    # Ignore errors during close
                    pass
                self._handle = None
            self._closed = True

    @property
    def closed(self) -> bool:
        """Return whether the stream is closed.

        Returns:
            True if stream is closed
        """
        return self._closed

    @property
    def name(self) -> str:
        """Return the file path.

        Returns:
            Path to file within filespace
        """
        return self._path

    @property
    def mode(self) -> str:
        """Return the file mode.

        Returns:
            Mode string used to open the file
        """
        return self._mode

    def fileno(self) -> int:
        """Return underlying file descriptor.

        Raises:
            io.UnsupportedOperation: LucidLink files don't have OS file
                descriptors
        """
        raise io.UnsupportedOperation("fileno not supported")

    def isatty(self) -> bool:
        """Return whether this is an interactive stream.

        Returns:
            False (LucidLink files are never TTY devices)
        """
        return False

    def _get_size(self) -> int:
        """Get file size (cached).

        Returns:
            File size in bytes
        """
        if self._size is None:
            try:
                entry = self._fs.get_entry(self._path)
                self._size = entry.get("size", 0)
            except Exception:
                # If we can't get size, assume 0
                self._size = 0
        return self._size

    def _open_handle(self) -> None:
        """Open the file handle if not already open."""
        if self._handle is None:
            cpp_mode = normalize_mode_for_cpp(self._mode)
            self._handle = self._fs.open(self._path, cpp_mode, self._lock_type)

            # For write mode (not append), create the file and truncate to 0,
            # matching Python's open("w"/"wb") behavior
            if self._parsed_mode.is_write and not self._parsed_mode.is_append:
                self._fs.write(self._handle, b'', 0)
                self._fs.set_end_of_file_by_handle(self._handle, 0)
                self._size = 0

            # For append mode, seek to end
            if self._parsed_mode.is_append:
                self._pos = self._get_size()

    def __enter__(self) -> 'LucidFileStream':
        """Enter context manager - open file handle.

        Returns:
            Self
        """
        self._open_handle()
        return self

    def __exit__(self, *args) -> None:
        """Exit context manager - close file handle."""
        self.close()

    def __iter__(self) -> 'LucidFileStream':
        """Return iterator for reading lines.

        Returns:
            Self
        """
        return self

    def __next__(self) -> bytes:
        """Read next line from file.

        Returns:
            Next line as bytes

        Raises:
            StopIteration: If EOF reached
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def __repr__(self) -> str:
        """Return string representation.

        Returns:
            String describing this stream
        """
        status = "closed" if self._closed else "open"
        return f"<LucidFileStream path={self._path!r} mode={self._mode!r} {status}>"
def open_buffered(fs_wrapper: Any, path: str, mode: str = "rb",
                  buffer_size: int = io.DEFAULT_BUFFER_SIZE,
                  lock_type: str = "") -> io.BufferedIOBase:
    """Open a file with buffering for improved performance.

    Args:
        fs_wrapper: PythonFileSystemWrapper from lucidlink_native module
        path: Path to file within filespace
        mode: File open mode (default: 'rb')
        buffer_size: Buffer size in bytes (default: 8192)
        lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write)

    Returns:
        BufferedReader, BufferedWriter, or BufferedRandom wrapping
        LucidFileStream

    Example:
        .. code-block:: python

            with open_buffered(fs, "/data/large.csv", "rb") as f:
                for line in f:
                    process(line)
    """
    raw_stream = LucidFileStream(fs_wrapper, path, mode, lock_type)

    # Pick the buffered wrapper class that matches the requested mode.
    kind = get_buffered_wrapper_type(mode)
    if kind == "random":
        wrapper_cls = io.BufferedRandom
    elif kind == "writer":
        wrapper_cls = io.BufferedWriter
    else:
        wrapper_cls = io.BufferedReader
    return wrapper_cls(raw_stream, buffer_size)
def open_text(fs_wrapper: Any, path: str, mode: str = "r",
              encoding: str = "utf-8", errors: str = "strict",
              newline: Optional[str] = None,
              lock_type: str = "") -> io.TextIOWrapper:
    """Open a file in text mode.

    Args:
        fs_wrapper: PythonFileSystemWrapper from lucidlink_native module
        path: Path to file within filespace
        mode: File open mode (default: 'r')
        encoding: Text encoding (default: 'utf-8')
        errors: Error handling strategy (default: 'strict')
        newline: Newline handling (None, '', '\\n', '\\r', '\\r\\n')
        lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write)

    Returns:
        TextIOWrapper for text-mode access

    Example:
        .. code-block:: python

            with open_text(fs, "/data/report.txt", "r") as f:
                for line in f:
                    print(line.strip())
    """
    # The underlying stream is always binary; append a 'b' to the base
    # mode letter if the caller passed a pure text mode.
    if 'b' not in mode:
        for letter in ('r', 'w', 'a'):
            mode = mode.replace(letter, letter + 'b')

    # Buffered binary stream wrapped in a text decoder/encoder layer.
    buffered = open_buffered(fs_wrapper, path, mode, lock_type=lock_type)
    return io.TextIOWrapper(buffered, encoding=encoding, errors=errors,
                            newline=newline)