From d31017f78d369c118a26a1a435c8c9eb20a43243 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Wed, 17 Dec 2025 16:43:00 +0100 Subject: [PATCH 1/6] Add record_count to dagster asset metadata --- elx/extensions/dagster/assets.py | 14 ++++++++ elx/runner.py | 11 +++++-- elx/utils.py | 30 +++++++++++++++++ tests/test_elx/test_runner.py | 20 ++++++++++++ tests/test_elx/test_utils.py | 55 +++++++++++++++++++++++++++++++- 5 files changed, 127 insertions(+), 3 deletions(-) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index e142f57..81a64e6 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -57,6 +57,13 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: Yields: Generator[Output, None, None]: The names of the selected outputs. """ + # Build a mapping from dagster-safe names back to original stream names + stream_name_mapping = { + dagster_safe_name(stream.name): stream.name + for stream in runner.tap.catalog.streams + if stream.is_selected + } + # Execute the runner and yield the selected outputs. runner.run( streams=list(context.selected_output_names), @@ -64,12 +71,19 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: ) for context_output_name in context.selected_output_names: + # Get the original stream name to look up the row count + original_stream_name = stream_name_mapping.get( + context_output_name, context_output_name + ) + row_count = runner.record_counts.get(original_stream_name, 0) + yield Output( value=Nothing, output_name=context_output_name, metadata={ "state_path": f"{runner.state_manager.base_path}/{runner.state_file_name}", "state": runner.load_state(), + "row_count": row_count, }, ) diff --git a/elx/runner.py b/elx/runner.py index 37a9640..db2b152 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -14,7 +14,7 @@ from elx import StateManager from dotenv import load_dotenv -from elx.utils import capture_subprocess_output +from elx.utils import capture_subprocess_output, RecordCounter logging.basicConfig(level=logging.INFO) @@ -30,6 +30,7 @@ def __init__( self.tap = tap self.target = target self.state_manager = state_manager + self.record_counts: dict[str, int] = {} @property def name(self) -> str: @@ -113,6 +114,9 @@ def writelines(self, line: str): if self.logger: self.logger.info(line) + # Create a record counter to track row counts per stream + record_counter = RecordCounter() + async with self.tap.process( state=state, streams=streams, @@ -120,7 +124,7 @@ def writelines(self, line: str): async with self.target.process( tap_process=tap_process, ) as target_process: - tap_outputs = [target_process.stdin] + tap_outputs = [target_process.stdin, record_counter] tap_stdout_future = asyncio.ensure_future( # forward subproc stdout to tap_outputs (i.e. targets stdin) capture_subprocess_output(tap_process.stdout, *tap_outputs), @@ -239,6 +243,9 @@ def writelines(self, line: str): elif target_code: raise Exception("Target failed") + # Store the record counts for access after the run + self.record_counts = record_counter.counts + if __name__ == "__main__": tap = Tap( diff --git a/elx/utils.py b/elx/utils.py index 4b8f6ae..5fb882d 100644 --- a/elx/utils.py +++ b/elx/utils.py @@ -1,4 +1,34 @@ import asyncio +import json + + +class RecordCounter: + """ + A line writer that counts RECORD messages per stream from Singer tap output. + """ + + def __init__(self): + self.counts: dict[str, int] = {} + + def writelines(self, line: str) -> None: + """ + Parse a Singer message line and count RECORD messages per stream. + + Args: + line: A JSON string containing a Singer message. + """ + try: + message = json.loads(line) + if message.get("type") == "RECORD": + stream = message.get("stream") + if stream: + self.counts[stream] = self.counts.get(stream, 0) + 1 + except json.JSONDecodeError: + pass + + def reset(self) -> None: + """Reset all counts to zero.""" + self.counts = {} def require_install(func): diff --git a/tests/test_elx/test_runner.py b/tests/test_elx/test_runner.py index 27ee397..e647bb6 100644 --- a/tests/test_elx/test_runner.py +++ b/tests/test_elx/test_runner.py @@ -40,3 +40,23 @@ def test_config_interpolation_target_values(tap: Tap): runner = Runner(tap, target) assert runner.target.config["tap_name"] == "tap_mock_fixture" + + +def test_record_counts(runner: Runner): + """ + Test that record counts are tracked per stream after a run. + """ + # Run the extract-load pipeline + runner.run() + + # Assert that record_counts is populated + assert runner.record_counts is not None + assert isinstance(runner.record_counts, dict) + + # Assert that at least one stream has records + assert len(runner.record_counts) > 0 + + # Assert all counts are positive integers + for stream_name, count in runner.record_counts.items(): + assert isinstance(count, int) + assert count > 0 diff --git a/tests/test_elx/test_utils.py b/tests/test_elx/test_utils.py index e77d82d..0965441 100644 --- a/tests/test_elx/test_utils.py +++ b/tests/test_elx/test_utils.py @@ -1,4 +1,4 @@ -from elx.utils import interpolate_in_config +from elx.utils import interpolate_in_config, RecordCounter def test_interpolate_in_config(): @@ -22,3 +22,56 @@ def test_interpolate_in_config(): ) # Assert that the interpolated config is correct assert interpolated_config == expected_config + + +def test_record_counter_counts_records(): + """ + Test that RecordCounter correctly counts RECORD messages per stream. + """ + counter = RecordCounter() + + # Simulate Singer RECORD messages + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 2}}') + counter.writelines('{"type": "RECORD", "stream": "orders", "record": {"id": 1}}') + + assert counter.counts == {"users": 2, "orders": 1} + + +def test_record_counter_ignores_non_record_messages(): + """ + Test that RecordCounter ignores non-RECORD messages. + """ + counter = RecordCounter() + + # Simulate different Singer message types + counter.writelines('{"type": "SCHEMA", "stream": "users", "schema": {}}') + counter.writelines('{"type": "STATE", "value": {"bookmarks": {}}}') + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + + assert counter.counts == {"users": 1} + + +def test_record_counter_handles_invalid_json(): + """ + Test that RecordCounter gracefully handles invalid JSON. + """ + counter = RecordCounter() + + counter.writelines("not valid json") + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + + assert counter.counts == {"users": 1} + + +def test_record_counter_reset(): + """ + Test that RecordCounter.reset() clears all counts. + """ + counter = RecordCounter() + + counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}') + assert counter.counts == {"users": 1} + + counter.reset() + assert counter.counts == {} From d8cc6a3102362ba15981a05bebf7fab421bd6f6f Mon Sep 17 00:00:00 2001 From: BernardWez Date: Wed, 17 Dec 2025 16:59:58 +0100 Subject: [PATCH 2/6] Add tags parameter to load_assets function for asset metadata --- elx/extensions/dagster/assets.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index 81a64e6..9062682 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -1,4 +1,4 @@ -from typing import Generator, Iterable, List, Sequence +from typing import Generator, Iterable, List, Mapping, Sequence from elx import Runner from dagster import ( AssetsDefinition, @@ -22,6 +22,7 @@ def load_assets( deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None, key_prefix: str | Sequence[str] | None = None, group_name: str | None = None, + tags: Mapping[str, str] | None = None, ) -> List[AssetsDefinition]: """ Load the assets for a runner, each asset represents one tap target combination. @@ -31,6 +32,7 @@ def load_assets( deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend. key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name. group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name. + tags (Mapping[str, str] | None): Tags to apply to all assets. Returns: List[AssetsDefinition]: The assets. @@ -99,6 +101,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: description=generate_description(runner=runner, stream=stream), key_prefix=key_prefix or dagster_safe_name(runner.tap.executable), code_version=runner.tap.hash_key, + tags=tags, ) for stream in runner.tap.catalog.streams if stream.is_selected From 7f797c2128ebffd7ca25772d1e4c7a7f68214f2a Mon Sep 17 00:00:00 2001 From: BernardWez Date: Wed, 17 Dec 2025 21:42:57 +0100 Subject: [PATCH 3/6] Add the ability to set the key_prefix and grou_name options for Dagster assets --- elx/extensions/dagster/assets.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index 9062682..a887a6e 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -1,4 +1,5 @@ -from typing import Generator, Iterable, List, Mapping, Sequence +from collections.abc import Mapping +from typing import Generator, Iterable, List, Sequence from elx import Runner from dagster import ( AssetsDefinition, @@ -32,7 +33,7 @@ def load_assets( deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend. key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name. group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name. - tags (Mapping[str, str] | None): Tags to apply to all assets. + tags (Mapping[str, str] | None): Tags for filtering and organizing. These tags are not attached to runs of the asset. Returns: List[AssetsDefinition]: The assets. @@ -99,7 +100,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: dagster_safe_name(stream.name): AssetOut( is_required=False, description=generate_description(runner=runner, stream=stream), - key_prefix=key_prefix or dagster_safe_name(runner.tap.executable), + key_prefix=key_prefix, code_version=runner.tap.hash_key, tags=tags, ) From f8fe80eda1b06a323359ebf66de0f2d9f9ac12b3 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 18 Dec 2025 09:32:41 +0100 Subject: [PATCH 4/6] Remove tags parameter and key_prefix changes --- elx/extensions/dagster/assets.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index a887a6e..17761f5 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -1,5 +1,4 @@ -from collections.abc import Mapping -from typing import Generator, Iterable, List, Sequence +from typing import Generator, Iterable, List, Mapping, Sequence from elx import Runner from dagster import ( AssetsDefinition, @@ -23,7 +22,6 @@ def load_assets( deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None, key_prefix: str | Sequence[str] | None = None, group_name: str | None = None, - tags: Mapping[str, str] | None = None, ) -> List[AssetsDefinition]: """ Load the assets for a runner, each asset represents one tap target combination. @@ -33,7 +31,7 @@ def load_assets( deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend. key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name. group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name. - tags (Mapping[str, str] | None): Tags for filtering and organizing. These tags are not attached to runs of the asset. + tags (Mapping[str, str] | None): Tags to apply to all assets. Returns: List[AssetsDefinition]: The assets. @@ -100,9 +98,8 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: dagster_safe_name(stream.name): AssetOut( is_required=False, description=generate_description(runner=runner, stream=stream), - key_prefix=key_prefix, + key_prefix=key_prefix or dagster_safe_name(runner.tap.executable), code_version=runner.tap.hash_key, - tags=tags, ) for stream in runner.tap.catalog.streams if stream.is_selected From 6dc032d2bcfd492447c257bf3649999648f9b799 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 18 Dec 2025 09:33:31 +0100 Subject: [PATCH 5/6] Clean up doc string --- elx/extensions/dagster/assets.py | 1 - 1 file changed, 1 deletion(-) diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index 17761f5..c597ca0 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -31,7 +31,6 @@ def load_assets( deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend. key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name. group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name. - tags (Mapping[str, str] | None): Tags to apply to all assets. Returns: List[AssetsDefinition]: The assets. From 4e8c4a82b0b8c8c7c56244b1279636ad045dcab2 Mon Sep 17 00:00:00 2001 From: BernardWez Date: Thu, 18 Dec 2025 09:50:11 +0100 Subject: [PATCH 6/6] Move RecordCounter to its own file --- elx/__init__.py | 1 + elx/record_counter.py | 30 ++++++++++++++++++++++++++++++ elx/runner.py | 3 ++- elx/utils.py | 31 ------------------------------- tests/test_elx/test_utils.py | 3 ++- 5 files changed, 35 insertions(+), 33 deletions(-) create mode 100644 elx/record_counter.py diff --git a/elx/__init__.py b/elx/__init__.py index a33ed46..a065068 100644 --- a/elx/__init__.py +++ b/elx/__init__.py @@ -8,6 +8,7 @@ from elx.target import Target from elx.runner import Runner from elx.catalog import Catalog +from elx.record_counter import RecordCounter logger = logging.getLogger("pipx") logger.setLevel(logging.CRITICAL) diff --git a/elx/record_counter.py b/elx/record_counter.py new file mode 100644 index 0000000..037cbeb --- /dev/null +++ b/elx/record_counter.py @@ -0,0 +1,30 @@ +import json + +class RecordCounter: + """ + A line writer that counts RECORD messages per stream from Singer tap output. + """ + + def __init__(self): + self.counts: dict[str, int] = {} + + def writelines(self, line: str) -> None: + """ + Parse a Singer message line and count RECORD messages per stream. + + Args: + line: A JSON string containing a Singer message. + """ + try: + message = json.loads(line) + if message.get("type") == "RECORD": + stream = message.get("stream") + if stream: + self.counts[stream] = self.counts.get(stream, 0) + 1 + except json.JSONDecodeError: + pass + + def reset(self) -> None: + """Reset all counts to zero.""" + self.counts = {} + diff --git a/elx/runner.py b/elx/runner.py index db2b152..1d4ffc6 100644 --- a/elx/runner.py +++ b/elx/runner.py @@ -12,9 +12,10 @@ from elx.tap import Tap from elx.target import Target from elx import StateManager +from elx.record_counter import RecordCounter from dotenv import load_dotenv -from elx.utils import capture_subprocess_output, RecordCounter +from elx.utils import capture_subprocess_output logging.basicConfig(level=logging.INFO) diff --git a/elx/utils.py b/elx/utils.py index 5fb882d..a0c74c5 100644 --- a/elx/utils.py +++ b/elx/utils.py @@ -1,35 +1,4 @@ import asyncio -import json - - -class RecordCounter: - """ - A line writer that counts RECORD messages per stream from Singer tap output. - """ - - def __init__(self): - self.counts: dict[str, int] = {} - - def writelines(self, line: str) -> None: - """ - Parse a Singer message line and count RECORD messages per stream. - - Args: - line: A JSON string containing a Singer message. - """ - try: - message = json.loads(line) - if message.get("type") == "RECORD": - stream = message.get("stream") - if stream: - self.counts[stream] = self.counts.get(stream, 0) + 1 - except json.JSONDecodeError: - pass - - def reset(self) -> None: - """Reset all counts to zero.""" - self.counts = {} - def require_install(func): """ diff --git a/tests/test_elx/test_utils.py b/tests/test_elx/test_utils.py index 0965441..6dc787c 100644 --- a/tests/test_elx/test_utils.py +++ b/tests/test_elx/test_utils.py @@ -1,4 +1,5 @@ -from elx.utils import interpolate_in_config, RecordCounter +from elx import RecordCounter +from elx.utils import interpolate_in_config def test_interpolate_in_config():