From 3964b570f735b1773f2173b48ddbf314ebc33f88 Mon Sep 17 00:00:00 2001 From: Yun Zheng Hu Date: Mon, 8 Sep 2025 13:53:24 +0000 Subject: [PATCH] Fix BrokenPipeError handling in rdump Fixes #186 --- flow/record/tools/rdump.py | 62 +++++++++++++++----------------------- tests/tools/test_rdump.py | 35 +++++++++++++++++++++ 2 files changed, 60 insertions(+), 37 deletions(-) diff --git a/flow/record/tools/rdump.py b/flow/record/tools/rdump.py index 63043a45..417c7490 100644 --- a/flow/record/tools/rdump.py +++ b/flow/record/tools/rdump.py @@ -393,50 +393,38 @@ def main(argv: list[str] | None = None) -> int: ret = 0 try: - record_writer = RecordWriter(uri) - for count, rec in enumerate(record_iterator, start=1): # noqa: B007 - if args.record_source is not None: - rec._source = args.record_source - if args.record_classification is not None: - rec._classification = args.record_classification - if record_field_rewriter: - rec = record_field_rewriter.rewrite(rec) - - if args.list: - # Dump RecordDescriptors - desc = rec._desc - if desc.descriptor_hash not in seen_desc: - seen_desc.add(desc.descriptor_hash) - print(f"# {desc}") - print(desc.definition()) - print() - else: - # Dump Records - if args.multi_timestamp: - for record in iter_timestamped_records(rec): - record_writer.write(record) + with RecordWriter(uri) as record_writer: + for count, rec in enumerate(record_iterator, start=1): # noqa: B007 + if args.record_source is not None: + rec._source = args.record_source + if args.record_classification is not None: + rec._classification = args.record_classification + if record_field_rewriter: + rec = record_field_rewriter.rewrite(rec) + + if args.list: + # Dump RecordDescriptors + desc = rec._desc + if desc.descriptor_hash not in seen_desc: + seen_desc.add(desc.descriptor_hash) + print(f"# {desc}") + print(desc.definition()) + print() else: - record_writer.write(rec) - + # Dump Records + if args.multi_timestamp: + for record in iter_timestamped_records(rec): + record_writer.write(record) + else: + record_writer.write(rec) + except (BrokenPipeError, OSError): + raise except Exception as e: print_error(e) - - # Prevent throwing an exception twice when deconstructing the record writer. - if hasattr(record_writer, "exception") and record_writer.exception is e: - record_writer.exception = None - ret = 1 - finally: if progress_monitor: progress_monitor.stop() - if record_writer: - # Exceptions raised in threads can be thrown when deconstructing the writer. - try: - record_writer.__exit__() - except Exception as e: - print_error(e) - ret = 1 if (args.list or args.stats) and not args.progress: stats = f"Processed {ctx.read} records (matched={ctx.matched}, unmatched={ctx.unmatched})" diff --git a/tests/tools/test_rdump.py b/tests/tools/test_rdump.py index 64bcad76..0877a25e 100644 --- a/tests/tools/test_rdump.py +++ b/tests/tools/test_rdump.py @@ -762,3 +762,38 @@ def test_record_rdump_stats(tmp_path: Path, capsys: pytest.CaptureFixture) -> No rdump.main(["--list", "--stats", str(tmp_path / "test.records")]) captured = capsys.readouterr() assert "Processed 100 records (matched=100, unmatched=0)" in captured.out + + +@pytest.mark.skipif(platform.system() == "Windows", reason="skipping this test on Windows") +def test_rdump_catch_sigpipe(tmp_path: Path) -> None: + """Test if rdump properly suppresses BrokenPipeError when writing to a closed file handle.""" + + TestRecord = RecordDescriptor( + "test/record", + [ + ("varint", "count"), + ("string", "foo"), + ], + ) + + path = tmp_path / "test.records" + with RecordWriter(path) as writer: + for i in range(10): + writer.write(TestRecord(count=i, foo="bar")) + + # rdump test.records | head -n 2 + proc = subprocess.Popen( + f"rdump {path} | head -n 2", + shell=True, + text=True, + stderr=subprocess.PIPE, + stdout=subprocess.PIPE, + ) + + stdout, stderr = proc.communicate() + exit_code = proc.wait() + assert exit_code == 0 + assert stderr == "" # We don't expect any BrokenPipeError + assert "test/record count=0" in stdout + assert "test/record count=1" in stdout + assert len(stdout.splitlines()) == 2