"""Streaming file interface for LucidLink files.
Provides io.RawIOBase-compatible file streams for seamless integration with
Python's standard library and third-party libraries (Pandas, LangChain, PyTorch, etc.).
"""
import io
from typing import Optional, Any
from .file_modes import (
normalize_mode_for_cpp,
parse_mode,
validate_lock_type,
get_buffered_wrapper_type,
VALID_LOCK_TYPES,
)
class LucidFileStream(io.RawIOBase):
    """Streaming file access for LucidLink files.

    Implements Python's io.RawIOBase interface for efficient, streaming access
    to files in LucidLink filespaces. Compatible with standard Python I/O
    operations and third-party libraries that expect file-like objects.

    This class provides:

    - Standard file operations: read, write, seek, tell, truncate
    - Context manager support (with statement)
    - Iterator support (for line in file)
    - Optional file locking (on-open locking)
    - Compatibility with: Pandas, LangChain, PyTorch, Hugging Face, etc.

    Args:
        fs_wrapper: PythonFileSystemWrapper instance (from lucidlink_native)
        path: File path within the filespace
        mode: File open mode ('r', 'rb', 'w', 'wb', 'a', 'ab', 'r+', 'r+b')
        lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write)

    Example:
        .. code-block:: python

            with filespace.fs.open("/data/file.txt", "rb") as f:
                data = f.read()

            with filespace.fs.open("/data/output.csv", "wb") as f:
                f.write(b"column1,column2\\n")
                f.write(b"value1,value2\\n")

            # With exclusive locking for SQLite-style access
            with filespace.fs.open("/data/db.sqlite", "r+b", lock_type="exclusive") as f:
                data = f.read()
    """

    # Size of the first native read issued by readline(); each further chunk
    # doubles, capped at io.DEFAULT_BUFFER_SIZE, so short lines cost a single
    # small read and long lines a logarithmic number of reads.
    _READLINE_INITIAL_CHUNK = 256

    def __init__(self, fs_wrapper: Any, path: str, mode: str = "rb", lock_type: str = ""):
        """Initialize a streaming file handle.

        Args:
            fs_wrapper: PythonFileSystemWrapper from lucidlink_native module
            path: Path to file within filespace
            mode: File open mode (default: 'rb')
            lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write).
                Lock is held for lifetime of file handle and released on close.
        """
        super().__init__()
        self._fs = fs_wrapper
        self._path = path
        self._mode = mode
        self._lock_type = lock_type
        self._pos = 0  # Logical position; every native read/write is positional.
        self._size: Optional[int] = None  # Cached file size, fetched lazily.
        self._handle: Optional[int] = None  # Native handle, opened lazily.
        self._closed = False
        self._bytes_written = 0  # Total bytes written through this stream.

        # Validate mode and lock_type using shared utilities
        self._parsed_mode = parse_mode(mode)
        validate_lock_type(lock_type)

        # Open immediately when:
        # - Write mode: needed because io.BufferedWriter may not call __enter__
        # - Lock requested: lock is acquired on open, so must open now
        if self._parsed_mode.is_write or lock_type:
            self._open_handle()

    def readable(self) -> bool:
        """Return whether the stream supports reading.

        Returns:
            True if stream was opened for reading
        """
        return self._parsed_mode.readable

    def writable(self) -> bool:
        """Return whether the stream supports writing.

        Returns:
            True if stream was opened for writing
        """
        return self._parsed_mode.writable

    def seekable(self) -> bool:
        """Return whether the stream supports random access.

        Returns:
            True (LucidLink files always support seeking)
        """
        return True

    def read(self, size: Optional[int] = -1) -> bytes:
        """Read up to size bytes from the current position.

        Args:
            size: Number of bytes to read. ``None`` or any negative value reads
                from the current position to the end of the file.

        Returns:
            Bytes read; ``b""`` at end of file or when ``size`` is 0.

        Raises:
            ValueError: If stream is closed
            io.UnsupportedOperation: If stream not opened for reading
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.readable():
            raise io.UnsupportedOperation("not readable")

        self._open_handle()

        if size is None or size < 0:
            # io convention: None/negative means "read to EOF". The request is
            # sized from the (possibly cached) file size.
            size = max(0, self._get_size() - self._pos)
        if size == 0:
            return b""

        data = bytes(self._fs.read(self._handle, size, self._pos))
        self._pos += len(data)
        return data

    def readline(self, size: Optional[int] = -1) -> bytes:
        """Read and return one line, including its trailing b"\\n" if present.

        Overrides io.IOBase.readline, which would otherwise issue one native
        read per byte on this unbuffered stream. Data is read in growing
        chunks instead; because reads are positional, bytes read past the
        newline are not consumed and are simply read again next time.

        Args:
            size: Maximum number of bytes to return. ``None`` or a negative
                value means no limit.

        Returns:
            The line as bytes; ``b""`` at end of file.

        Raises:
            ValueError: If stream is closed
            io.UnsupportedOperation: If stream not opened for reading
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.readable():
            raise io.UnsupportedOperation("not readable")

        self._open_handle()

        limit = -1 if size is None else size
        pieces = []
        total = 0
        chunk = self._READLINE_INITIAL_CHUNK
        while limit < 0 or total < limit:
            want = chunk if limit < 0 else min(chunk, limit - total)
            data = bytes(self._fs.read(self._handle, want, self._pos))
            if not data:
                break  # End of file.
            newline_at = data.find(b"\n")
            if newline_at != -1:
                data = data[:newline_at + 1]
            pieces.append(data)
            total += len(data)
            self._pos += len(data)
            if newline_at != -1:
                break
            chunk = min(chunk * 2, io.DEFAULT_BUFFER_SIZE)
        return b"".join(pieces)

    def readinto(self, b: Any) -> int:
        """Read bytes into a pre-allocated writable buffer.

        Args:
            b: Writable bytes-like object (bytearray, memoryview, array, ...)

        Returns:
            Number of bytes read; 0 at end of file. (Per the RawIOBase
            contract, ``None`` would mean "no data available right now" on a
            non-blocking stream. io.BufferedReader would then return None
            from read(n) instead of b"".)
        """
        # View the buffer as raw bytes so lengths are byte counts even for
        # buffers with multi-byte item formats.
        view = memoryview(b).cast("B")
        data = self.read(len(view))
        count = len(data)
        view[:count] = data
        return count

    def write(self, b: Any) -> int:
        """Write bytes to the stream at the current position.

        Args:
            b: Bytes-like object to write

        Returns:
            Number of bytes written

        Raises:
            ValueError: If stream is closed
            io.UnsupportedOperation: If stream not opened for writing
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.writable():
            raise io.UnsupportedOperation("not writable")

        self._open_handle()

        data = bytes(b)
        self._fs.write(self._handle, data, self._pos)
        # Count bytes, not items: len(b) differs for non-byte memoryviews.
        count = len(data)
        self._pos += count
        self._bytes_written += count

        # Update cached size if we extended the file
        if self._size is not None and self._pos > self._size:
            self._size = self._pos
        return count

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        """Change stream position.

        Args:
            offset: Offset in bytes
            whence: Reference point (io.SEEK_SET=0 start, io.SEEK_CUR=1
                current, io.SEEK_END=2 end)

        Returns:
            New absolute position

        Raises:
            ValueError: If stream is closed or whence is invalid
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")

        if whence == io.SEEK_SET:
            target = offset
        elif whence == io.SEEK_CUR:
            target = self._pos + offset
        elif whence == io.SEEK_END:
            target = self._get_size() + offset
        else:
            raise ValueError(f"Invalid whence: {whence}")

        # Negative targets are clamped to 0. Positions past end of file are
        # allowed and are not clamped.
        self._pos = max(0, target)
        return self._pos

    def tell(self) -> int:
        """Return current stream position.

        Returns:
            Current position in bytes
        """
        return self._pos

    def truncate(self, size: Optional[int] = None) -> int:
        """Resize the file to the given size; the position is not changed.

        Args:
            size: New size in bytes, or current position if None

        Returns:
            New size

        Raises:
            ValueError: If stream is closed or size is negative
            io.UnsupportedOperation: If stream not opened for writing
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        if not self.writable():
            raise io.UnsupportedOperation("not writable")

        if size is None:
            size = self._pos
        if size < 0:
            raise ValueError(f"negative size value {size}")

        self._open_handle()
        self._fs.set_end_of_file_by_handle(self._handle, size)
        # The length is now known exactly, so cache it even if it was never
        # fetched before.
        self._size = size
        return size

    def close(self) -> None:
        """Close the stream and release the native handle (and any lock).

        Calling close() more than once has no effect.
        """
        if self._closed:
            return
        handle, self._handle = self._handle, None
        self._closed = True
        if handle is not None:
            try:
                self._fs.close(handle)
            except Exception:
                # Deliberately best-effort: a failing native close must not
                # prevent the stream from being marked closed.
                pass
        # Let io.IOBase run its own close bookkeeping.
        super().close()

    @property
    def closed(self) -> bool:
        """Return whether the stream is closed.

        Returns:
            True if stream is closed
        """
        return self._closed

    @property
    def name(self) -> str:
        """Return the file path.

        Returns:
            Path to file within filespace
        """
        return self._path

    @property
    def mode(self) -> str:
        """Return the file mode.

        Returns:
            Mode string used to open the file
        """
        return self._mode

    def fileno(self) -> int:
        """Return underlying file descriptor.

        Raises:
            io.UnsupportedOperation: LucidLink files don't have OS file descriptors
        """
        raise io.UnsupportedOperation("fileno not supported")

    def isatty(self) -> bool:
        """Return whether this is an interactive stream.

        Returns:
            False (LucidLink files are never TTY devices)
        """
        return False

    def _get_size(self) -> int:
        """Return the file size, fetching it from the filesystem on first use.

        Returns:
            File size in bytes. If the lookup fails, 0 is returned. The
            fallback is not cached, so a transient failure is retried on the
            next call instead of pinning the size at 0 for the stream's
            lifetime.
        """
        if self._size is None:
            try:
                entry = self._fs.get_entry(self._path)
                size = entry.get("size", 0)
            except Exception:
                # If we can't get size, assume 0 (best-effort), but don't cache it.
                return 0
            self._size = size
        return self._size

    def _open_handle(self) -> None:
        """Open the native file handle if it is not already open.

        Raises:
            ValueError: If the stream has already been closed
        """
        if self._handle is not None:
            return
        if self._closed:
            # Never reopen a closed stream: close() is a no-op once _closed is
            # set, so a new handle (and any lock) would never be released.
            raise ValueError("I/O operation on closed file")

        cpp_mode = normalize_mode_for_cpp(self._mode)
        self._handle = self._fs.open(self._path, cpp_mode, self._lock_type)

        # For write mode (not append), create the file and truncate to 0,
        # matching Python's open("w"/"wb") behavior
        if self._parsed_mode.is_write and not self._parsed_mode.is_append:
            self._fs.write(self._handle, b'', 0)
            self._fs.set_end_of_file_by_handle(self._handle, 0)
            self._size = 0

        # For append mode, seek to end
        if self._parsed_mode.is_append:
            self._pos = self._get_size()

    def __enter__(self) -> 'LucidFileStream':
        """Enter context manager - open file handle.

        Returns:
            Self

        Raises:
            ValueError: If the stream has already been closed
        """
        self._open_handle()
        return self

    def __exit__(self, *args) -> None:
        """Exit context manager - close file handle."""
        self.close()

    def __iter__(self) -> 'LucidFileStream':
        """Return iterator for reading lines.

        Returns:
            Self

        Raises:
            ValueError: If stream is closed
        """
        if self._closed:
            raise ValueError("I/O operation on closed file")
        return self

    def __next__(self) -> bytes:
        """Read next line from file.

        Returns:
            Next line as bytes

        Raises:
            StopIteration: If EOF reached
        """
        line = self.readline()
        if not line:
            raise StopIteration
        return line

    def __repr__(self) -> str:
        """Return string representation.

        Returns:
            String describing this stream
        """
        status = "closed" if self._closed else "open"
        return f"<LucidFileStream path={self._path!r} mode={self._mode!r} {status}>"
def open_buffered(fs_wrapper: Any, path: str, mode: str = "rb",
                  buffer_size: int = io.DEFAULT_BUFFER_SIZE,
                  lock_type: str = "") -> io.BufferedIOBase:
    """Open a file with buffering for improved performance.

    Args:
        fs_wrapper: PythonFileSystemWrapper from lucidlink_native module
        path: Path to file within filespace
        mode: File open mode (default: 'rb')
        buffer_size: Buffer size in bytes (default: io.DEFAULT_BUFFER_SIZE)
        lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write)

    Returns:
        BufferedReader, BufferedWriter, or BufferedRandom wrapping LucidFileStream

    Raises:
        ValueError: If buffer_size is not positive (raised by the io.Buffered*
            constructor). The raw stream is closed before the error propagates.

    Example:
        .. code-block:: python

            with open_buffered(fs, "/data/large.csv", "rb") as f:
                for line in f:
                    process(line)
    """
    raw = LucidFileStream(fs_wrapper, path, mode, lock_type)
    try:
        wrapper_type = get_buffered_wrapper_type(mode)
        if wrapper_type == "random":
            return io.BufferedRandom(raw, buffer_size)
        if wrapper_type == "writer":
            return io.BufferedWriter(raw, buffer_size)
        return io.BufferedReader(raw, buffer_size)
    except BaseException:
        # The raw stream may already hold an open handle (write modes and
        # lock requests open eagerly). Release it now rather than leaving the
        # handle and any lock to the garbage collector.
        raw.close()
        raise
def open_text(fs_wrapper: Any, path: str, mode: str = "r",
              encoding: str = "utf-8", errors: str = "strict",
              newline: Optional[str] = None,
              lock_type: str = "") -> io.TextIOWrapper:
    """Open a file in text mode.

    The file is opened in binary mode through open_buffered() and wrapped in
    an io.TextIOWrapper, which does the encoding and decoding.

    Args:
        fs_wrapper: PythonFileSystemWrapper from lucidlink_native module
        path: Path to file within filespace
        mode: File open mode (default: 'r'). An explicit text flag ('t', as in
            'rt') is accepted and dropped. 'b' is appended when missing, so
            'r+' is opened as 'r+b'.
        encoding: Text encoding (default: 'utf-8')
        errors: Error handling strategy (default: 'strict')
        newline: Newline handling (None, '', '\\n', '\\r', '\\r\\n')
        lock_type: Lock type - "" (no lock), "shared" (read), "exclusive" (write)

    Returns:
        TextIOWrapper for text-mode access

    Raises:
        LookupError: If encoding is not a known codec. This is checked before
            the file is opened, so a failing call does not truncate a file
            opened in 'w' mode.

    Example:
        .. code-block:: python

            with open_text(fs, "/data/report.txt", "r") as f:
                for line in f:
                    print(line.strip())
    """
    import codecs

    # Fail fast on a bad encoding before any file is opened or truncated.
    if encoding is not None:
        codecs.lookup(encoding)

    # The underlying layers only handle binary I/O: drop a 't' flag (otherwise
    # "rt" would become the invalid "rbt") and request binary mode.
    binary_mode = mode.replace("t", "")
    if "b" not in binary_mode:
        binary_mode += "b"

    buffered = open_buffered(fs_wrapper, path, binary_mode, lock_type=lock_type)
    try:
        return io.TextIOWrapper(buffered, encoding=encoding, errors=errors,
                                newline=newline)
    except BaseException:
        # Don't leak the open handle (and any lock) if the wrapper rejects
        # its arguments (e.g. an illegal newline value).
        buffered.close()
        raise