diff --git a/elx/__init__.py b/elx/__init__.py index a33ed46..a065068 100644 --- a/elx/__init__.py +++ b/elx/__init__.py @@ -8,6 +8,7 @@ from elx.target import Target from elx.runner import Runner from elx.catalog import Catalog +from elx.record_counter import RecordCounter logger = logging.getLogger("pipx") logger.setLevel(logging.CRITICAL) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index e142f57..c597ca0 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -1,4 +1,4 @@ -from typing import Generator, Iterable, List, Sequence +from typing import Generator, Iterable, List, Mapping, Sequence from elx import Runner from dagster import ( AssetsDefinition, @@ -57,6 +57,13 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: Yields: Generator[Output, None, None]: The names of the selected outputs. """ + # Build a mapping from dagster-safe names back to original stream names + stream_name_mapping = { + dagster_safe_name(stream.name): stream.name + for stream in runner.tap.catalog.streams + if stream.is_selected + } + # Execute the runner and yield the selected outputs. runner.run( streams=list(context.selected_output_names), @@ -64,12 +71,19 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: ) for context_output_name in context.selected_output_names: + # Get the original stream name to look up the row count + original_stream_name = stream_name_mapping.get( + context_output_name, context_output_name + ) + row_count = runner.record_counts.get(original_stream_name, 0) + yield Output( value=Nothing, output_name=context_output_name, metadata={ "state_path": f"{runner.state_manager.base_path}/{runner.state_file_name}", "state": runner.load_state(), + "row_count": row_count, }, ) diff --git a/elx/record_counter.py b/elx/record_counter.py new file mode 100644 index 0000000..037cbeb --- /dev/null +++ b/elx/record_counter.py @@ -0,0 +1,30 @@ +import json + +class RecordCounter: + """ + A line writer that counts RECORD messages per stream from Singer tap output. + """ + + def __init__(self): + self.counts: dict[str, int] = {} + + def writelines(self, line: str) -> None: + """ + Parse a Singer message line and count RECORD messages per stream. + + Args: + line: A JSON string containing a Singer message. + """ + try: + message = json.loads(line) + if message.get("type") == "RECORD": + stream = message.get("stream") + if stream: + self.counts[stream] = self.counts.get(stream, 0) + 1 + except json.JSONDecodeError: + pass + + def reset(self) -> None: + """Reset all counts to zero.""" + self.counts = {} + diff --git a/elx/runner.py b/elx/runner.py index 37a9640..1d4ffc6 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -12,6 +12,7 @@ from elx.tap import Tap from elx.target import Target from elx import StateManager +from elx.record_counter import RecordCounter from dotenv import load_dotenv from elx.utils import capture_subprocess_output @@ -30,6 +31,7 @@ def __init__( self.tap = tap self.target = target self.state_manager = state_manager + self.record_counts: dict[str, int] = {} @property def name(self) -> str: @@ -113,6 +115,9 @@ def writelines(self, line: str): if self.logger: self.logger.info(line) + # Create a record counter to track row counts per stream + record_counter = RecordCounter() + async with self.tap.process( state=state, streams=streams, @@ -120,7 +125,7 @@ def writelines(self, line: str): async with self.target.process( tap_process=tap_process, ) as target_process: - tap_outputs = [target_process.stdin] + tap_outputs = [target_process.stdin, record_counter] tap_stdout_future = asyncio.ensure_future( # forward subproc stdout to tap_outputs (i.e. targets stdin) capture_subprocess_output(tap_process.stdout, *tap_outputs), @@ -239,6 +244,9 @@ def writelines(self, line: str): elif target_code: raise Exception("Target failed") + # Store the record counts for access after the run + self.record_counts = record_counter.counts + if __name__ == "__main__": tap = Tap( diff --git a/elx/utils.py b/elx/utils.py index 4b8f6ae..a0c74c5 100644 --- a/elx/utils.py +++ b/elx/utils.py @@ -1,6 +1,5 @@ import asyncio - def require_install(func): """ Decorator to check if the executable is installed. diff --git a/tests/test_elx/test_runner.py b/tests/test_elx/test_runner.py index 27ee397..e647bb6 100644 --- a/tests/test_elx/test_runner.py +++ b/tests/test_elx/test_runner.py @@ -40,3 +40,23 @@ def test_config_interpolation_target_values(tap: Tap): runner = Runner(tap, target) assert runner.target.config["tap_name"] == "tap_mock_fixture" + + +def test_record_counts(runner: Runner): + """ + Test that record counts are tracked per stream after a run. + """ + # Run the extract-load pipeline + runner.run() + + # Assert that record_counts is populated + assert runner.record_counts is not None + assert isinstance(runner.record_counts, dict) + + # Assert that at least one stream has records + assert len(runner.record_counts) > 0 + + # Assert all counts are positive integers + for stream_name, count in runner.record_counts.items(): + assert isinstance(count, int) + assert count > 0 diff --git a/tests/test_elx/test_utils.py b/tests/test_elx/test_utils.py index e77d82d..6dc787c 100644 --- a/tests/test_elx/test_utils.py +++ b/tests/test_elx/test_utils.py @@ -1,3 +1,4 @@ +from elx import RecordCounter from elx.utils import interpolate_in_config @@ -22,3 +23,56 @@ def test_interpolate_in_config(): ) # Assert that the interpolated config is correct assert interpolated_config == expected_config + + +def test_record_counter_counts_records(): + """ + Test that RecordCounter correctly counts RECORD messages per stream. + """ + counter = RecordCounter() + + # Simulate Singer RECORD messages + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 2}}') + counter.writelines('{"type": "RECORD", "stream": "orders", "record": {"id": 1}}') + + assert counter.counts == {"users": 2, "orders": 1} + + +def test_record_counter_ignores_non_record_messages(): + """ + Test that RecordCounter ignores non-RECORD messages. + """ + counter = RecordCounter() + + # Simulate different Singer message types + counter.writelines('{"type": "SCHEMA", "stream": "users", "schema": {}}') + counter.writelines('{"type": "STATE", "value": {"bookmarks": {}}}') + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + + assert counter.counts == {"users": 1} + + +def test_record_counter_handles_invalid_json(): + """ + Test that RecordCounter gracefully handles invalid JSON. + """ + counter = RecordCounter() + + counter.writelines("not valid json") + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + + assert counter.counts == {"users": 1} + + +def test_record_counter_reset(): + """ + Test that RecordCounter.reset() clears all counts. + """ + counter = RecordCounter() + + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + assert counter.counts == {"users": 1} + + counter.reset() + assert counter.counts == {}