Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import fcntl
import os
import threading
from pathlib import Path
from typing import TextIO

Expand All @@ -12,7 +13,11 @@ class EventWriter:
"""Thread-safe JSONL event writer with file locking.

Writes events to a JSON Lines file with proper file locking
to support concurrent access from multiple processes.
for inter-process safety and a threading lock for intra-process safety.

Thread safety is critical because Snakemake dispatches log events via a
``QueueListener`` background thread, while handler ``close()`` may be
called from the main thread during cleanup.

Attributes:
path: Path to the event file.
Expand All @@ -31,10 +36,14 @@ def __init__(self, path: Path, buffer_size: int = 1) -> None:
self.buffer_size = buffer_size
self._buffer: list[SnakeseeEvent] = []
self._file: TextIO | None = None
self._lock = threading.Lock()
self._closed = False

def _ensure_open(self) -> TextIO:
"""Ensure the file is open for appending.

Must be called while holding ``_lock``.

Returns:
The open file handle.
"""
def truncate(self) -> None:
    """Truncate the event file, discarding all previous events.

    Should be called when a new workflow starts to ensure fresh state.
    Also clears ``_closed`` so a writer that was previously closed can
    be reused for the new workflow.
    """
    with self._lock:
        self._closed = False
        # Close any existing handle first so the truncating open below
        # does not leak the old file descriptor.
        if self._file is not None:
            self._file.close()
            self._file = None

        # Truncate by opening in write mode.
        self.path.parent.mkdir(parents=True, exist_ok=True)
        self._file = open(self.path, "w", encoding="utf-8")

def write(self, event: SnakeseeEvent) -> None:
    """Write an event to the file.

    Events are buffered and flushed once the buffer reaches
    ``buffer_size`` entries.

    Args:
        event: The event to write.
    """
    with self._lock:
        # Writes racing with close() (e.g. the QueueListener thread
        # delivering events during cleanup) are silently dropped.
        if self._closed:
            return
        self._buffer.append(event)
        if len(self._buffer) >= self.buffer_size:
            self._flush_locked()

def flush(self) -> None:
    """Flush buffered events out to disk.

    Serializes against other threads via the instance lock; the
    OS-level file lock taken during the actual write handles
    inter-process safety.
    """
    self._lock.acquire()
    try:
        self._flush_locked()
    finally:
        self._lock.release()

def _flush_locked(self) -> None:
"""Flush buffered events to disk. Must be called while holding ``_lock``."""
if not self._buffer:
return

Expand All @@ -94,10 +113,16 @@ def flush(self) -> None:

def close(self) -> None:
    """Close the file and flush any remaining events.

    Idempotent: calls after the first are no-ops. ``_closed`` is set
    before flushing so concurrent ``write()`` calls racing with close
    are dropped rather than re-opening the file. The handle is
    released even if the final flush raises (caller sees the error,
    but no descriptor leaks).
    """
    with self._lock:
        if self._closed:
            return
        self._closed = True
        try:
            self._flush_locked()
        finally:
            if self._file is not None:
                self._file.close()
                self._file = None

def __enter__(self) -> "EventWriter":
"""Context manager entry."""
Expand Down
86 changes: 86 additions & 0 deletions snakemake-logger-plugin-snakesee/tests/test_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
"""Tests for EventWriter."""

import threading
from pathlib import Path
from unittest.mock import patch

from snakemake_logger_plugin_snakesee.events import EventType, SnakeseeEvent
from snakemake_logger_plugin_snakesee.writer import EventWriter
Expand Down Expand Up @@ -170,3 +172,87 @@ def test_truncate_clears_existing_file(self, tmp_path: Path) -> None:
parsed = SnakeseeEvent.from_json(lines[0])
assert parsed.event_type == EventType.WORKFLOW_STARTED
assert parsed.timestamp == 3.0

def test_concurrent_write_and_close(self, tmp_path: Path) -> None:
    """Test that close() during concurrent writes doesn't lose events or raise.

    Simulates the race condition caused by Snakemake's cleanup calling
    handler.close() from the main thread while the QueueListener
    background thread is still delivering events via emit()/write().
    """
    event_file = tmp_path / "events.jsonl"
    writer = EventWriter(event_file)
    total = 200

    # Seed one event first so the file is guaranteed to exist on disk.
    writer.write(SnakeseeEvent(event_type=EventType.WORKFLOW_STARTED, timestamp=0.0))

    start_gate = threading.Barrier(2)

    def produce() -> None:
        start_gate.wait()
        for idx in range(total):
            event = SnakeseeEvent(
                event_type=EventType.JOB_STARTED,
                timestamp=float(idx + 1),
                job_id=idx,
            )
            writer.write(event)

    producer = threading.Thread(target=produce)
    producer.start()
    start_gate.wait()
    # Close from the main thread while the producer thread is active.
    writer.close()
    producer.join()

    # The seed event plus some subset of the concurrent writes must be
    # on disk; events arriving after close() are silently dropped
    # (no crash), so the count is bounded above by total + 1.
    lines = event_file.read_text().strip().split("\n")
    assert 1 <= len(lines) <= total + 1

def test_write_after_close_is_silent(self, tmp_path: Path) -> None:
    """Test that write() after close() is silently ignored."""
    event_file = tmp_path / "events.jsonl"
    writer = EventWriter(event_file)
    writer.write(SnakeseeEvent(event_type=EventType.WORKFLOW_STARTED, timestamp=1.0))
    writer.close()

    # A post-close write must be a no-op rather than an error.
    late = SnakeseeEvent(event_type=EventType.JOB_STARTED, timestamp=2.0, job_id=1)
    writer.write(late)

    recorded = event_file.read_text().strip().split("\n")
    assert len(recorded) == 1

def test_double_close(self, tmp_path: Path) -> None:
    """Test that close() can be called multiple times safely."""
    event_file = tmp_path / "events.jsonl"
    writer = EventWriter(event_file)
    writer.write(SnakeseeEvent(event_type=EventType.WORKFLOW_STARTED, timestamp=1.0))
    for _ in range(2):  # the second close must be a harmless no-op
        writer.close()

    recorded = event_file.read_text().strip().split("\n")
    assert len(recorded) == 1

def test_close_releases_file_handle_on_flush_error(self, tmp_path: Path) -> None:
    """Test that close() releases the file handle even if flush raises."""
    writer = EventWriter(tmp_path / "events.jsonl", buffer_size=10)
    writer.write(SnakeseeEvent(event_type=EventType.WORKFLOW_STARTED, timestamp=1.0))

    # Make the flush performed inside close() blow up. The OSError is
    # allowed to propagate (the caller is responsible for catching it),
    # but close() must still release the file handle on the way out.
    failure = OSError("disk full")
    with patch.object(writer, "_flush_locked", side_effect=failure):
        try:
            writer.close()
        except OSError:
            pass

    # Handle released and writer marked closed despite the failure.
    assert writer._file is None
    assert writer._closed is True
Loading