From 6ac40017338cb6d4cbb44013d5362ca23db59f04 Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 4 Feb 2026 17:06:58 +0000 Subject: [PATCH] Fix artifact duplication on node re-execution When a node re-runs (e.g., refresh data, collect schema), the record_published() method was appending to the published list without checking for existing artifacts with the same name. This caused duplicate entries to appear in the artifact tracker. Now record_published() removes any existing artifact with the same (node_id, kernel_id, artifact_name) before adding the new one, ensuring uniqueness and preventing duplicate artifacts on refresh. https://claude.ai/code/session_015b6B5uvhwP1jwC6Uk5ykGH --- .../flowfile_core/flowfile/artifacts.py | 14 +++++- .../tests/flowfile/test_artifact_context.py | 49 +++++++++++++++++++ 2 files changed, 62 insertions(+), 1 deletion(-) diff --git a/flowfile_core/flowfile_core/flowfile/artifacts.py b/flowfile_core/flowfile_core/flowfile/artifacts.py index 4199d988..2b0e8186 100644 --- a/flowfile_core/flowfile_core/flowfile/artifacts.py +++ b/flowfile_core/flowfile_core/flowfile/artifacts.py @@ -93,6 +93,9 @@ def record_published( ``artifacts`` may be a list of dicts (with at least a ``"name"`` key) or a plain list of artifact name strings. + If an artifact with the same name was already published by this node, + it is replaced (node_id + artifact_name are unique). + Returns the created :class:`ArtifactRef` objects. """ state = self._get_or_create_state(node_id) @@ -100,8 +103,17 @@ def record_published( for item in artifacts: if isinstance(item, str): item = {"name": item} + artifact_name = item["name"] + + # Remove any existing artifact with the same name from this node + # to ensure (node_id, artifact_name) uniqueness + state.published = [ + r for r in state.published + if not (r.name == artifact_name and r.kernel_id == kernel_id) + ] + ref = ArtifactRef( - name=item["name"], + name=artifact_name, source_node_id=node_id, kernel_id=kernel_id, type_name=item.get("type_name", ""), diff --git a/flowfile_core/tests/flowfile/test_artifact_context.py b/flowfile_core/tests/flowfile/test_artifact_context.py index b4e8a8e3..f5193637 100644 --- a/flowfile_core/tests/flowfile/test_artifact_context.py +++ b/flowfile_core/tests/flowfile/test_artifact_context.py @@ -113,6 +113,55 @@ def test_record_published_updates_kernel_artifacts(self): assert "model" in ka assert ka["model"].source_node_id == 1 + def test_record_published_overwrites_same_name_same_node(self): + """Publishing the same artifact name from the same node should overwrite, + not create duplicates. This ensures (node_id, artifact_name) uniqueness.""" + ctx = ArtifactContext() + # First publish + refs1 = ctx.record_published( + node_id=1, + kernel_id="k1", + artifacts=[{"name": "model", "type_name": "RF"}], + ) + assert len(ctx.get_published_by_node(1)) == 1 + assert ctx.get_published_by_node(1)[0].type_name == "RF" + + # Second publish of same artifact name from same node - should overwrite + refs2 = ctx.record_published( + node_id=1, + kernel_id="k1", + artifacts=[{"name": "model", "type_name": "XGBoost"}], + ) + # Should still only have 1 artifact, not 2 + assert len(ctx.get_published_by_node(1)) == 1 + assert ctx.get_published_by_node(1)[0].type_name == "XGBoost" + # Kernel artifacts should also be updated + ka = ctx.get_kernel_artifacts("k1") + assert ka["model"].type_name == "XGBoost" + + def test_record_published_allows_same_name_different_nodes(self): + """Different nodes can publish artifacts with the same name.""" + ctx = ArtifactContext() + ctx.record_published(1, "k1", [{"name": "model", "type_name": "RF"}]) + ctx.record_published(2, "k1", [{"name": "model", "type_name": "XGBoost"}]) + # Both nodes should have their own published list + assert len(ctx.get_published_by_node(1)) == 1 + assert len(ctx.get_published_by_node(2)) == 1 + # Kernel artifacts should have the latest (from node 2) + ka = ctx.get_kernel_artifacts("k1") + assert ka["model"].source_node_id == 2 + + def test_record_published_allows_same_name_different_kernels(self): + """Same node can publish same artifact name to different kernels.""" + ctx = ArtifactContext() + ctx.record_published(1, "k1", [{"name": "model", "type_name": "RF"}]) + ctx.record_published(1, "k2", [{"name": "model", "type_name": "XGBoost"}]) + # Node 1 should have 2 published artifacts (one per kernel) + assert len(ctx.get_published_by_node(1)) == 2 + # Each kernel should have its own version + assert ctx.get_kernel_artifacts("k1")["model"].type_name == "RF" + assert ctx.get_kernel_artifacts("k2")["model"].type_name == "XGBoost" + def test_record_consumed(self): ctx = ArtifactContext() ctx.record_consumed(5, ["model", "scaler"])