Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions src/teamster/core/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,9 @@ Two dbt-specific `AutomationCondition` builders:
`newly_missing`, `code_version_changed`, or `execution_failed`. Intentionally
omits `any_deps_updated` since views are computed on read.
- `dbt_table_automation_condition()` β€” for TABLE models: also triggers on
upstream data changes via `AnyDepsUpdatedSinceSelf` (storage-id comparison,
immune to SinceCondition tick-collision bug dagster-io/dagster#33587). Looks
through intermediate views up to `_MAX_VIEW_DEPTH` levels.

**Unsynced badge behavior**: Dagster's "unsynced" indicator is driven by its
data versioning system, not the automation condition. When an upstream table
Expand Down
160 changes: 131 additions & 29 deletions src/teamster/core/automation_conditions.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,24 @@
from typing import AbstractSet

from dagster import AssetSelection, AutomationCondition
from dagster._core.asset_graph_view.entity_subset import EntitySubset
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.assets.graph.base_asset_graph import (
BaseAssetGraph,
BaseAssetNode,
)
from dagster._core.definitions.declarative_automation.automation_context import (
AutomationContext,
)
from dagster._core.definitions.declarative_automation.operands.subset_automation_condition import (
SubsetAutomationCondition,
)
from dagster._core.definitions.declarative_automation.operators.dep_operators import (
DepsAutomationCondition,
)
from dagster._core.definitions.events import AssetKeyPartitionKey
from dagster._record import record
from dagster_shared.serdes import whitelist_for_serdes

_MAX_VIEW_DEPTH = 3
_VIEW_SELECTION = AssetSelection.tag("dagster/materialized", "view")
Expand Down Expand Up @@ -45,30 +55,120 @@ def _patched_get_dep_keys(
) - AssetSelection.all(include_sources=False)


@whitelist_for_serdes
@record
class AnyDepsUpdatedSinceSelf(SubsetAutomationCondition):
    """True when any dep has a more recent materialization than this asset.

    Replaces the tick-based ``any_deps_updated().since(newly_updated)`` pattern
    with a direct storage-id comparison that is immune to the SinceCondition
    tick-collision bug (dagster-io/dagster#33587).

    When both ``any_deps_updated`` (trigger) and ``newly_updated`` (reset) fire
    within the same evaluation tick, ``SinceCondition`` unconditionally applies
    the reset — even when the dep update has a more recent wall-clock timestamp
    than the asset's completion. This condition sidesteps the issue entirely by
    comparing ``storage_id`` ordering, which is monotonically increasing and
    tick-independent.

    The condition is self-resetting: once the asset materializes its
    ``storage_id`` becomes the newest, so no ``.since()`` wrapper is needed.
    ``~in_progress`` on the outer condition prevents duplicate requests while a
    run is in flight.

    Supports looking *through* intermediate views (assets matching
    ``view_selection``) up to ``max_view_depth`` levels.
    """

    # Deps matching this selection are excluded entirely (e.g. external
    # sources with no materialization records).
    ignore_selection: AssetSelection | None = None
    # Deps matching this selection are treated as pass-through views: their
    # own parents are also inspected, up to max_view_depth levels.
    view_selection: AssetSelection | None = None
    max_view_depth: int = _MAX_VIEW_DEPTH

    @property
    def name(self) -> str:
        return "any_deps_updated_since_self"

    @property
    def description(self) -> str:
        return "any dep materialized more recently than this asset"

    def _gather_dep_keys(
        self,
        key: AssetKey,
        asset_graph: BaseAssetGraph,
        ignore_keys: AbstractSet[AssetKey],
        view_keys: AbstractSet[AssetKey],
        depth: int,
        _visited: set[AssetKey] | None = None,
    ) -> AbstractSet[AssetKey]:
        """Gather dep keys, recursing through views up to *depth* levels.

        ``_visited`` guards against revisiting a view reachable along more
        than one path (diamond-shaped DAGs), bounding the traversal.
        """
        if _visited is None:
            _visited = set()

        dep_keys = set(asset_graph.get(key).parent_entity_keys) - ignore_keys
        result = set(dep_keys)

        if depth > 0 and view_keys:
            for dep_key in dep_keys:
                if dep_key in view_keys and dep_key not in _visited:
                    _visited.add(dep_key)
                    result |= self._gather_dep_keys(
                        dep_key,
                        asset_graph,
                        ignore_keys,
                        view_keys,
                        depth - 1,
                        _visited,
                    )

        return result

    async def compute_subset(self, context: AutomationContext) -> EntitySubset:
        """Return the candidate subset when any (looked-through) dep is newer.

        Returns the empty subset when this asset has never materialized
        (first-run handling is delegated to ``newly_missing``) or when no dep
        has a materialization record newer than this asset's latest one.
        """
        queryer = context.asset_graph_view.get_inner_queryer_for_back_compat()
        asset_partition = AssetKeyPartitionKey(context.key)

        my_storage_id = queryer.get_latest_materialization_or_observation_storage_id(
            asset_partition
        )
        if my_storage_id is None:
            # Never materialized — let newly_missing handle the first run.
            return context.get_empty_subset()

        asset_graph = context.asset_graph_view.asset_graph
        ignore_keys = (
            self.ignore_selection.resolve(asset_graph, allow_missing=True)
            if self.ignore_selection is not None
            else set()
        )
        view_keys = (
            self.view_selection.resolve(asset_graph, allow_missing=True)
            if self.view_selection is not None
            else set()
        )

        all_dep_keys = self._gather_dep_keys(
            context.key, asset_graph, ignore_keys, view_keys, self.max_view_depth
        )

        for dep_key in all_dep_keys:
            # Check 1: dep materialized after us (storage-id comparison).
            # This is the primary check and is tick-independent.
            dep_record = queryer.get_latest_materialization_or_observation_record(
                AssetKeyPartitionKey(dep_key), after_cursor=my_storage_id
            )
            if dep_record is not None:
                return context.candidate_subset

            # Check 2: dep will be requested in this evaluation tick.
            # Matches the will_be_requested() semantics in Dagster's built-in
            # any_deps_updated(), enabling same-tick propagation through chains
            # like Table_A -> Table_B -> View -> Table_C (where Table_B hasn't
            # materialized yet but the evaluator has already decided to
            # request it).
            dep_request = context.request_subsets_by_key.get(dep_key)
            if dep_request is not None and not dep_request.is_empty:
                return context.candidate_subset

        return context.get_empty_subset()


def dbt_view_automation_condition(
Expand Down Expand Up @@ -126,18 +226,21 @@ def dbt_table_automation_condition(
physical data and must be re-materialized when upstream data changes.

Changes from eager():
- Replaced any_deps_updated().since() with AnyDepsUpdatedSinceSelf: uses
direct storage-id comparison instead of the tick-based SinceCondition,
fixing a bug where dep updates are silently swallowed when they co-occur
with the downstream's newly_updated in the same evaluation tick
(dagster-io/dagster#33587).
- AnyDepsUpdatedSinceSelf also subsumes the ancestor lookthrough: it
gathers dep keys through intermediate views (up to _MAX_VIEW_DEPTH
levels) and checks storage-id ordering for all of them in one pass.
- Added code_version_changed: detects SQL code changes from dbt deploys.
Uses its own .since(newly_updated) so it only resets after successful
materialization, not on request β€” the signal isn't lost if the run
fails or never executes.
- Filtered any_deps_missing: ignores external source assets (which have
no materialization records) and configured ignore_selection to prevent
them from permanently blocking downstream automation.
- Filtered any_deps_updated and ancestor lookthrough: ignores configured
ignore_selection.
- Filtered any_deps_in_progress: ignores configured ignore_selection.
- Omitted initial_evaluation from .since() reset: it fires on the same
tick as newly_missing, suppressing it permanently since newly_missing
Expand All @@ -150,13 +253,12 @@ def dbt_table_automation_condition(
return (
AutomationCondition.in_latest_time_window()
& (
(
AutomationCondition.newly_missing()
| AutomationCondition.any_deps_updated().ignore(ignore_selection)
| _build_any_ancestor_updated(view_selection=_VIEW_SELECTION).ignore(
ignore_selection
)
).since(_since_last_handled)
AutomationCondition.newly_missing().since(_since_last_handled)
| AnyDepsUpdatedSinceSelf(
ignore_selection=ignore_selection,
view_selection=_VIEW_SELECTION,
max_view_depth=_MAX_VIEW_DEPTH,
)
| AutomationCondition.code_version_changed().since(
AutomationCondition.newly_updated()
)
Expand Down
76 changes: 76 additions & 0 deletions tests/test_automation_conditions.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,82 @@ def my_view_v2():
assert result.get_num_requested(AssetKey("my_view")) == 1


def test_table_requested_when_dep_updates_between_request_and_completion():
    """Regression test for dagster-io/dagster#33587.

    AnyDepsUpdatedSinceSelf compares storage ids instead of relying on the
    tick-based SinceCondition, so a dep update (storage_id Y) that lands after
    the downstream's last materialization (storage_id X < Y) is detected even
    when both events fall inside a single evaluation-tick window.

    Mirrors a production event observed around 3:00 AM on kipptaf:

    2:57:06 AM sensor evaluates; sees earlier dep update -> requests kipptaf run
    2:57:21 AM kipppaterson dep updates again (run still in progress)
    2:59:55 AM kipptaf run completes (storage_id X)
    3:00:26 AM kipppaterson dep updates a third time (storage_id Y > X)
    3:00:44 AM sensor evaluates -> should request re-run

    With SinceCondition, the tick-based state machine computed
    (prev U trigger) - reset = {} -> 0 requested, silently swallowing the
    3:00:26 dep update. With AnyDepsUpdatedSinceSelf, storage_id(Y) >
    storage_id(X) holds and 1 run is correctly requested.

    Simulation:
    1. Materialize all -> evaluate (tick 1) -> 0 requested
    2. Materialize upstream -> evaluate (tick 2) -> 1 requested (run triggered)
    3. [between ticks] Materialize upstream AGAIN (storage_id T3)
    4. [between ticks] Materialize downstream (storage_id T4 > T3)
    5. Evaluate (tick 3) -> 1 requested  (fixed by AnyDepsUpdatedSinceSelf)
    """

    @asset(tags=_TABLE_TAG)
    def upstream():
        return 1

    @asset(
        deps=[upstream],
        automation_condition=_get_table_condition(),
        tags=_TABLE_TAG,
    )
    def downstream():
        return 2

    asset_defs = [upstream, downstream]
    instance = DagsterInstance.ephemeral()
    defs = Definitions(assets=asset_defs)

    def materialize_upstream():
        # Re-materialize only the dep, leaving downstream untouched.
        materialize(assets=[upstream], instance=instance, selection=[upstream])

    # Tick 1: everything freshly materialized — nothing should be requested.
    materialize(assets=asset_defs, instance=instance)
    evaluation = evaluate_automation_conditions(defs=defs, instance=instance)
    assert evaluation.total_requested == 0

    # Tick 2: dep updated — downstream is requested (run triggered).
    materialize_upstream()
    evaluation = evaluate_automation_conditions(
        defs=defs, instance=instance, cursor=evaluation.cursor
    )
    assert evaluation.get_num_requested(AssetKey("downstream")) == 1

    # Between tick 2 and tick 3:
    # dep materializes again (storage_id T3) ...
    materialize_upstream()
    # ... the in-flight downstream run completes (storage_id T4 > T3), so the
    # T3 update is "covered" by the downstream's own materialization ...
    materialize(assets=[downstream], instance=instance, selection=[downstream])
    # ... and the dep materializes a THIRD time (storage_id T5 > T4) — an
    # update strictly newer than the downstream's completion.
    materialize_upstream()

    # Tick 3: AnyDepsUpdatedSinceSelf sees upstream T5 > downstream T4 and
    # correctly requests a re-run.
    evaluation = evaluate_automation_conditions(
        defs=defs, instance=instance, cursor=evaluation.cursor
    )
    assert evaluation.get_num_requested(AssetKey("downstream")) == 1


class TestKipptafDbtAssets:
"""Integration tests using the real kipptaf dbt manifest.

Expand Down
Loading