Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion snakesee/estimation/data_loader.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
"""Historical data loading for time estimation."""

import logging
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING

import orjson

from snakesee.constants import MAX_EVENTS_LINE_LENGTH
from snakesee.parser import parse_metadata_files_full
from snakesee.parser.metadata import MetadataRecord

if TYPE_CHECKING:
from snakesee.persistence.backend import PersistenceBackend
from snakesee.state.rule_registry import RuleRegistry
from snakesee.types import ProgressCallback

Expand Down Expand Up @@ -53,9 +56,36 @@ def load_from_metadata(
metadata_dir: Path to .snakemake/metadata/ directory.
progress_callback: Optional callback(current, total) for progress.
"""
self._consume_metadata_records(parse_metadata_files_full(metadata_dir, progress_callback))

def load_from_backend(
    self,
    backend: "PersistenceBackend",
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Ingest historical execution times through a persistence backend.

    Backend-agnostic: any object satisfying the PersistenceBackend
    protocol (filesystem or SQLite) may be supplied.

    Args:
        backend: Persistence backend to read from.
        progress_callback: Optional callback(current, total) for progress.
    """
    records = backend.iterate_metadata(progress_callback)
    self._consume_metadata_records(records)

def _consume_metadata_records(
self,
records: "Iterator[MetadataRecord]",
) -> None:
"""Process metadata records into the registry and code hash map.

Args:
records: Iterator of MetadataRecord instances.
"""
hash_to_rules: dict[str, set[str]] = {}

for record in parse_metadata_files_full(metadata_dir, progress_callback):
for record in records:
duration = record.duration
end_time = record.end_time

Expand Down
19 changes: 19 additions & 0 deletions snakesee/estimation/estimator.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from snakesee.state.config import EstimationConfig

if TYPE_CHECKING:
from snakesee.persistence import PersistenceBackend
from snakesee.state.rule_registry import RuleRegistry
from snakesee.types import ProgressCallback

Expand Down Expand Up @@ -168,6 +169,24 @@ def load_from_metadata(
loader.load_from_metadata(metadata_dir, progress_callback)
self.code_hash_to_rules = loader.code_hash_to_rules

def load_from_backend(
    self,
    backend: "PersistenceBackend",
    progress_callback: "ProgressCallback | None" = None,
) -> None:
    """Load historical execution times from a persistence backend.

    Both filesystem and SQLite backends are accepted via the
    PersistenceBackend protocol; the actual work is delegated to
    HistoricalDataLoader.load_from_backend().

    Args:
        backend: Persistence backend to read metadata from.
        progress_callback: Optional callback(current, total) for progress reporting.
    """
    data_loader = self._get_data_loader()
    data_loader.load_from_backend(backend, progress_callback)
    # Mirror the loader's code-hash mapping onto the estimator so rule
    # lookups by code hash stay in sync after every load.
    self.code_hash_to_rules = data_loader.code_hash_to_rules

def load_from_events(self, events_file: Path) -> None:
"""
Load historical execution times from a snakesee events file.
Expand Down
57 changes: 49 additions & 8 deletions snakesee/parser/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging
from collections.abc import Iterator
from pathlib import Path
from typing import TYPE_CHECKING

from snakesee.constants import STALE_WORKFLOW_THRESHOLD_SECONDS
from snakesee.models import JobInfo
Expand All @@ -33,6 +34,9 @@
from snakesee.parser.patterns import THREADS_PATTERN
from snakesee.parser.patterns import TIMESTAMP_PATTERN
from snakesee.parser.patterns import WILDCARDS_PATTERN

if TYPE_CHECKING:
from snakesee.persistence.backend import PersistenceBackend
from snakesee.parser.stats import collect_rule_timing_stats
from snakesee.parser.stats import collect_wildcard_timing_stats
from snakesee.parser.utils import _parse_non_negative_int
Expand Down Expand Up @@ -879,6 +883,7 @@ def parse_all_jobs_from_log(
def is_workflow_running(
snakemake_dir: Path,
stale_threshold: float = STALE_WORKFLOW_THRESHOLD_SECONDS,
backend: PersistenceBackend | None = None,
) -> bool:
"""
Check if a workflow is currently running.
Expand All @@ -896,13 +901,38 @@ def is_workflow_running(
snakemake_dir: Path to the .snakemake directory.
stale_threshold: Seconds since last log modification before considering
the workflow stale/dead. Default 1800 seconds (30 minutes).
backend: Optional persistence backend. If provided, uses it for
lock and incomplete checks instead of filesystem.

Returns:
True if workflow appears to be actively running, False otherwise.
"""
from snakesee.state.clock import get_clock
from snakesee.state.paths import WorkflowPaths

# Use backend for lock/incomplete checks if provided
if backend is not None:
has_locks = backend.has_locks()
if not has_locks:
return False

if backend.has_incomplete_jobs():
return True

# Fall back to log freshness check
paths = WorkflowPaths(snakemake_dir.parent)
log_file = paths.find_latest_log()
if log_file is None:
return True

try:
log_mtime = log_file.stat().st_mtime
age = get_clock().now() - log_mtime
return age < stale_threshold
except OSError:
return True

# Original filesystem-based logic (unchanged below)
locks_dir = snakemake_dir / "locks"
if not locks_dir.exists():
return False
Expand Down Expand Up @@ -1152,6 +1182,12 @@ def parse_workflow_state(
from snakesee.state.paths import WorkflowPaths

paths = WorkflowPaths(workflow_dir)

# Auto-detect persistence backend (SQLite DB or filesystem)
from snakesee.persistence import detect_backend

backend = detect_backend(workflow_dir)

snakemake_dir = paths.snakemake_dir

# Use specified log file or find latest
Expand All @@ -1160,7 +1196,7 @@ def parse_workflow_state(
is_latest_log = log_file is None or log_file == latest_log

# Determine status from lock files (only relevant for latest log)
workflow_is_running = is_latest_log and is_workflow_running(snakemake_dir)
workflow_is_running = is_latest_log and is_workflow_running(snakemake_dir, backend=backend)
if workflow_is_running:
status = WorkflowStatus.RUNNING
else:
Expand Down Expand Up @@ -1228,13 +1264,18 @@ def parse_workflow_state(
running = parse_running_jobs_from_log(log_path, _cached_lines=cached_lines)
failed_list = parse_failed_jobs_from_log(log_path, _cached_lines=cached_lines)

# Check for incomplete markers (jobs that were in progress when workflow was interrupted)
incomplete_dir = snakemake_dir / "incomplete"
incomplete_list = (
list(parse_incomplete_jobs(incomplete_dir, min_start_time=start_time))
if is_latest_log
else []
)
# Check for incomplete jobs (from DB or filesystem markers)
if is_latest_log:
incomplete_list = [
JobInfo(
rule=ij.rule or "unknown",
start_time=ij.start_time,
output_file=ij.output_file,
)
for ij in backend.iterate_incomplete_jobs(min_start_time=start_time)
]
else:
incomplete_list = []

# Reconcile running, failed, and incomplete job lists
running, incomplete_list = _reconcile_job_lists(
Expand Down
6 changes: 3 additions & 3 deletions snakesee/parser/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ def to_job_info(self) -> JobInfo:
)


def _calculate_input_size(input_files: list[str] | None) -> int | None:
def calculate_metadata_input_size(input_files: list[str] | None) -> int | None:
"""Calculate total input size from file list.

Args:
Expand Down Expand Up @@ -119,7 +119,7 @@ def parse_metadata_files(
start_time=starttime,
end_time=endtime,
wildcards=wildcards,
input_size=_calculate_input_size(data.get("input")),
input_size=calculate_metadata_input_size(data.get("input")),
)


Expand Down Expand Up @@ -167,7 +167,7 @@ def parse_metadata_files_full(
start_time=starttime,
end_time=endtime,
wildcards=wildcards,
input_size=_calculate_input_size(data.get("input")),
input_size=calculate_metadata_input_size(data.get("input")),
code_hash=code_hash,
)

Expand Down
19 changes: 19 additions & 0 deletions snakesee/persistence/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
"""Persistence backend abstraction for reading Snakemake metadata.

Supports both the filesystem-based backend (.snakemake/metadata/ directory)
and the newer SQLite-based backend (.snakemake/metadata.db).
"""

from snakesee.persistence.backend import IncompleteJob
from snakesee.persistence.backend import PersistenceBackend
from snakesee.persistence.backend import detect_backend
from snakesee.persistence.db import DbPersistence
from snakesee.persistence.fs import FsPersistence

__all__ = [
"DbPersistence",
"FsPersistence",
"IncompleteJob",
"PersistenceBackend",
"detect_backend",
]
101 changes: 101 additions & 0 deletions snakesee/persistence/backend.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Persistence backend protocol and auto-detection."""

from __future__ import annotations

from collections.abc import Iterator
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING
from typing import Protocol

if TYPE_CHECKING:
from snakesee.types import ProgressCallback

from snakesee.parser.metadata import MetadataRecord


class PersistenceBackend(Protocol):
    """Structural interface for reading Snakemake workflow metadata.

    A conforming implementation exposes completed-job metadata,
    incomplete-job markers, and lock state, regardless of whether the
    data lives on the filesystem or in the SQLite database.
    """

    def iterate_metadata(
        self,
        progress_callback: ProgressCallback | None = None,
    ) -> Iterator[MetadataRecord]:
        """Yield a MetadataRecord for every completed job.

        Args:
            progress_callback: Optional callback(current, total) for progress.

        Yields:
            MetadataRecord for each completed job.
        """
        ...

    def has_incomplete_jobs(self) -> bool:
        """Return True if any job is currently marked incomplete."""
        ...

    def iterate_incomplete_jobs(
        self,
        min_start_time: float | None = None,
    ) -> Iterator[IncompleteJob]:
        """Yield an IncompleteJob for each in-progress job marker.

        Args:
            min_start_time: If set, only yield jobs started at or after this time.

        Yields:
            IncompleteJob for each in-progress job.
        """
        ...

    def has_locks(self) -> bool:
        """Return True if any workflow locks are currently held."""
        ...


@dataclass(frozen=True, slots=True)
class IncompleteJob:
"""A job marked as incomplete/in-progress.

Attributes:
start_time: Approximate start time (mtime for FS, starttime for DB).
output_file: Output file path if known.
rule: Rule name if known (DB backend provides this; FS does not).
external_jobid: External executor job ID if known.
"""

start_time: float | None = None
output_file: Path | None = None
rule: str | None = None
external_jobid: str | None = None


def detect_backend(workflow_dir: Path) -> PersistenceBackend:
    """Auto-detect and return the appropriate persistence backend.

    The SQLite backend takes precedence whenever .snakemake/metadata.db
    exists; in every other case the filesystem backend is returned.

    Args:
        workflow_dir: Root workflow directory containing .snakemake/.

    Returns:
        A PersistenceBackend implementation.
    """
    # Imported lazily to avoid circular imports at module load time.
    from snakesee.state.paths import WorkflowPaths

    paths = WorkflowPaths(workflow_dir)

    if not paths.metadata_db.exists():
        from snakesee.persistence.fs import FsPersistence

        return FsPersistence(paths)

    from snakesee.persistence.db import DbPersistence

    return DbPersistence(paths)
Loading
Loading