@@ -111,7 +111,7 @@ def test_publish_global_basic(self, kernel_manager_with_core: tuple[KernelManage
manager, kernel_id = kernel_manager_with_core

code = '''
artifact_id = flowfile.publish_global("kernel_test_model", {"accuracy": 0.95, "type": "classifier"})
artifact_id = ff_kernel.publish_global("kernel_test_model", {"accuracy": 0.95, "type": "classifier"})
print(f"Published artifact with ID: {artifact_id}")
'''
result: ExecuteResult = _run(
@@ -138,7 +138,7 @@ def test_publish_and_get_global_roundtrip(
# Publish an artifact
publish_code = '''
data = {"model_type": "random_forest", "n_estimators": 100, "accuracy": 0.92}
artifact_id = flowfile.publish_global("rf_model_test", data)
artifact_id = ff_kernel.publish_global("rf_model_test", data)
print(f"artifact_id={artifact_id}")
'''
result = _run(
@@ -157,7 +157,7 @@ def test_publish_and_get_global_roundtrip(

# Retrieve it
get_code = '''
retrieved = flowfile.get_global("rf_model_test")
retrieved = ff_kernel.get_global("rf_model_test")
assert retrieved["model_type"] == "random_forest", f"Got {retrieved}"
assert retrieved["n_estimators"] == 100
assert retrieved["accuracy"] == 0.92
@@ -184,7 +184,7 @@ def test_publish_global_with_metadata(
manager, kernel_id = kernel_manager_with_core

code = '''
-artifact_id = flowfile.publish_global(
+artifact_id = ff_kernel.publish_global(
"tagged_model",
{"weights": [1.0, 2.0, 3.0]},
description="A test model with weights",
@@ -214,8 +214,8 @@ def test_list_global_artifacts(

# Publish two artifacts
setup_code = '''
flowfile.publish_global("list_test_a", {"value": 1})
flowfile.publish_global("list_test_b", {"value": 2})
ff_kernel.publish_global("list_test_a", {"value": 1})
ff_kernel.publish_global("list_test_b", {"value": 2})
print("Published two artifacts")
'''
result = _run(
@@ -234,7 +234,7 @@ def test_list_global_artifacts(

# List artifacts
list_code = '''
-artifacts = flowfile.list_global_artifacts()
+artifacts = ff_kernel.list_global_artifacts()
names = [a["name"] for a in artifacts]
assert "list_test_a" in names, f"list_test_a not found in {names}"
assert "list_test_b" in names, f"list_test_b not found in {names}"
@@ -262,18 +262,18 @@ def test_delete_global_artifact(
# Publish then delete
code = '''
# Publish
flowfile.publish_global("to_delete", {"temp": True})
ff_kernel.publish_global("to_delete", {"temp": True})

# Verify it exists
obj = flowfile.get_global("to_delete")
obj = ff_kernel.get_global("to_delete")
assert obj["temp"] is True

# Delete
flowfile.delete_global_artifact("to_delete")
ff_kernel.delete_global_artifact("to_delete")

# Verify it's gone
try:
flowfile.get_global("to_delete")
ff_kernel.get_global("to_delete")
assert False, "Should have raised KeyError"
except KeyError:
print("Correctly deleted artifact")
@@ -301,7 +301,7 @@ def test_get_nonexistent_raises_key_error(

code = '''
try:
flowfile.get_global("definitely_does_not_exist_12345")
ff_kernel.get_global("definitely_does_not_exist_12345")
print("ERROR: Should have raised KeyError")
except KeyError as e:
print(f"Correctly raised KeyError: {e}")
@@ -328,20 +328,20 @@ def test_versioning_on_republish(

code = '''
# Publish v1
id1 = flowfile.publish_global("versioned_model", {"version": 1})
id1 = ff_kernel.publish_global("versioned_model", {"version": 1})

# Publish v2 (same name)
id2 = flowfile.publish_global("versioned_model", {"version": 2})
id2 = ff_kernel.publish_global("versioned_model", {"version": 2})

# Should be different artifact IDs (different versions)
assert id2 != id1, f"Expected different IDs, got {id1} and {id2}"

# Get latest (should be v2)
latest = flowfile.get_global("versioned_model")
latest = ff_kernel.get_global("versioned_model")
assert latest["version"] == 2, f"Expected version 2, got {latest}"

# Get specific version
v1 = flowfile.get_global("versioned_model", version=1)
v1 = ff_kernel.get_global("versioned_model", version=1)
assert v1["version"] == 1, f"Expected version 1, got {v1}"

print("Versioning works correctly!")
@@ -401,10 +401,10 @@ def test_publish_global_in_flow(
input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script")
)
code = '''
-df = flowfile.read_input()
+df = ff_kernel.read_input()
# Publish a global artifact (persists beyond flow run)
flowfile.publish_global("flow_published_model", {"trained_on": "flow_data"})
flowfile.publish_output(df)
ff_kernel.publish_global("flow_published_model", {"trained_on": "flow_data"})
ff_kernel.publish_output(df)
'''
graph.add_python_script(
input_schema.NodePythonScript(
@@ -426,7 +426,7 @@ def test_publish_global_in_flow(

# Verify the global artifact was published by retrieving it
verify_code = '''
model = flowfile.get_global("flow_published_model")
model = ff_kernel.get_global("flow_published_model")
assert model["trained_on"] == "flow_data"
print("Flow-published global artifact verified!")
'''
@@ -476,10 +476,10 @@ def test_use_global_artifact_across_flows(
input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script")
)
publish_code = '''
-df = flowfile.read_input()
+df = ff_kernel.read_input()
# Publish global artifact in Flow 1
flowfile.publish_global("cross_flow_artifact", {"source": "flow_1", "value": 42})
flowfile.publish_output(df)
ff_kernel.publish_global("cross_flow_artifact", {"source": "flow_1", "value": 42})
ff_kernel.publish_output(df)
'''
graph1.add_python_script(
input_schema.NodePythonScript(
@@ -520,15 +520,15 @@ def test_use_global_artifact_across_flows(
consume_code = '''
import polars as pl

-df = flowfile.read_input().collect()
+df = ff_kernel.read_input().collect()
# Read global artifact from Flow 1
artifact = flowfile.get_global("cross_flow_artifact")
artifact = ff_kernel.get_global("cross_flow_artifact")
assert artifact["source"] == "flow_1", f"Expected flow_1, got {artifact}"
assert artifact["value"] == 42

# Add artifact value to output
result = df.with_columns(pl.lit(artifact["value"]).alias("from_global"))
-flowfile.publish_output(result)
+ff_kernel.publish_output(result)
'''
graph2.add_python_script(
input_schema.NodePythonScript(
@@ -579,11 +579,11 @@ def test_publish_numpy_array(

# Publish a numpy array
arr = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])
artifact_id = flowfile.publish_global("numpy_matrix", arr)
artifact_id = ff_kernel.publish_global("numpy_matrix", arr)
print(f"Published numpy array, id={artifact_id}")

# Retrieve and verify
retrieved = flowfile.get_global("numpy_matrix")
retrieved = ff_kernel.get_global("numpy_matrix")
assert np.array_equal(retrieved, arr), f"Arrays don't match: {retrieved}"
print("Numpy array roundtrip successful!")
'''
@@ -617,11 +617,11 @@ def test_publish_polars_dataframe(
"name": ["Alice", "Bob", "Charlie"],
"score": [85.5, 92.0, 78.3],
})
artifact_id = flowfile.publish_global("polars_df", df)
artifact_id = ff_kernel.publish_global("polars_df", df)
print(f"Published Polars DataFrame, id={artifact_id}")

# Retrieve and verify
retrieved = flowfile.get_global("polars_df")
retrieved = ff_kernel.get_global("polars_df")
assert retrieved.equals(df), f"DataFrames don't match"
assert list(retrieved.columns) == ["id", "name", "score"]
print("Polars DataFrame roundtrip successful!")
@@ -665,11 +665,11 @@ def test_publish_nested_dict(
"target": "y",
},
}
artifact_id = flowfile.publish_global("model_config", config)
artifact_id = ff_kernel.publish_global("model_config", config)
print(f"Published nested config, id={artifact_id}")

# Retrieve and verify
retrieved = flowfile.get_global("model_config")
retrieved = ff_kernel.get_global("model_config")
assert retrieved["model"]["layers"] == [64, 128, 64]
assert retrieved["training"]["optimizer"]["lr"] == 0.001
print("Nested dict roundtrip successful!")
@@ -706,11 +706,11 @@ def predict(self, x):

# Publish custom object
model = ModelWrapper("linear", [1.0, 2.0, 3.0])
artifact_id = flowfile.publish_global("custom_model", model)
artifact_id = ff_kernel.publish_global("custom_model", model)
print(f"Published custom object, id={artifact_id}")

# Retrieve and verify
retrieved = flowfile.get_global("custom_model")
retrieved = ff_kernel.get_global("custom_model")
assert retrieved.name == "linear"
assert retrieved.weights == [1.0, 2.0, 3.0]
assert retrieved.predict([1, 1, 1]) == 6.0
@@ -748,7 +748,7 @@ def test_delete_nonexistent_raises_key_error(

code = '''
try:
flowfile.delete_global_artifact("nonexistent_artifact_xyz")
ff_kernel.delete_global_artifact("nonexistent_artifact_xyz")
print("ERROR: Should have raised KeyError")
except KeyError as e:
print(f"Correctly raised KeyError: {e}")
@@ -775,11 +775,11 @@ def test_get_specific_version_not_found(

code = '''
# Publish version 1
flowfile.publish_global("versioned_test", {"v": 1})
ff_kernel.publish_global("versioned_test", {"v": 1})

# Try to get version 999 (doesn't exist)
try:
flowfile.get_global("versioned_test", version=999)
ff_kernel.get_global("versioned_test", version=999)
print("ERROR: Should have raised KeyError")
except KeyError as e:
print(f"Correctly raised KeyError for missing version: {e}")