Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion flowfile_core/flowfile_core/flowfile/artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,27 @@ def record_published(
``artifacts`` may be a list of dicts (with at least a ``"name"`` key)
or a plain list of artifact name strings.

If an artifact with the same name was already published by this node,
it is replaced (node_id + artifact_name are unique).

Returns the created :class:`ArtifactRef` objects.
"""
state = self._get_or_create_state(node_id)
refs: list[ArtifactRef] = []
for item in artifacts:
if isinstance(item, str):
item = {"name": item}
artifact_name = item["name"]

# Remove any existing artifact with the same name from this node
# to ensure (node_id, artifact_name) uniqueness
state.published = [
r for r in state.published
if not (r.name == artifact_name and r.kernel_id == kernel_id)
]

ref = ArtifactRef(
name=item["name"],
name=artifact_name,
source_node_id=node_id,
kernel_id=kernel_id,
type_name=item.get("type_name", ""),
Expand Down
49 changes: 49 additions & 0 deletions flowfile_core/tests/flowfile/test_artifact_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,55 @@ def test_record_published_updates_kernel_artifacts(self):
assert "model" in ka
assert ka["model"].source_node_id == 1

def test_record_published_overwrites_same_name_same_node(self):
"""Publishing the same artifact name from the same node should overwrite,
not create duplicates. This ensures (node_id, artifact_name) uniqueness."""
ctx = ArtifactContext()
# First publish
refs1 = ctx.record_published(
node_id=1,
kernel_id="k1",
artifacts=[{"name": "model", "type_name": "RF"}],
)
assert len(ctx.get_published_by_node(1)) == 1
assert ctx.get_published_by_node(1)[0].type_name == "RF"

# Second publish of same artifact name from same node - should overwrite
refs2 = ctx.record_published(
node_id=1,
kernel_id="k1",
artifacts=[{"name": "model", "type_name": "XGBoost"}],
)
# Should still only have 1 artifact, not 2
assert len(ctx.get_published_by_node(1)) == 1
assert ctx.get_published_by_node(1)[0].type_name == "XGBoost"
# Kernel artifacts should also be updated
ka = ctx.get_kernel_artifacts("k1")
assert ka["model"].type_name == "XGBoost"

def test_record_published_allows_same_name_different_nodes(self):
"""Different nodes can publish artifacts with the same name."""
ctx = ArtifactContext()
ctx.record_published(1, "k1", [{"name": "model", "type_name": "RF"}])
ctx.record_published(2, "k1", [{"name": "model", "type_name": "XGBoost"}])
# Both nodes should have their own published list
assert len(ctx.get_published_by_node(1)) == 1
assert len(ctx.get_published_by_node(2)) == 1
# Kernel artifacts should have the latest (from node 2)
ka = ctx.get_kernel_artifacts("k1")
assert ka["model"].source_node_id == 2

def test_record_published_allows_same_name_different_kernels(self):
"""Same node can publish same artifact name to different kernels."""
ctx = ArtifactContext()
ctx.record_published(1, "k1", [{"name": "model", "type_name": "RF"}])
ctx.record_published(1, "k2", [{"name": "model", "type_name": "XGBoost"}])
# Node 1 should have 2 published artifacts (one per kernel)
assert len(ctx.get_published_by_node(1)) == 2
# Each kernel should have its own version
assert ctx.get_kernel_artifacts("k1")["model"].type_name == "RF"
assert ctx.get_kernel_artifacts("k2")["model"].type_name == "XGBoost"

def test_record_consumed(self):
ctx = ArtifactContext()
ctx.record_consumed(5, ["model", "scaler"])
Expand Down