Skip to content

Commit b0e3a49

Browse files
authored
fix: parse checkpoint and localcheckpoint rule lines (#53)
Snakemake logs checkpoint rules as "checkpoint X:" or "localcheckpoint X:" instead of "rule X:" or "localrule X:". RULE_START_PATTERN only matched the rule/localrule forms, making checkpoint jobs invisible to all log-based parsers. This caused checkpoint timing to be either lost or misattributed to whichever rule preceded the checkpoint in the log. Update RULE_START_PATTERN to match all four forms and add checkpoint prefixes to the LogLineParser fast-path checks. Closes #43
1 parent 684359e commit b0e3a49

File tree

3 files changed

+177
-14
lines changed

3 files changed

+177
-14
lines changed

snakesee/parser/line_parser.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -185,18 +185,11 @@ def parse_line(self, line: str) -> list[ParseEvent]:
185185
events.append(event)
186186
return events
187187

188-
# Rule start: "rule X:" or "localrule X:" - this ends error blocks
189-
if first_char == "r" and line.startswith("rule "):
190-
if match := RULE_START_PATTERN.match(line):
191-
# Flush any pending error before emitting rule start
192-
if pending := self.context.get_pending_error():
193-
events.append(pending)
194-
rule = match.group(1)
195-
self.context.reset_for_new_rule(rule)
196-
events.append(ParseEvent(ParseEventType.RULE_START, {"rule": rule}))
197-
return events
198-
199-
if first_char == "l" and line.startswith("localrule "):
188+
# Rule/checkpoint start - this ends error blocks
189+
# Matches: "rule X:", "localrule X:", "checkpoint X:", "localcheckpoint X:"
190+
if first_char in ("r", "l", "c") and (
191+
line.startswith(("rule ", "localrule ", "checkpoint ", "localcheckpoint "))
192+
):
200193
if match := RULE_START_PATTERN.match(line):
201194
# Flush any pending error before emitting rule start
202195
if pending := self.context.get_pending_error():

snakesee/parser/patterns.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
# Pattern: "15 of 50 steps (30%) done"
1010
PROGRESS_PATTERN = re.compile(r"(\d+) of (\d+) steps \((\d+(?:\.\d+)?)%\) done")
1111

12-
# Pattern for rule start: "rule align:" or "localrule all:"
13-
RULE_START_PATTERN = re.compile(r"(?:local)?rule (\w+):")
12+
# Pattern for rule start: "rule align:", "localrule all:", "checkpoint X:", "localcheckpoint X:"
13+
RULE_START_PATTERN = re.compile(r"(?:local)?(?:rule|checkpoint) (\w+):")
1414

1515
# Pattern for job ID in log: " jobid: 5"
1616
JOBID_PATTERN = re.compile(r"\s+jobid:\s*(\d+)")

tests/test_parser.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2219,3 +2219,173 @@ def test_parse_jobs_deduplicates_by_jobid(self, tmp_path: Path) -> None:
22192219

22202220
assert len(jobs) == 1
22212221
assert jobs[0].job_id == "1"
2222+
2223+
2224+
# Snakemake logs checkpoint rules as "checkpoint X:" or "localcheckpoint X:"
2225+
# instead of "rule X:" or "localrule X:".
2226+
CHECKPOINT_LOG = """\
2227+
[Mon Jan 6 10:00:00 2026]
2228+
localcheckpoint discover:
2229+
output: output/discovered
2230+
wildcards: sample=A
2231+
jobid: 5
2232+
[Mon Jan 6 10:00:02 2026]
2233+
Finished jobid: 5 (Rule: discover)
2234+
1 of 3 steps (33%) done
2235+
[Mon Jan 6 10:00:02 2026]
2236+
rule process:
2237+
input: output/discovered/C.txt
2238+
output: output/processed/C.done
2239+
wildcards: sample=C
2240+
jobid: 6
2241+
[Mon Jan 6 10:00:04 2026]
2242+
Finished jobid: 6 (Rule: process)
2243+
2 of 3 steps (67%) done
2244+
[Mon Jan 6 10:00:04 2026]
2245+
localrule all:
2246+
input: output/processed/C.done
2247+
jobid: 0
2248+
[Mon Jan 6 10:00:04 2026]
2249+
Finished jobid: 0 (Rule: all)
2250+
3 of 3 steps (100%) done
2251+
"""
2252+
2253+
CHECKPOINT_LOG_RUNNING = """\
2254+
[Mon Jan 6 10:00:00 2026]
2255+
checkpoint discover:
2256+
output: output/discovered
2257+
wildcards: sample=A
2258+
jobid: 5
2259+
"""
2260+
2261+
CHECKPOINT_LOG_ERROR = """\
2262+
[Mon Jan 6 10:00:00 2026]
2263+
checkpoint discover:
2264+
output: output/discovered
2265+
wildcards: sample=A
2266+
jobid: 5
2267+
[Mon Jan 6 10:00:02 2026]
2268+
Error in rule discover:
2269+
jobid: 5
2270+
log: logs/discover.log (check log file(s) for error details)
2271+
"""
2272+
2273+
2274+
class TestCheckpointParsing:
2275+
"""Tests for parsing Snakemake checkpoint rules.
2276+
2277+
Snakemake logs checkpoint rules as "checkpoint X:" or "localcheckpoint X:"
2278+
instead of "rule X:" or "localrule X:". The parser must handle both forms.
2279+
"""
2280+
2281+
def test_running_checkpoint_job(self, snakemake_dir: Path) -> None:
2282+
"""A checkpoint job in progress is detected as running."""
2283+
log_file = snakemake_dir / "log" / "test.snakemake.log"
2284+
log_file.write_text(CHECKPOINT_LOG_RUNNING)
2285+
2286+
running = parse_running_jobs_from_log(log_file)
2287+
assert len(running) == 1
2288+
assert running[0].rule == "discover"
2289+
assert running[0].job_id == "5"
2290+
assert running[0].wildcards == {"sample": "A"}
2291+
2292+
def test_completed_checkpoint_job(self, tmp_path: Path) -> None:
2293+
"""Completed checkpoint jobs have correct rule name and timing."""
2294+
from snakesee.parser import parse_completed_jobs_from_log
2295+
2296+
log_file = tmp_path / "test.log"
2297+
log_file.write_text(CHECKPOINT_LOG)
2298+
2299+
completed = parse_completed_jobs_from_log(log_file)
2300+
assert len(completed) == 3
2301+
rules = {j.rule for j in completed}
2302+
assert rules == {"discover", "process", "all"}
2303+
2304+
# Verify the checkpoint job specifically
2305+
cp_job = next(j for j in completed if j.rule == "discover")
2306+
assert cp_job.job_id == "5"
2307+
assert cp_job.start_time is not None
2308+
assert cp_job.end_time is not None
2309+
assert cp_job.end_time >= cp_job.start_time
2310+
2311+
def test_failed_checkpoint_job(self, snakemake_dir: Path) -> None:
2312+
"""A failed checkpoint job is detected with correct rule name."""
2313+
log_file = snakemake_dir / "log" / "test.snakemake.log"
2314+
log_file.write_text(CHECKPOINT_LOG_ERROR)
2315+
2316+
failed = parse_failed_jobs_from_log(log_file)
2317+
assert len(failed) == 1
2318+
assert failed[0].rule == "discover"
2319+
assert failed[0].job_id == "5"
2320+
2321+
def test_all_jobs_includes_checkpoint(self, tmp_path: Path) -> None:
2322+
"""parse_all_jobs_from_log includes checkpoint jobs."""
2323+
from snakesee.parser import parse_all_jobs_from_log
2324+
2325+
log_file = tmp_path / "test.log"
2326+
log_file.write_text(CHECKPOINT_LOG)
2327+
2328+
jobs = parse_all_jobs_from_log(log_file)
2329+
assert len(jobs) == 3
2330+
rules = {j.rule for j in jobs}
2331+
assert rules == {"discover", "process", "all"}
2332+
2333+
cp_job = next(j for j in jobs if j.rule == "discover")
2334+
assert cp_job.wildcards == {"sample": "A"}
2335+
2336+
def test_checkpoint_does_not_steal_next_rule_timing(self, tmp_path: Path) -> None:
2337+
"""A checkpoint must not cause the following rule's timing to be wrong.
2338+
2339+
Before the fix, the checkpoint line was not matched, so the next rule's
2340+
jobid got associated with whatever rule came before the checkpoint.
2341+
"""
2342+
from snakesee.parser import parse_completed_jobs_from_log
2343+
2344+
log_file = tmp_path / "test.log"
2345+
log_file.write_text(CHECKPOINT_LOG)
2346+
2347+
completed = parse_completed_jobs_from_log(log_file)
2348+
process_job = next(j for j in completed if j.rule == "process")
2349+
assert process_job.job_id == "6"
2350+
# Duration should be ~2 seconds, not inflated
2351+
assert process_job.duration is not None
2352+
assert process_job.duration < 10.0
2353+
2354+
2355+
class TestLogLineParserCheckpoint:
2356+
"""Tests for LogLineParser handling of checkpoint rule lines."""
2357+
2358+
def test_checkpoint_rule_start(self) -> None:
2359+
"""'checkpoint X:' is parsed as RULE_START."""
2360+
from snakesee.parser.line_parser import LogLineParser
2361+
from snakesee.parser.line_parser import ParseEventType
2362+
2363+
parser = LogLineParser()
2364+
events = parser.parse_line("checkpoint discover:")
2365+
assert len(events) == 1
2366+
assert events[0].event_type == ParseEventType.RULE_START
2367+
assert events[0].data["rule"] == "discover"
2368+
2369+
def test_localcheckpoint_rule_start(self) -> None:
2370+
"""'localcheckpoint X:' is parsed as RULE_START."""
2371+
from snakesee.parser.line_parser import LogLineParser
2372+
from snakesee.parser.line_parser import ParseEventType
2373+
2374+
parser = LogLineParser()
2375+
events = parser.parse_line("localcheckpoint discover:")
2376+
assert len(events) == 1
2377+
assert events[0].event_type == ParseEventType.RULE_START
2378+
assert events[0].data["rule"] == "discover"
2379+
2380+
def test_checkpoint_sets_context_rule(self) -> None:
2381+
"""Checkpoint line sets the parser context so subsequent jobid gets correct rule."""
2382+
from snakesee.parser.line_parser import LogLineParser
2383+
from snakesee.parser.line_parser import ParseEventType
2384+
2385+
parser = LogLineParser()
2386+
parser.parse_line("checkpoint discover:")
2387+
events = parser.parse_line(" jobid: 5")
2388+
2389+
jobid_event = events[0]
2390+
assert jobid_event.event_type == ParseEventType.JOBID
2391+
assert jobid_event.data["rule"] == "discover"

0 commit comments

Comments
 (0)