diff --git a/flowfile_core/tests/flowfile/test_global_artifacts_kernel_integration.py b/flowfile_core/tests/flowfile/test_global_artifacts_kernel_integration.py index c3b23556..5ceb8008 100644 --- a/flowfile_core/tests/flowfile/test_global_artifacts_kernel_integration.py +++ b/flowfile_core/tests/flowfile/test_global_artifacts_kernel_integration.py @@ -111,7 +111,7 @@ def test_publish_global_basic(self, kernel_manager_with_core: tuple[KernelManage manager, kernel_id = kernel_manager_with_core code = ''' -artifact_id = flowfile.publish_global("kernel_test_model", {"accuracy": 0.95, "type": "classifier"}) +artifact_id = ff_kernel.publish_global("kernel_test_model", {"accuracy": 0.95, "type": "classifier"}) print(f"Published artifact with ID: {artifact_id}") ''' result: ExecuteResult = _run( @@ -138,7 +138,7 @@ def test_publish_and_get_global_roundtrip( # Publish an artifact publish_code = ''' data = {"model_type": "random_forest", "n_estimators": 100, "accuracy": 0.92} -artifact_id = flowfile.publish_global("rf_model_test", data) +artifact_id = ff_kernel.publish_global("rf_model_test", data) print(f"artifact_id={artifact_id}") ''' result = _run( @@ -157,7 +157,7 @@ def test_publish_and_get_global_roundtrip( # Retrieve it get_code = ''' -retrieved = flowfile.get_global("rf_model_test") +retrieved = ff_kernel.get_global("rf_model_test") assert retrieved["model_type"] == "random_forest", f"Got {retrieved}" assert retrieved["n_estimators"] == 100 assert retrieved["accuracy"] == 0.92 @@ -184,7 +184,7 @@ def test_publish_global_with_metadata( manager, kernel_id = kernel_manager_with_core code = ''' -artifact_id = flowfile.publish_global( +artifact_id = ff_kernel.publish_global( "tagged_model", {"weights": [1.0, 2.0, 3.0]}, description="A test model with weights", @@ -214,8 +214,8 @@ def test_list_global_artifacts( # Publish two artifacts setup_code = ''' -flowfile.publish_global("list_test_a", {"value": 1}) -flowfile.publish_global("list_test_b", {"value": 2}) +ff_kernel.publish_global("list_test_a", {"value": 1}) +ff_kernel.publish_global("list_test_b", {"value": 2}) print("Published two artifacts") ''' result = _run( @@ -234,7 +234,7 @@ def test_list_global_artifacts( # List artifacts list_code = ''' -artifacts = flowfile.list_global_artifacts() +artifacts = ff_kernel.list_global_artifacts() names = [a["name"] for a in artifacts] assert "list_test_a" in names, f"list_test_a not found in {names}" assert "list_test_b" in names, f"list_test_b not found in {names}" @@ -262,18 +262,18 @@ def test_delete_global_artifact( # Publish then delete code = ''' # Publish -flowfile.publish_global("to_delete", {"temp": True}) +ff_kernel.publish_global("to_delete", {"temp": True}) # Verify it exists -obj = flowfile.get_global("to_delete") +obj = ff_kernel.get_global("to_delete") assert obj["temp"] is True # Delete -flowfile.delete_global_artifact("to_delete") +ff_kernel.delete_global_artifact("to_delete") # Verify it's gone try: - flowfile.get_global("to_delete") + ff_kernel.get_global("to_delete") assert False, "Should have raised KeyError" except KeyError: print("Correctly deleted artifact") @@ -301,7 +301,7 @@ def test_get_nonexistent_raises_key_error( code = ''' try: - flowfile.get_global("definitely_does_not_exist_12345") + ff_kernel.get_global("definitely_does_not_exist_12345") print("ERROR: Should have raised KeyError") except KeyError as e: print(f"Correctly raised KeyError: {e}") @@ -328,20 +328,20 @@ def test_versioning_on_republish( code = ''' # Publish v1 -id1 = flowfile.publish_global("versioned_model", {"version": 1}) +id1 = ff_kernel.publish_global("versioned_model", {"version": 1}) # Publish v2 (same name) -id2 = flowfile.publish_global("versioned_model", {"version": 2}) +id2 = ff_kernel.publish_global("versioned_model", {"version": 2}) # Should be different artifact IDs (different versions) assert id2 != id1, f"Expected different IDs, got {id1} and {id2}" # Get latest (should be v2) -latest = flowfile.get_global("versioned_model") +latest = ff_kernel.get_global("versioned_model") assert latest["version"] == 2, f"Expected version 2, got {latest}" # Get specific version -v1 = flowfile.get_global("versioned_model", version=1) +v1 = ff_kernel.get_global("versioned_model", version=1) assert v1["version"] == 1, f"Expected version 1, got {v1}" print("Versioning works correctly!") @@ -401,10 +401,10 @@ def test_publish_global_in_flow( input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script") ) code = ''' -df = flowfile.read_input() +df = ff_kernel.read_input() # Publish a global artifact (persists beyond flow run) -flowfile.publish_global("flow_published_model", {"trained_on": "flow_data"}) -flowfile.publish_output(df) +ff_kernel.publish_global("flow_published_model", {"trained_on": "flow_data"}) +ff_kernel.publish_output(df) ''' graph.add_python_script( input_schema.NodePythonScript( @@ -426,7 +426,7 @@ def test_publish_global_in_flow( # Verify the global artifact was published by retrieving it verify_code = ''' -model = flowfile.get_global("flow_published_model") +model = ff_kernel.get_global("flow_published_model") assert model["trained_on"] == "flow_data" print("Flow-published global artifact verified!") ''' @@ -476,10 +476,10 @@ def test_use_global_artifact_across_flows( input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script") ) publish_code = ''' -df = flowfile.read_input() +df = ff_kernel.read_input() # Publish global artifact in Flow 1 -flowfile.publish_global("cross_flow_artifact", {"source": "flow_1", "value": 42}) -flowfile.publish_output(df) +ff_kernel.publish_global("cross_flow_artifact", {"source": "flow_1", "value": 42}) +ff_kernel.publish_output(df) ''' graph1.add_python_script( input_schema.NodePythonScript( @@ -520,15 +520,15 @@ def test_use_global_artifact_across_flows( consume_code = ''' import polars as pl -df = flowfile.read_input().collect() +df = ff_kernel.read_input().collect() # Read global artifact from Flow 1 -artifact = flowfile.get_global("cross_flow_artifact") +artifact = ff_kernel.get_global("cross_flow_artifact") assert artifact["source"] == "flow_1", f"Expected flow_1, got {artifact}" assert artifact["value"] == 42 # Add artifact value to output result = df.with_columns(pl.lit(artifact["value"]).alias("from_global")) -flowfile.publish_output(result) +ff_kernel.publish_output(result) ''' graph2.add_python_script( input_schema.NodePythonScript( @@ -579,11 +579,11 @@ def test_publish_numpy_array( # Publish a numpy array arr = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]]) -artifact_id = flowfile.publish_global("numpy_matrix", arr) +artifact_id = ff_kernel.publish_global("numpy_matrix", arr) print(f"Published numpy array, id={artifact_id}") # Retrieve and verify -retrieved = flowfile.get_global("numpy_matrix") +retrieved = ff_kernel.get_global("numpy_matrix") assert np.array_equal(retrieved, arr), f"Arrays don't match: {retrieved}" print("Numpy array roundtrip successful!") ''' @@ -617,11 +617,11 @@ def test_publish_polars_dataframe( "name": ["Alice", "Bob", "Charlie"], "score": [85.5, 92.0, 78.3], }) -artifact_id = flowfile.publish_global("polars_df", df) +artifact_id = ff_kernel.publish_global("polars_df", df) print(f"Published Polars DataFrame, id={artifact_id}") # Retrieve and verify -retrieved = flowfile.get_global("polars_df") +retrieved = ff_kernel.get_global("polars_df") assert retrieved.equals(df), f"DataFrames don't match" assert list(retrieved.columns) == ["id", "name", "score"] print("Polars DataFrame roundtrip successful!") @@ -665,11 +665,11 @@ def test_publish_nested_dict( "target": "y", }, } -artifact_id = flowfile.publish_global("model_config", config) +artifact_id = ff_kernel.publish_global("model_config", config) print(f"Published nested config, id={artifact_id}") # Retrieve and verify -retrieved = flowfile.get_global("model_config") +retrieved = ff_kernel.get_global("model_config") assert retrieved["model"]["layers"] == [64, 128, 64] assert retrieved["training"]["optimizer"]["lr"] == 0.001 print("Nested dict roundtrip successful!") @@ -706,11 +706,11 @@ def predict(self, x): # Publish custom object model = ModelWrapper("linear", [1.0, 2.0, 3.0]) -artifact_id = flowfile.publish_global("custom_model", model) +artifact_id = ff_kernel.publish_global("custom_model", model) print(f"Published custom object, id={artifact_id}") # Retrieve and verify -retrieved = flowfile.get_global("custom_model") +retrieved = ff_kernel.get_global("custom_model") assert retrieved.name == "linear" assert retrieved.weights == [1.0, 2.0, 3.0] assert retrieved.predict([1, 1, 1]) == 6.0 @@ -748,7 +748,7 @@ def test_delete_nonexistent_raises_key_error( code = ''' try: - flowfile.delete_global_artifact("nonexistent_artifact_xyz") + ff_kernel.delete_global_artifact("nonexistent_artifact_xyz") print("ERROR: Should have raised KeyError") except KeyError as e: print(f"Correctly raised KeyError: {e}") @@ -775,11 +775,11 @@ def test_get_specific_version_not_found( code = ''' # Publish version 1 -flowfile.publish_global("versioned_test", {"v": 1}) +ff_kernel.publish_global("versioned_test", {"v": 1}) # Try to get version 999 (doesn't exist) try: - flowfile.get_global("versioned_test", version=999) + ff_kernel.get_global("versioned_test", version=999) print("ERROR: Should have raised KeyError") except KeyError as e: print(f"Correctly raised KeyError for missing version: {e}") diff --git a/flowfile_core/tests/flowfile/test_kernel_integration.py b/flowfile_core/tests/flowfile/test_kernel_integration.py index 1334a154..91677f55 100644 --- a/flowfile_core/tests/flowfile/test_kernel_integration.py +++ b/flowfile_core/tests/flowfile/test_kernel_integration.py @@ -127,7 +127,7 @@ def test_publish_and_list_artifacts(self, kernel_manager: tuple[KernelManager, s kernel_id, ExecuteRequest( node_id=3, - code='flowfile.publish_artifact("my_dict", {"a": 1, "b": 2})', + code='ff_kernel.publish_artifact("my_dict", {"a": 1, "b": 2})', input_paths={}, output_dir="/shared/test_artifact", ), @@ -152,9 +152,9 @@ def test_read_and_write_parquet(self, kernel_manager: tuple[KernelManager, str]) code = """ import polars as pl -df = flowfile.read_input() +df = ff_kernel.read_input() df = df.with_columns((pl.col("x") * pl.col("y")).alias("product")) -flowfile.publish_output(df) +ff_kernel.publish_output(df) """ result: ExecuteResult = _run( @@ -196,11 +196,11 @@ def test_multiple_inputs(self, kernel_manager: tuple[KernelManager, str]): ) code = """ -inputs = flowfile.read_inputs() +inputs = ff_kernel.read_inputs() left = inputs["left"][0].collect() right = inputs["right"][0].collect() merged = left.join(right, on="id") -flowfile.publish_output(merged) +ff_kernel.publish_output(merged) """ result = _run( manager.execute( @@ -299,8 +299,8 @@ def test_python_script_passthrough(self, kernel_manager: tuple[KernelManager, st graph.add_node_promise(node_promise_2) code = """ -df = flowfile.read_input() -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -360,9 +360,9 @@ def test_python_script_transform(self, kernel_manager: tuple[KernelManager, str] code = """ import polars as pl -df = flowfile.read_input().collect() +df = ff_kernel.read_input().collect() df = df.with_columns((pl.col("val") * 10).alias("val_x10")) -flowfile.publish_output(df) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -464,9 +464,9 @@ def test_published_artifacts_recorded_in_context(self, kernel_manager: tuple[Ker graph.add_node_promise(node_promise_2) code = """ -df = flowfile.read_input() -flowfile.publish_artifact("my_model", {"accuracy": 0.95}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("my_model", {"accuracy": 0.95}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -513,9 +513,9 @@ def test_available_artifacts_computed_before_execution(self, kernel_manager: tup node_promise_2 = input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script") graph.add_node_promise(node_promise_2) code_publish = """ -df = flowfile.read_input() -flowfile.publish_artifact("trained_model", {"type": "RF"}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("trained_model", {"type": "RF"}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -531,9 +531,9 @@ def test_available_artifacts_computed_before_execution(self, kernel_manager: tup node_promise_3 = input_schema.NodePromise(flow_id=1, node_id=3, node_type="python_script") graph.add_node_promise(node_promise_3) code_consume = """ -df = flowfile.read_input() -model = flowfile.read_artifact("trained_model") -flowfile.publish_output(df) +df = ff_kernel.read_input() +model = ff_kernel.read_artifact("trained_model") +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -579,9 +579,9 @@ def test_artifacts_cleared_between_runs(self, kernel_manager: tuple[KernelManage graph.add_node_promise(node_promise_2) code = """ -df = flowfile.read_input() -flowfile.publish_artifact("run_artifact", [1, 2, 3]) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("run_artifact", [1, 2, 3]) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -637,10 +637,10 @@ def test_multiple_artifacts_from_single_node(self, kernel_manager: tuple[KernelM graph.add_node_promise(node_promise_2) code = """ -df = flowfile.read_input() -flowfile.publish_artifact("model", {"type": "classifier"}) -flowfile.publish_artifact("encoder", {"type": "label_encoder"}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("model", {"type": "classifier"}) +ff_kernel.publish_artifact("encoder", {"type": "label_encoder"}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -688,9 +688,9 @@ def test_artifact_context_to_dict_after_run(self, kernel_manager: tuple[KernelMa graph.add_node_promise(node_promise_2) code = """ -df = flowfile.read_input() -flowfile.publish_artifact("ctx_model", {"version": 1}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("ctx_model", {"version": 1}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -750,12 +750,12 @@ def test_train_model_and_apply(self, kernel_manager: tuple[KernelManager, str]): import numpy as np import polars as pl -df = flowfile.read_input().collect() +df = ff_kernel.read_input().collect() X = np.column_stack([df["x1"].to_numpy(), df["x2"].to_numpy(), np.ones(len(df))]) y_vals = df["y"].to_numpy() coeffs = np.linalg.lstsq(X, y_vals, rcond=None)[0] -flowfile.publish_artifact("linear_model", {"coefficients": coeffs.tolist()}) -flowfile.publish_output(df) +ff_kernel.publish_artifact("linear_model", {"coefficients": coeffs.tolist()}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -774,13 +774,13 @@ def test_train_model_and_apply(self, kernel_manager: tuple[KernelManager, str]): import numpy as np import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("linear_model") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("linear_model") coeffs = np.array(model["coefficients"]) X = np.column_stack([df["x1"].to_numpy(), df["x2"].to_numpy(), np.ones(len(df))]) predictions = X @ coeffs result = df.with_columns(pl.Series("predicted_y", predictions)) -flowfile.publish_output(result) +ff_kernel.publish_output(result) """ graph.add_python_script( input_schema.NodePythonScript( @@ -845,9 +845,9 @@ def test_publish_delete_republish_access(self, kernel_manager: tuple[KernelManag node_promise_2 = input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script") graph.add_node_promise(node_promise_2) code_a = """ -df = flowfile.read_input() -flowfile.publish_artifact("artifact_model", {"version": 1, "weights": [0.5]}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("artifact_model", {"version": 1, "weights": [0.5]}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -863,11 +863,11 @@ def test_publish_delete_republish_access(self, kernel_manager: tuple[KernelManag node_promise_3 = input_schema.NodePromise(flow_id=1, node_id=3, node_type="python_script") graph.add_node_promise(node_promise_3) code_b = """ -df = flowfile.read_input() -model = flowfile.read_artifact("artifact_model") +df = ff_kernel.read_input() +model = ff_kernel.read_artifact("artifact_model") assert model["version"] == 1, f"Expected v1, got {model}" -flowfile.delete_artifact("artifact_model") -flowfile.publish_output(df) +ff_kernel.delete_artifact("artifact_model") +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -883,9 +883,9 @@ def test_publish_delete_republish_access(self, kernel_manager: tuple[KernelManag node_promise_4 = input_schema.NodePromise(flow_id=1, node_id=4, node_type="python_script") graph.add_node_promise(node_promise_4) code_c = """ -df = flowfile.read_input() -flowfile.publish_artifact("artifact_model", {"version": 2, "weights": [0.9]}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("artifact_model", {"version": 2, "weights": [0.9]}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -901,10 +901,10 @@ def test_publish_delete_republish_access(self, kernel_manager: tuple[KernelManag node_promise_5 = input_schema.NodePromise(flow_id=1, node_id=5, node_type="python_script") graph.add_node_promise(node_promise_5) code_d = """ -df = flowfile.read_input() -model = flowfile.read_artifact("artifact_model") +df = ff_kernel.read_input() +model = ff_kernel.read_artifact("artifact_model") assert model["version"] == 2, f"Expected v2, got {model}" -flowfile.publish_output(df) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -957,9 +957,9 @@ def test_duplicate_publish_fails(self, kernel_manager: tuple[KernelManager, str] node_promise_2 = input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script") graph.add_node_promise(node_promise_2) code_publish = """ -df = flowfile.read_input() -flowfile.publish_artifact("model", "v1") -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("model", "v1") +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -975,9 +975,9 @@ def test_duplicate_publish_fails(self, kernel_manager: tuple[KernelManager, str] node_promise_3 = input_schema.NodePromise(flow_id=1, node_id=3, node_type="python_script") graph.add_node_promise(node_promise_3) code_dup = """ -df = flowfile.read_input() -flowfile.publish_artifact("model", "v2") -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("model", "v2") +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1040,10 +1040,10 @@ def test_multi_input_python_script(self, kernel_manager: tuple[KernelManager, st code = """ import polars as pl -df = flowfile.read_input().collect() +df = ff_kernel.read_input().collect() # Should contain all 4 rows from both inputs assert len(df) == 4, f"Expected 4 rows, got {len(df)}" -flowfile.publish_output(df) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1115,10 +1115,10 @@ def test_multi_input_read_inputs_named(self, kernel_manager: tuple[KernelManager code = """ import polars as pl -df = flowfile.read_first().collect() +df = ff_kernel.read_first().collect() # read_first should return only the first input (2 rows, not 4) assert len(df) == 2, f"Expected 2 rows from read_first, got {len(df)}" -flowfile.publish_output(df) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1204,12 +1204,12 @@ def test_artifact_survives_when_producer_skipped( import numpy as np import polars as pl -df = flowfile.read_input().collect() +df = ff_kernel.read_input().collect() X = np.column_stack([df["x1"].to_numpy(), df["x2"].to_numpy(), np.ones(len(df))]) y_vals = df["y"].to_numpy() coeffs = np.linalg.lstsq(X, y_vals, rcond=None)[0] -flowfile.publish_artifact("linear_model", {"coefficients": coeffs.tolist()}) -flowfile.publish_output(df) +ff_kernel.publish_artifact("linear_model", {"coefficients": coeffs.tolist()}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1233,13 +1233,13 @@ def test_artifact_survives_when_producer_skipped( import numpy as np import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("linear_model") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("linear_model") coeffs = np.array(model["coefficients"]) X = np.column_stack([df["x1"].to_numpy(), df["x2"].to_numpy(), np.ones(len(df))]) predictions = X @ coeffs result = df.with_columns(pl.Series("predicted_y", predictions)) -flowfile.publish_output(result) +ff_kernel.publish_output(result) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1270,8 +1270,8 @@ def test_artifact_survives_when_producer_skipped( import numpy as np import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("linear_model") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("linear_model") coeffs = np.array(model["coefficients"]) X = np.column_stack([df["x1"].to_numpy(), df["x2"].to_numpy(), np.ones(len(df))]) predictions = X @ coeffs @@ -1280,7 +1280,7 @@ def test_artifact_survives_when_producer_skipped( pl.Series("predicted_y", predictions), pl.Series("residual", residuals), ) -flowfile.publish_output(result) +ff_kernel.publish_output(result) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1360,10 +1360,10 @@ def test_multiple_artifacts_survive_selective_clear( input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script"), ) producer_code = """ -df = flowfile.read_input() -flowfile.publish_artifact("model", {"type": "linear", "coeff": 2.0}) -flowfile.publish_artifact("scaler", {"mean": 20.0, "std": 10.0}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("model", {"type": "linear", "coeff": 2.0}) +ff_kernel.publish_artifact("scaler", {"mean": 20.0, "std": 10.0}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1384,13 +1384,13 @@ def test_multiple_artifacts_survive_selective_clear( ) consumer_code_v1 = """ import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("model") -scaler = flowfile.read_artifact("scaler") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("model") +scaler = ff_kernel.read_artifact("scaler") result = df.with_columns( (pl.col("val") * model["coeff"]).alias("scaled"), ) -flowfile.publish_output(result) +ff_kernel.publish_output(result) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1411,15 +1411,15 @@ def test_multiple_artifacts_survive_selective_clear( # Change the consumer's code — also use the scaler now consumer_code_v2 = """ import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("model") -scaler = flowfile.read_artifact("scaler") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("model") +scaler = ff_kernel.read_artifact("scaler") normalized = (pl.col("val") - scaler["mean"]) / scaler["std"] result = df.with_columns( (pl.col("val") * model["coeff"]).alias("scaled"), normalized.alias("normalized"), ) -flowfile.publish_output(result) +ff_kernel.publish_output(result) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1478,9 +1478,9 @@ def test_rerun_producer_clears_old_artifacts( input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script"), ) code_v1 = """ -df = flowfile.read_input() -flowfile.publish_artifact("model", {"version": 1}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("model", {"version": 1}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1500,10 +1500,10 @@ def test_rerun_producer_clears_old_artifacts( input_schema.NodePromise(flow_id=1, node_id=3, node_type="python_script"), ) consumer_code = """ -df = flowfile.read_input() -model = flowfile.read_artifact("model") +df = ff_kernel.read_input() +model = ff_kernel.read_artifact("model") print(f"model version: {model['version']}") -flowfile.publish_output(df) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1526,9 +1526,9 @@ def test_rerun_producer_clears_old_artifacts( # Change the PRODUCER (Node 2) — publish v2 of the artifact code_v2 = """ -df = flowfile.read_input() -flowfile.publish_artifact("model", {"version": 2}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("model", {"version": 2}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1587,9 +1587,9 @@ def test_deleted_artifact_producer_reruns_on_consumer_change( input_schema.NodePromise(flow_id=1, node_id=2, node_type="python_script"), ) producer_code = """ -df = flowfile.read_input() -flowfile.publish_artifact("linear_model", {"coefficients": [1.0, 2.0, 3.0]}) -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_artifact("linear_model", {"coefficients": [1.0, 2.0, 3.0]}) +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( @@ -1610,12 +1610,12 @@ def test_deleted_artifact_producer_reruns_on_consumer_change( ) consumer_code_v1 = """ import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("linear_model") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("linear_model") coeffs = model["coefficients"] result = df.with_columns(pl.lit(coeffs[0]).alias("c0")) -flowfile.publish_output(result) -flowfile.delete_artifact("linear_model") +ff_kernel.publish_output(result) +ff_kernel.delete_artifact("linear_model") """ graph.add_python_script( input_schema.NodePythonScript( @@ -1640,15 +1640,15 @@ def test_deleted_artifact_producer_reruns_on_consumer_change( # Change the consumer's code (node 3) — still deletes the artifact consumer_code_v2 = """ import polars as pl -df = flowfile.read_input().collect() -model = flowfile.read_artifact("linear_model") +df = ff_kernel.read_input().collect() +model = ff_kernel.read_artifact("linear_model") coeffs = model["coefficients"] result = df.with_columns( pl.lit(coeffs[0]).alias("c0"), pl.lit(coeffs[1]).alias("c1"), ) -flowfile.publish_output(result) -flowfile.delete_artifact("linear_model") +ff_kernel.publish_output(result) +ff_kernel.delete_artifact("linear_model") """ graph.add_python_script( input_schema.NodePythonScript( @@ -1791,8 +1791,8 @@ def test_python_script_node_with_stopped_kernel(self, kernel_manager: tuple[Kern graph.add_node_promise(node_promise_2) code = """ -df = flowfile.read_input() -flowfile.publish_output(df) +df = ff_kernel.read_input() +ff_kernel.publish_output(df) """ graph.add_python_script( input_schema.NodePythonScript( diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/FlowfileApiHelp.vue b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/FlowfileApiHelp.vue index cbb3b417..97edf19e 100644 --- a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/FlowfileApiHelp.vue +++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/FlowfileApiHelp.vue @@ -2,7 +2,7 @@