Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions elx/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from elx.target import Target
from elx.runner import Runner
from elx.catalog import Catalog
from elx.record_counter import RecordCounter

logger = logging.getLogger("pipx")
logger.setLevel(logging.CRITICAL)
16 changes: 15 additions & 1 deletion elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Generator, Iterable, List, Sequence
from typing import Generator, Iterable, List, Mapping, Sequence
from elx import Runner
from dagster import (
AssetsDefinition,
Expand Down Expand Up @@ -57,19 +57,33 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
Yields:
Generator[Output, None, None]: The names of the selected outputs.
"""
# Build a mapping from dagster-safe names back to original stream names
stream_name_mapping = {
dagster_safe_name(stream.name): stream.name
for stream in runner.tap.catalog.streams
if stream.is_selected
}

# Execute the runner and yield the selected outputs.
runner.run(
streams=list(context.selected_output_names),
logger=logger,
)

for context_output_name in context.selected_output_names:
# Get the original stream name to look up the row count
original_stream_name = stream_name_mapping.get(
context_output_name, context_output_name
)
row_count = runner.record_counts.get(original_stream_name, 0)

yield Output(
value=Nothing,
output_name=context_output_name,
metadata={
"state_path": f"{runner.state_manager.base_path}/{runner.state_file_name}",
"state": runner.load_state(),
"row_count": row_count,
},
)

Expand Down
30 changes: 30 additions & 0 deletions elx/record_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import json

class RecordCounter:
"""
A line writer that counts RECORD messages per stream from Singer tap output.
"""

def __init__(self):
self.counts: dict[str, int] = {}

def writelines(self, line: str) -> None:
"""
Parse a Singer message line and count RECORD messages per stream.

Args:
line: A JSON string containing a Singer message.
"""
try:
message = json.loads(line)
if message.get("type") == "RECORD":
stream = message.get("stream")
if stream:
self.counts[stream] = self.counts.get(stream, 0) + 1
except json.JSONDecodeError:
pass

def reset(self) -> None:
"""Reset all counts to zero."""
self.counts = {}

10 changes: 9 additions & 1 deletion elx/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from elx.tap import Tap
from elx.target import Target
from elx import StateManager
from elx.record_counter import RecordCounter
from dotenv import load_dotenv

from elx.utils import capture_subprocess_output
Expand All @@ -30,6 +31,7 @@ def __init__(
self.tap = tap
self.target = target
self.state_manager = state_manager
self.record_counts: dict[str, int] = {}

@property
def name(self) -> str:
Expand Down Expand Up @@ -113,14 +115,17 @@ def writelines(self, line: str):
if self.logger:
self.logger.info(line)

# Create a record counter to track row counts per stream
record_counter = RecordCounter()

async with self.tap.process(
state=state,
streams=streams,
) as tap_process:
async with self.target.process(
tap_process=tap_process,
) as target_process:
tap_outputs = [target_process.stdin]
tap_outputs = [target_process.stdin, record_counter]
tap_stdout_future = asyncio.ensure_future(
# forward subproc stdout to tap_outputs (i.e. targets stdin)
capture_subprocess_output(tap_process.stdout, *tap_outputs),
Expand Down Expand Up @@ -239,6 +244,9 @@ def writelines(self, line: str):
elif target_code:
raise Exception("Target failed")

# Store the record counts for access after the run
self.record_counts = record_counter.counts


if __name__ == "__main__":
tap = Tap(
Expand Down
1 change: 0 additions & 1 deletion elx/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import asyncio


def require_install(func):
"""
Decorator to check if the executable is installed.
Expand Down
20 changes: 20 additions & 0 deletions tests/test_elx/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,23 @@ def test_config_interpolation_target_values(tap: Tap):
runner = Runner(tap, target)

assert runner.target.config["tap_name"] == "tap_mock_fixture"


def test_record_counts(runner: Runner):
"""
Test that record counts are tracked per stream after a run.
"""
# Run the extract-load pipeline
runner.run()

# Assert that record_counts is populated
assert runner.record_counts is not None
assert isinstance(runner.record_counts, dict)

# Assert that at least one stream has records
assert len(runner.record_counts) > 0

# Assert all counts are positive integers
for stream_name, count in runner.record_counts.items():
assert isinstance(count, int)
assert count > 0
54 changes: 54 additions & 0 deletions tests/test_elx/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from elx import RecordCounter
from elx.utils import interpolate_in_config


Expand All @@ -22,3 +23,56 @@ def test_interpolate_in_config():
)
# Assert that the interpolated config is correct
assert interpolated_config == expected_config


def test_record_counter_counts_records():
"""
Test that RecordCounter correctly counts RECORD messages per stream.
"""
counter = RecordCounter()

# Simulate Singer RECORD messages
counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}')
counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 2}}')
counter.writelines('{"type": "RECORD", "stream": "orders", "record": {"id": 1}}')

assert counter.counts == {"users": 2, "orders": 1}


def test_record_counter_ignores_non_record_messages():
"""
Test that RecordCounter ignores non-RECORD messages.
"""
counter = RecordCounter()

# Simulate different Singer message types
counter.writelines('{"type": "SCHEMA", "stream": "users", "schema": {}}')
counter.writelines('{"type": "STATE", "value": {"bookmarks": {}}}')
counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}')

assert counter.counts == {"users": 1}


def test_record_counter_handles_invalid_json():
"""
Test that RecordCounter gracefully handles invalid JSON.
"""
counter = RecordCounter()

counter.writelines("not valid json")
counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}')

assert counter.counts == {"users": 1}


def test_record_counter_reset():
"""
Test that RecordCounter.reset() clears all counts.
"""
counter = RecordCounter()

counter.writelines('{"type": "RECORD", "stream": "users", "record": {"id": 1}}')
assert counter.counts == {"users": 1}

counter.reset()
assert counter.counts == {}
Loading