+ Convenience helpers for working with files on the shared volume. Files written here + are accessible from all FlowFile services and persist across kernel executions. +
+flowfile.get_shared_location("test_file.csv")
+ Returns the absolute path for a file in the shared directory. + Parent directories are created automatically.
+flowfile.get_shared_location("subdir/report.parquet")
+ Supports nested paths — subdirectories are created as needed.
+import polars as pl
+
+df = flowfile.read_input().collect()
+
+# Write to the shared directory (accessible from all services)
+df.write_csv(flowfile.get_shared_location("exports/output.csv"))
+df.write_parquet(flowfile.get_shared_location("exports/output.parquet"))
+ import polars as pl
diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/flowfileCompletions.ts b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/flowfileCompletions.ts
index 4f8e3f71..91a1cf14 100644
--- a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/flowfileCompletions.ts
+++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/pythonScript/flowfileCompletions.ts
@@ -90,6 +90,12 @@ export const flowfileCompletionVals = [
detail: "flowfile.list_global_artifacts(namespace_id?, tags?)",
apply: "list_global_artifacts()",
},
+ {label: "get_shared_location",
+ type: "function",
+ info: "Get the shared location to make objects available to other processes",
+ detail: "flowfile.get_shared_location()->str",
+ apply: "get_shared_location()",
+ },
{
label: "delete_global_artifact",
type: "function",
diff --git a/kernel_runtime/kernel_runtime/flowfile_client.py b/kernel_runtime/kernel_runtime/flowfile_client.py
index 0e8a2fc8..82b5001b 100644
--- a/kernel_runtime/kernel_runtime/flowfile_client.py
+++ b/kernel_runtime/kernel_runtime/flowfile_client.py
@@ -502,6 +502,38 @@ def delete_global_artifact(
resp.raise_for_status()
+# ===== File Utilities =====
+
+
+def get_shared_location(filename: str) -> str:
+ """Return the absolute path for a file in the shared directory.
+
+ The shared directory is accessible from all FlowFile services (core,
+ worker, kernel) and persists across kernel executions. Use this to
+ write files that should be readable by other services or that should
+ survive container restarts.
+
+ Parent directories are created automatically.
+
+ Args:
+ filename: Relative filename or path, e.g. ``"test_file.csv"`` or
+ ``"other_dir/test_file.csv"``.
+
+ Returns:
+ Absolute path as a string, ready to pass to file-writing functions.
+
+ Examples::
+
+ df.write_csv(flowfile.get_shared_location("test_file.csv"))
+ df.write_csv(flowfile.get_shared_location("reports/monthly.csv"))
+ """
+ base = os.environ.get("FLOWFILE_KERNEL_SHARED_DIR", "/shared")
+ full_path = os.path.join(base, "user_files", filename)
+ parent = os.path.dirname(full_path)
+ os.makedirs(parent, exist_ok=True)
+ return full_path
+
+
# ===== Logging APIs =====
diff --git a/kernel_runtime/tests/test_flowfile_client.py b/kernel_runtime/tests/test_flowfile_client.py
index efdefc5f..74d1d2ed 100644
--- a/kernel_runtime/tests/test_flowfile_client.py
+++ b/kernel_runtime/tests/test_flowfile_client.py
@@ -1,5 +1,6 @@
"""Tests for kernel_runtime.flowfile_client."""
+import os
from pathlib import Path
import polars as pl
@@ -366,3 +367,97 @@ def test_is_pil_image_without_import(self):
"""Without PIL installed, should return False."""
result = flowfile_client._is_pil_image("not an image")
assert result is False
+
+
+class TestSharedLocation:
+ """Tests for flowfile.shared_location()."""
+
+ def test_returns_path_under_user_files(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ shared_dir = str(tmp_dir / "shared")
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)
+
+ result = flowfile_client.get_shared_location("test_file.csv")
+ assert result == os.path.join(shared_dir, "user_files", "test_file.csv")
+
+ def test_creates_parent_directories(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ shared_dir = str(tmp_dir / "shared")
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)
+
+ result = flowfile_client.get_shared_location("other_dir/test_file.csv")
+ expected = os.path.join(shared_dir, "user_files", "other_dir", "test_file.csv")
+ assert result == expected
+ assert os.path.isdir(os.path.dirname(result))
+
+ def test_nested_subdirectories(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ shared_dir = str(tmp_dir / "shared")
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)
+
+ result = flowfile_client.get_shared_location("a/b/c/deep_file.parquet")
+ expected = os.path.join(shared_dir, "user_files", "a", "b", "c", "deep_file.parquet")
+ assert result == expected
+ assert os.path.isdir(os.path.dirname(result))
+
+ def test_defaults_to_shared_when_env_not_set(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ monkeypatch.delenv("FLOWFILE_KERNEL_SHARED_DIR", raising=False)
+ # Patch os.makedirs to avoid PermissionError on /shared in CI
+ created = []
+ monkeypatch.setattr(os, "makedirs", lambda p, exist_ok=False: created.append(p))
+
+ result = flowfile_client.get_shared_location("test.csv")
+ assert result == os.path.join("/shared", "user_files", "test.csv")
+ assert created == [os.path.join("/shared", "user_files")]
+
+ def test_file_is_writable(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ shared_dir = str(tmp_dir / "shared")
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)
+
+ path = flowfile_client.get_shared_location("writable_test.csv")
+ with open(path, "w") as f:
+ f.write("col1,col2\n1,2\n")
+ assert os.path.isfile(path)
+
+ def test_does_not_require_execution_context(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ """shared_location works without _set_context() being called."""
+ shared_dir = str(tmp_dir / "shared")
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)
+ flowfile_client._clear_context()
+
+ result = flowfile_client.get_shared_location("no_context.csv")
+ assert "no_context.csv" in result
+
+ def test_write_parquet_roundtrip(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ """Write a Polars DataFrame to shared_location and read it back."""
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", str(tmp_dir / "shared"))
+
+ df = pl.DataFrame({"id": [1, 2, 3], "value": [10.5, 20.0, 30.1]})
+ path = flowfile_client.get_shared_location("output.parquet")
+ df.write_parquet(path)
+
+ result = pl.read_parquet(path)
+ assert result.shape == (3, 2)
+ assert result["id"].to_list() == [1, 2, 3]
+ assert result["value"].to_list() == [10.5, 20.0, 30.1]
+
+ def test_write_parquet_nested_path(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ """Write parquet into a nested subdirectory via shared_location."""
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", str(tmp_dir / "shared"))
+
+ df = pl.DataFrame({"name": ["alice", "bob"], "score": [95, 87]})
+ path = flowfile_client.get_shared_location("exports/daily/scores.parquet")
+ df.write_parquet(path)
+
+ result = pl.read_parquet(path)
+ assert result["name"].to_list() == ["alice", "bob"]
+
+ def test_write_csv_and_parquet_same_dir(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
+ """Write both CSV and Parquet to the same shared subdirectory."""
+ monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", str(tmp_dir / "shared"))
+
+ df = pl.DataFrame({"x": [1, 2], "y": [3, 4]})
+ csv_path = flowfile_client.get_shared_location("reports/data.csv")
+ parquet_path = flowfile_client.get_shared_location("reports/data.parquet")
+ df.write_csv(csv_path)
+ df.write_parquet(parquet_path)
+
+ assert pl.read_csv(csv_path).shape == (2, 2)
+ assert pl.read_parquet(parquet_path)["x"].to_list() == [1, 2]