Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions flowfile_core/flowfile_core/kernel/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,10 @@ def _build_kernel_env(self, kernel_id: str, kernel: KernelInfo) -> dict[str, str
# kernel, so no translation is required and the variable is omitted.
if not self._kernel_volume:
env["FLOWFILE_HOST_SHARED_DIR"] = self._shared_volume
# FLOWFILE_KERNEL_SHARED_DIR tells the kernel the absolute path of
# the shared directory *as seen from inside the kernel container*.
# Used by flowfile.get_shared_location() to resolve user file paths.
env["FLOWFILE_KERNEL_SHARED_DIR"] = self.to_kernel_path(self._shared_volume)
# Persistence settings from kernel config
env["KERNEL_ID"] = kernel_id
env["PERSISTENCE_ENABLED"] = "true" if kernel.persistence_enabled else "false"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,23 @@
</div>
</section>

<section class="api-section">
<h4>File Utilities</h4>
<p class="section-description">
Convenience helpers for working with files on the shared volume. Files written here
are accessible from all FlowFile services and persist across kernel executions.
</p>
<div class="api-item">
<code>flowfile.get_shared_location("test_file.csv")</code>
<p>Returns the absolute path for a file in the shared directory.
Parent directories are created automatically.</p>
</div>
<div class="api-item">
<code>flowfile.get_shared_location("subdir/report.parquet")</code>
<p>Supports nested paths — subdirectories are created as needed.</p>
</div>
</section>

<section class="api-section">
<h4>Common Patterns</h4>

Expand Down Expand Up @@ -184,6 +201,17 @@ flowfile.publish_global("rf_model", model,
flowfile.log_info("Model published to catalog")</code></pre>
</div>

<div class="pattern">
<h5>Write to Shared Directory</h5>
<pre><code>import polars as pl

df = flowfile.read_input().collect()

# Write to the shared directory (accessible from all services)
df.write_csv(flowfile.get_shared_location("exports/output.csv"))
df.write_parquet(flowfile.get_shared_location("exports/output.parquet"))</code></pre>
</div>

<div class="pattern">
<h5>Multiple Inputs</h5>
<pre><code>import polars as pl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,12 @@ export const flowfileCompletionVals = [
detail: "flowfile.list_global_artifacts(namespace_id?, tags?)",
apply: "list_global_artifacts()",
},
{label: "get_shared_location",
type: "function",
info: "Get the shared location to make objects available to other processes",
detail: "flowfile.get_shared_location()->str",
apply: "get_shared_location()",
},
{
label: "delete_global_artifact",
type: "function",
Expand Down
32 changes: 32 additions & 0 deletions kernel_runtime/kernel_runtime/flowfile_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,38 @@ def delete_global_artifact(
resp.raise_for_status()


# ===== File Utilities =====


def get_shared_location(filename: str) -> str:
"""Return the absolute path for a file in the shared directory.

The shared directory is accessible from all FlowFile services (core,
worker, kernel) and persists across kernel executions. Use this to
write files that should be readable by other services or that should
survive container restarts.

Parent directories are created automatically.

Args:
filename: Relative filename or path, e.g. ``"test_file.csv"`` or
``"other_dir/test_file.csv"``.

Returns:
Absolute path as a string, ready to pass to file-writing functions.

Examples::

df.write_csv(flowfile.get_shared_location("test_file.csv"))
df.write_csv(flowfile.get_shared_location("reports/monthly.csv"))
"""
base = os.environ.get("FLOWFILE_KERNEL_SHARED_DIR", "/shared")
full_path = os.path.join(base, "user_files", filename)
parent = os.path.dirname(full_path)
os.makedirs(parent, exist_ok=True)
return full_path


# ===== Logging APIs =====


Expand Down
95 changes: 95 additions & 0 deletions kernel_runtime/tests/test_flowfile_client.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Tests for kernel_runtime.flowfile_client."""

import os
from pathlib import Path

import polars as pl
Expand Down Expand Up @@ -366,3 +367,97 @@ def test_is_pil_image_without_import(self):
"""Without PIL installed, should return False."""
result = flowfile_client._is_pil_image("not an image")
assert result is False


class TestSharedLocation:
"""Tests for flowfile.shared_location()."""

def test_returns_path_under_user_files(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
shared_dir = str(tmp_dir / "shared")
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)

result = flowfile_client.get_shared_location("test_file.csv")
assert result == os.path.join(shared_dir, "user_files", "test_file.csv")

def test_creates_parent_directories(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
shared_dir = str(tmp_dir / "shared")
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)

result = flowfile_client.get_shared_location("other_dir/test_file.csv")
expected = os.path.join(shared_dir, "user_files", "other_dir", "test_file.csv")
assert result == expected
assert os.path.isdir(os.path.dirname(result))

def test_nested_subdirectories(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
shared_dir = str(tmp_dir / "shared")
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)

result = flowfile_client.get_shared_location("a/b/c/deep_file.parquet")
expected = os.path.join(shared_dir, "user_files", "a", "b", "c", "deep_file.parquet")
assert result == expected
assert os.path.isdir(os.path.dirname(result))

def test_defaults_to_shared_when_env_not_set(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
monkeypatch.delenv("FLOWFILE_KERNEL_SHARED_DIR", raising=False)
# Patch os.makedirs to avoid PermissionError on /shared in CI
created = []
monkeypatch.setattr(os, "makedirs", lambda p, exist_ok=False: created.append(p))

result = flowfile_client.get_shared_location("test.csv")
assert result == os.path.join("/shared", "user_files", "test.csv")
assert created == [os.path.join("/shared", "user_files")]

def test_file_is_writable(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
shared_dir = str(tmp_dir / "shared")
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)

path = flowfile_client.get_shared_location("writable_test.csv")
with open(path, "w") as f:
f.write("col1,col2\n1,2\n")
assert os.path.isfile(path)

def test_does_not_require_execution_context(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
"""shared_location works without _set_context() being called."""
shared_dir = str(tmp_dir / "shared")
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", shared_dir)
flowfile_client._clear_context()

result = flowfile_client.get_shared_location("no_context.csv")
assert "no_context.csv" in result

def test_write_parquet_roundtrip(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
"""Write a Polars DataFrame to shared_location and read it back."""
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", str(tmp_dir / "shared"))

df = pl.DataFrame({"id": [1, 2, 3], "value": [10.5, 20.0, 30.1]})
path = flowfile_client.get_shared_location("output.parquet")
df.write_parquet(path)

result = pl.read_parquet(path)
assert result.shape == (3, 2)
assert result["id"].to_list() == [1, 2, 3]
assert result["value"].to_list() == [10.5, 20.0, 30.1]

def test_write_parquet_nested_path(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
"""Write parquet into a nested subdirectory via shared_location."""
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", str(tmp_dir / "shared"))

df = pl.DataFrame({"name": ["alice", "bob"], "score": [95, 87]})
path = flowfile_client.get_shared_location("exports/daily/scores.parquet")
df.write_parquet(path)

result = pl.read_parquet(path)
assert result["name"].to_list() == ["alice", "bob"]

def test_write_csv_and_parquet_same_dir(self, tmp_dir: Path, monkeypatch: pytest.MonkeyPatch):
"""Write both CSV and Parquet to the same shared subdirectory."""
monkeypatch.setenv("FLOWFILE_KERNEL_SHARED_DIR", str(tmp_dir / "shared"))

df = pl.DataFrame({"x": [1, 2], "y": [3, 4]})
csv_path = flowfile_client.get_shared_location("reports/data.csv")
parquet_path = flowfile_client.get_shared_location("reports/data.parquet")
df.write_csv(csv_path)
df.write_parquet(parquet_path)

assert pl.read_csv(csv_path).shape == (2, 2)
assert pl.read_parquet(parquet_path)["x"].to_list() == [1, 2]
Loading