Lets users specify pipeline- and file-level staging conditions. Pipeline conditions define a `check` method taking the pipeline input directory as its only argument. File conditions define a `check` method taking a file path as its only argument. All `check` methods return a `bool` indicating whether any files (pipeline-level) or a specific file (file-level) should be staged. There are five built-in, parameterized file-level condition types and one pipeline-level condition type that runs an arbitrary shell command.
- Add validate_callable_reference and import_callable utilities
- Replace ScriptCondition with CallableFileCondition and CallablePipelineCondition using module:function references
- Validate callables eagerly at config load time
- Handle callable errors gracefully (log warning, return False)
- Rename task -> file (field, method, union type) for clarity
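As a rough sketch of what a module:function reference implies, the helpers below split the reference, import the module, and check that the named attribute is callable. The function names mirror the commit message, but the bodies are assumptions for illustration, not the code in this PR.

```python
import importlib
from typing import Any, Callable


def import_callable(reference: str) -> Callable[..., Any]:
    """Resolve a 'module:function' reference to a callable (illustrative only)."""
    module_name, sep, attr = reference.partition(":")
    if not sep or not module_name or not attr:
        raise ValueError(f"expected 'module:function', got {reference!r}")
    # import_module raises ImportError if the module is not importable.
    module = importlib.import_module(module_name)
    obj = getattr(module, attr, None)
    if not callable(obj):
        raise ValueError(f"{reference!r} does not name a callable")
    return obj


def validate_callable_reference(reference: str) -> str:
    """Fail eagerly (for example at config load time) if the reference cannot be resolved."""
    try:
        import_callable(reference)
    except ImportError as exc:
        raise ValueError(f"cannot import module for {reference!r}: {exc}") from exc
    return reference
```

Validating at load time like this surfaces a bad reference as a config error instead of a failure on the first staging pass.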
- Replace boolean conditions with middleware pattern where each step transforms the candidate list: (list[Path], state) -> list[Path]
- Add PipelineState dataclass exposing waiting/staged/completed/failed counts plus input_dir/output_dir paths
- Built-in filters: min_size, max_size, min_age, filename_match, companion_file
- Built-in limits: max_staged, max_batch, sort_by
- CallableMiddleware for custom user functions
- Config uses staging.steps list instead of conditions.file/directory
- Delete conditions.py, add staging.py with 117 tests

Closes #39
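To make the step signature concrete, here is a minimal sketch of the middleware idea under stated assumptions. The `PipelineState` field names follow the commit message, but the field types, the `Step` alias, the `run_steps` helper, and the bodies of `min_size` and `max_batch` are hypothetical stand-ins, not the code in staging.py.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Callable


@dataclass(frozen=True)
class PipelineState:
    # Field names follow the commit message; the types are assumed.
    waiting: int
    staged: int
    completed: int
    failed: int
    input_dir: Path
    output_dir: Path


# Every step has the same shape: (candidates, state) -> candidates.
Step = Callable[[list[Path], PipelineState], list[Path]]


def min_size(n_bytes: int) -> Step:
    """Hypothetical filter: keep candidates that are at least n_bytes large."""
    def step(candidates: list[Path], state: PipelineState) -> list[Path]:
        return [p for p in candidates if p.stat().st_size >= n_bytes]
    return step


def max_batch(n: int) -> Step:
    """Hypothetical limit: keep at most the first n candidates."""
    def step(candidates: list[Path], state: PipelineState) -> list[Path]:
        return candidates[:n]
    return step


def run_steps(
    candidates: list[Path], state: PipelineState, steps: list[Step]
) -> list[Path]:
    """Apply each step in order; each one sees the previous step's output."""
    for step in steps:
        candidates = step(candidates, state)
    return candidates
```

Because every step shares one signature, filters and limits compose in whatever order the config lists them, which the earlier boolean conditions could not express.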
kind: Literal["callable"]
function: str
My understanding is that CallableMiddleware only works if the function's module is importable (i.e., an installed package or on sys.path). Users cannot reference a function defined in a standalone Python script. Is this intentional, or should we support that? For example, could we allow something like:
staging:
  steps:
    - kind: callable
      function: path/to/filters.py:my_func
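If file paths were supported, one possible approach is to load the script with `importlib.util.spec_from_file_location`. This is a sketch only, not something this PR implements: the helper name `load_callable_from_file` and the `path.py:func` parsing are assumptions for illustration.

```python
import importlib.util
from pathlib import Path
from typing import Any, Callable


def load_callable_from_file(reference: str) -> Callable[..., Any]:
    """Load 'path/to/file.py:func' without requiring the file to be on sys.path."""
    # rpartition splits on the last colon, so Windows drive letters survive.
    path_str, sep, attr = reference.rpartition(":")
    if not sep or not path_str or not attr:
        raise ValueError(f"expected 'path/to/file.py:function', got {reference!r}")
    path = Path(path_str)
    spec = importlib.util.spec_from_file_location(path.stem, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load a module from {path}")
    module = importlib.util.module_from_spec(spec)
    # exec_module runs the script's top-level code.
    spec.loader.exec_module(module)
    obj = getattr(module, attr, None)
    if not callable(obj):
        raise ValueError(f"{reference!r} does not name a callable")
    return obj
```

In this sketch the loaded module is not registered in `sys.modules`, so the script cannot rely on relative imports, and its top-level code runs every time the reference is loaded.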
Good question. Let me think about this a bit more.
This PR can proceed without addressing this — we can tackle it in a separate issue. Let me know your preference!
- Change PipelineState to StagingContext
- Fix staged files count error
def _build_pipeline_state(self) -> StagingContext:
    """Build current pipeline state for staging middleware."""
Rename for consistency:
- def _build_pipeline_state(self) -> StagingContext:
-     """Build current pipeline state for staging middleware."""
+ def _build_staging_context(self) -> StagingContext:
+     """Build the current context for staging middleware."""
This file needs to be moved into tests/unit/ per file reorg done in #32.
This file needs to be merged into existing tests/unit/test_utils.py.
@pytest.fixture
def mock_context(tmp_path: Path) -> StagingContext:
    """Create a mock pipeline state for testing."""
Rename for consistency:
- """Create a mock pipeline state for testing."""
+ """Create a mock staging context for testing."""
class TestCallableMiddleware:
    def test_calls_function_with_candidates_and_state(
Rename for consistency:
- def test_calls_function_with_candidates_and_state(
+ def test_calls_function_with_candidates_and_context(
test_module = tmp_path / "test_staging_func.py"
test_module.write_text(
    """
def keep_first(candidates, state):
Rename for consistency:
- def keep_first(candidates, state):
+ def keep_first(candidates, context):
test_module = tmp_path / "test_error_func.py"
test_module.write_text(
    """
def raise_error(candidates, state):
Rename for consistency:
- def raise_error(candidates, state):
+ def raise_error(candidates, context):
def _build_pipeline_state(self) -> StagingContext:
    """Build current pipeline state for staging middleware."""
    n_finished = sum(1 for f in self._finished_dir.iterdir() if f.is_file())
    n_failed = sum(len(e) for e in self._task_error_filenames.values())
[Nit] Rename for consistency across codebase:
- n_failed = sum(len(e) for e in self._task_error_filenames.values())
+ n_failed = sum(len(errs) for errs in self._task_error_filenames.values())
Summary
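- Replace boolean conditions with middleware pattern where each step transforms the candidate list: `(list[Path], state) -> list[Path]`
- Add `PipelineState` dataclass exposing `waiting`/`staged`/`completed`/`failed` counts plus `input_dir`/`output_dir` paths
- Built-in filters: `min_size`, `max_size`, `min_age`, `filename_match`, `companion_file`
- Built-in limits: `max_staged`, `max_batch`, `sort_by`
- `CallableMiddleware` for custom user functions
- Config uses `staging.steps` list instead of `conditions.file/directory`

Example config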
Test plan
Closes #39