tqdm style progress bar for dagster integration #337
base: 11-21-metaxyiomanager
Conversation
Warning: This pull request is not mergeable via GitHub because a downstack PR is open. Once all requirements are satisfied, merge this PR as a stack on Graphite.
This stack of pull requests is managed by Graphite.
Coverage Report (Python 3.13): [coverage table omitted]
Coverage Report (Python 3.12): [coverage table omitted]
Coverage Report (Python 3.11): [coverage table omitted]
The branch was force-pushed repeatedly during review: ec0955e to 6bc1222, b2a25af to 7aa614f, 6bc1222 to 93e8129, fe25f56 to 3b67f97, 93e8129 to e3c15e2, 3b67f97 to 93573f1, e3c15e2 to c0e935c, c0e935c to 395eefb, 7730994 to d6d4b1b, 395eefb to eabf512, d6d4b1b to cb4ee96, 1cd7cc6 to 71533ae, cb4ee96 to 4549f62, 71533ae to bb80037, 4549f62 to 9c9ac27, bb80037 to 9ea652d, 9c9ac27 to 70be0bf, 9ea652d to 90d60d8, 70be0bf to 2421ec2, 90d60d8 to 39ef66e, 2421ec2 to b2e3e09, 34e5568 to 46ef8ea, b2e3e09 to 4fbf469, 4fbf469 to 3189d9c, 46ef8ea to c6988aa.
Pull request overview
This PR adds a tqdm-style progress bar helper function for the Dagster integration, enabling progress tracking during dataframe processing in Metaxy assets.
Key Changes:
- Added `iter_dataframe_with_progress()`, a function that yields dataframe chunks while displaying progress via tqdm (if available) and logging progress messages (see the usage sketch after this list)
- Reorganized the example files, replacing the minimal/subsampled/branch_subset examples with non_partitioned/partitioned/branch_subsampled examples that demonstrate the new progress functionality
- Updated the exports in `__init__.py` and `__all__` in `helpers.py` to include the new function
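For orientation, here is a rough sketch of how the helper might be called from a Dagster asset. The parameter names (`chunk_size`, `desc`, `log_fn`, `log_level`, `show_eta`) are inferred from the review hunks below, and the pandas dataframe is only an assumption, so treat this as an illustration rather than the PR's actual example code.

```python
import dagster as dg
import pandas as pd  # assumption: any dataframe supporting len() and slicing

from metaxy.ext.dagster import iter_dataframe_with_progress


@dg.asset
def scored_rows(context: dg.AssetExecutionContext) -> pd.DataFrame:
    df = pd.DataFrame({"value": range(10_000)})
    results = []
    # Yield the dataframe in chunks; a tqdm bar is shown when attached to a
    # TTY, and progress messages go to context.log otherwise.
    for chunk in iter_dataframe_with_progress(
        df,
        chunk_size=1_000,
        desc="scoring rows",
        log_fn=context.log,
        log_level="info",
        show_eta=True,
    ):
        results.append(chunk.assign(score=chunk["value"] * 2))
    return pd.concat(results)
```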
Reviewed changes
Copilot reviewed 7 out of 9 changed files in this pull request and generated 6 comments.
Summary per file:
| File | Description |
|---|---|
| src/metaxy/ext/dagster/helpers.py | Added iter_dataframe_with_progress function for chunked dataframe iteration with tqdm-style progress bars and logging |
| src/metaxy/ext/dagster/__init__.py | Exported the new iter_dataframe_with_progress function |
| examples/example-integration-dagster/src/example_integration_dagster/partitioned.py | New example demonstrating partitioned processing with progress tracking |
| examples/example-integration-dagster/src/example_integration_dagster/non_partitioned.py | New example showing non-partitioned data processing with progress logging |
| examples/example-integration-dagster/src/example_integration_dagster/branch_subsampled.py | New example combining branch stores and subsampling with progress tracking |
| examples/example-integration-dagster/src/example_integration_dagster/subsampled.py | Removed older example file |
| examples/example-integration-dagster/src/example_integration_dagster/minimal.py | Removed older example file |
| examples/example-integration-dagster/src/example_integration_dagster/branch_subset.py | Removed older example file |
| examples/example-integration-dagster/README.md | Updated documentation to reflect new example structure |
```python
if hasattr(log_fn, "__self__"):
    logger_obj = getattr(log_fn, "__self__", None)
    target = getattr(logger_obj, log_level, None)
    if callable(target):
        target(message)
    else:
        log_fn(message)  # Fallback to callable
elif callable(log_fn):
    log_fn(message)
```
Copilot AI (Nov 23, 2025):
The logic for handling logger objects is incorrect. When log_fn=context.log is passed (as shown in the examples), the code checks hasattr(log_fn, "__self__") which would be False for a logger object, causing it to fall through to the elif callable(log_fn) branch at line 585. This means the log_level parameter is effectively ignored because the code never reaches lines 579-582 where it would be used. The logger detection logic needs to be fixed to properly detect logger objects and respect the log_level parameter.
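A minimal sketch of one possible fix, assuming `_emit` is a nested helper closing over `log_fn` and `log_level` as in the hunk above (an illustration, not the code proposed in this PR): look for a level-named method on the passed object itself, which covers both `context.log` and stdlib loggers, and only then fall back to calling `log_fn` directly.

```python
def _emit(message: str) -> None:
    # Prefer a method named after log_level on the object itself
    # (works for context.log and logging.Logger); otherwise treat
    # log_fn as a plain callable such as print.
    target = getattr(log_fn, log_level, None)
    if callable(target):
        target(message)
    elif callable(log_fn):
        log_fn(message)
```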
```python
elapsed = time.monotonic() - start_time
failed_total = failed_count() if callable(failed_count) else failed_count or 0
_emit(
    f"{desc} completed: processed={processed}, failed={failed_total}, "
    f"elapsed={_format_duration(elapsed)}"
)
```
Copilot AI (Nov 23, 2025):
The final summary is not emitted when the function returns early due to total == 0 or chunk_size <= 0 at line 560: the code at lines 632-637 is never executed, and there is no log message indicating why processing was skipped. This could be confusing for users. Consider adding a log message before the early return to explain why no chunks were processed.
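One way the suggested message could look, reusing the names that appear in the hunks quoted in this review (a sketch, not the PR's code):

```python
total = len(df)
if total == 0 or chunk_size <= 0:
    # Explain the skip instead of returning silently.
    _emit(f"{desc}: skipping, nothing to process (rows={total}, chunk_size={chunk_size})")
    return
```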
```python
log_level: str = "info",
failed_count: int | Callable[[], int] | None = None,
show_eta: bool = True,
echo_to_stderr: bool = False,
```
Copilot AI (Nov 23, 2025):
The echo_to_stderr parameter is not documented in the docstring. This parameter controls whether messages are echoed to stderr in addition to being passed to log_fn, but it is missing from the Args section.
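A sketch of the kind of Args entry being requested; the wording is an assumption about the parameter's behavior based on this review, not the PR's docstring.

```python
"""
Args:
    echo_to_stderr: If True, also write each progress message to
        sys.stderr in addition to passing it to ``log_fn``.
        Defaults to False.
"""
```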
```python
if show_eta:
    elapsed = time.monotonic() - start_time
    rate = processed / elapsed if elapsed > 0 else 0
    remaining = total - processed
    eta = remaining / rate if rate > 0 else None
    eta_str = _format_duration(eta) if eta is not None else "n/a"
    message = f"{desc}: {processed}/{total} rows (ETA {eta_str})"
else:
    message = f"{desc}: {processed}/{total} rows"
_emit(message)
```
Copilot AI (Nov 23, 2025):
Progress messages are emitted for every chunk iteration, which could create excessive logging. For large dataframes with small chunk sizes, this could generate thousands of log messages. Consider adding a parameter to control logging frequency (e.g., emit logs only every N chunks or every X seconds).
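A sketch of one way such frequency control could work; `make_throttled_emit` and `min_interval_s` are hypothetical names used for illustration and are not part of this PR.

```python
import time
from typing import Callable


def make_throttled_emit(
    emit: Callable[[str], None], min_interval_s: float = 5.0
) -> Callable[..., None]:
    """Wrap an emit callable so routine messages fire at most once per interval."""
    last_emit = [0.0]  # mutable cell so the closure can update it

    def throttled(message: str, *, force: bool = False) -> None:
        now = time.monotonic()
        if force or now - last_emit[0] >= min_interval_s:
            last_emit[0] = now
            emit(message)

    return throttled


# Inside the chunk loop, per-chunk progress lines would go through the
# wrapper, while the final summary could pass force=True so it is
# always written.
```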
### Progress/Logging Example

Process in chunks with a tqdm-style progress bar (in TTY) and log updates:

```bash
dagster dev -m example_integration_dagster.progress_logging
```
Copilot AI (Nov 23, 2025):
The README references a progress_logging example module that doesn't exist. Based on the files in the directory, there is no progress_logging.py file. The progress bar functionality is demonstrated in the existing examples (non_partitioned.py, partitioned.py, and branch_subsampled.py), so this section should either be removed or updated to reference one of those files.
Suggested change:

```diff
-### Progress/Logging Example
-Process in chunks with a tqdm-style progress bar (in TTY) and log updates:
-```bash
-dagster dev -m example_integration_dagster.progress_logging
-```
+### Progress/Logging
+Progress bar functionality (tqdm-style) and logging are demonstrated in the existing examples:
+- `non_partitioned.py`
+- `partitioned.py`
+- `branch_subsampled.py`
+Run any of these examples to see chunked processing with progress bars and log updates.
```
```python
import time

total = len(df)
if total == 0 or chunk_size <= 0:
```
Copilot AI (Nov 23, 2025):
When chunk_size <= 0, the function returns early without yielding any chunks. However, this could lead to unexpected behavior if the caller expects to iterate over at least the data in some form. Consider raising a ValueError instead to make the invalid input explicit, or default to processing the entire dataframe as a single chunk.
Suggested change:

```diff
-if total == 0 or chunk_size <= 0:
+if chunk_size <= 0:
+    raise ValueError(f"chunk_size must be a positive integer, got {chunk_size}")
+if total == 0:
```
I'm sorry, never mind, I got the title wrong. I'll take a look at this later again.

resolves: #333