Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions flowfile_core/flowfile_core/flowfile/flow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -840,10 +840,24 @@ def add_user_defined_node(
):
"""Adds a user-defined custom node to the graph.

Routes to kernel execution when the node declares ``use_kernel=True``
and a ``kernel_id`` is provided at runtime; otherwise falls back to the
standard lazy-plan execution path.

Args:
custom_node: The custom node instance to add.
user_defined_node_settings: The settings for the user-defined node.
"""
kernel_id = user_defined_node_settings.kernel_id
if custom_node.use_kernel and kernel_id:
self._add_user_defined_node_kernel(custom_node, user_defined_node_settings, kernel_id)
else:
self._add_user_defined_node_lazy(custom_node, user_defined_node_settings)

def _add_user_defined_node_lazy(
self, custom_node: CustomNodeBase, user_defined_node_settings: input_schema.UserDefinedNode
):
"""Standard lazy-plan execution for custom nodes."""

def _func(*flow_data_engine: FlowDataEngine) -> FlowDataEngine | None:
user_id = user_defined_node_settings.user_id
Expand Down Expand Up @@ -872,6 +886,153 @@ def _func(*flow_data_engine: FlowDataEngine) -> FlowDataEngine | None:
node = self.get_node(user_defined_node_settings.node_id)
self.add_node_to_starting_list(node)

# ------------------------------------------------------------------
# Kernel-based execution for custom nodes
# ------------------------------------------------------------------

@staticmethod
def _build_kernel_code_for_custom_node(
    custom_node: CustomNodeBase,
    settings: Any | None,
) -> str:
    """Build the Python source sent to the kernel.

    The generated program consists of, in order:

    1. ``import polars as pl`` and ``import flowfile``.
    2. When ``settings`` is a non-empty dict: a ``self`` namespace that
       mirrors the node's settings schema (so
       ``self.settings_schema.section.field.value`` works identically to
       lazy mode) plus one flattened convenience variable per field.
    3. The user's ``kernel_code``.

    Section and field names are passed to the namespaces via ``**{...}``
    unpacking with quoted keys, so names that are not valid Python
    identifiers (``"my-field"``) or that are keywords (``"class"``) no longer
    turn the whole generated program into a ``SyntaxError``; such fields
    remain reachable with ``getattr(self.settings_schema.section, name)``.
    Flattened variables are only emitted for names that are safe to bind.

    Args:
        custom_node: The custom node; only its ``kernel_code`` is read.
        settings: Mapping of ``section name -> {field name -> value}``.
            Anything that is not a non-empty dict injects no settings, and
            sections whose value is not a dict are skipped.

    Returns:
        The complete source code as a single newline-joined string.
    """
    import keyword

    # Names the generated preamble itself binds; a flat variable with one of
    # these names would clobber the import/helper the user code relies on.
    reserved_names = {"pl", "flowfile", "self", "_NS", "_V"}

    def _is_bindable(name: object) -> bool:
        """Return True when ``name`` can be emitted as ``name = <value>``."""
        return (
            isinstance(name, str)
            and name.isidentifier()
            and not keyword.iskeyword(name)
            and name not in reserved_names
        )

    lines: list[str] = ["import polars as pl", "import flowfile", ""]

    # Inject settings as plain Python variables + a self namespace
    if settings and isinstance(settings, dict):
        lines.append("# Settings (injected from node configuration)")
        lines.append("from types import SimpleNamespace as _NS")
        lines.append("class _V:")
        lines.append(" def __init__(self, v): self.value = v; self.secret_value = v")

        section_parts: list[str] = []
        for section_name, section_values in settings.items():
            if not isinstance(section_values, dict):
                continue
            field_parts: list[str] = []
            for key, value in section_values.items():
                literal = repr(value)
                if _is_bindable(key):
                    lines.append(f"{key} = {literal}")
                # Quoted key inside a dict literal: valid for any field name.
                field_parts.append(f"{str(key)!r}: _V({literal})")
            fields_src = ", ".join(field_parts)
            section_parts.append(f"{str(section_name)!r}: _NS(**{{{fields_src}}})")

        sections_src = ", ".join(section_parts)
        lines.append(f"self = _NS(settings_schema=_NS(**{{{sections_src}}}))")
        lines.append("")

    lines.append("# User code")
    lines.append(custom_node.kernel_code)
    return "\n".join(lines)

def _add_user_defined_node_kernel(
    self,
    custom_node: CustomNodeBase,
    user_defined_node_settings: input_schema.UserDefinedNode,
    kernel_id: str,
):
    """Execute a custom node inside a kernel container.

    Follows the same pattern as ``add_python_script``: materialise inputs
    to Parquet via the worker, send the code to the kernel for execution,
    then read the output Parquet back.

    The work happens in the nested ``_func``, which is registered as the
    node's step function through ``add_node_step``. Nodes that declare
    ``number_of_inputs == 0`` are additionally added to the starting list.

    Args:
        custom_node: The custom node definition; supplies ``kernel_code``
            (via ``_build_kernel_code_for_custom_node``), ``item`` and
            ``number_of_inputs``.
        user_defined_node_settings: Node settings; supplies ``node_id``,
            ``settings`` and ``depending_on_ids``.
        kernel_id: Identifier of the kernel passed to ``execute_sync``.
    """

    def _func(*flowfile_tables: FlowDataEngine) -> FlowDataEngine:
        """Run the node's kernel code against the given input tables.

        Raises:
            RuntimeError: If writing an input Parquet file fails or the
                kernel reports an unsuccessful execution.
        """
        manager = get_kernel_manager()
        node_id = user_defined_node_settings.node_id
        flow_id = self.flow_id
        node_logger = self.flow_logger.get_node_logger(node_id)

        # Per-node working area: <shared volume>/<flow_id>/<node_id>/{inputs,outputs}
        node_dir = os.path.join(manager.shared_volume_path, str(flow_id), str(node_id))
        input_dir = os.path.join(node_dir, "inputs")
        output_dir = os.path.join(node_dir, "outputs")
        os.makedirs(input_dir, exist_ok=True)
        os.makedirs(output_dir, exist_ok=True)
        self.flow_logger.info(f"Prepared shared directories for kernel execution: {input_dir}, {output_dir}")

        # Materialise inputs to Parquet
        input_paths: dict[str, list[str]] = {}
        main_paths: list[str] = []
        for idx, ft in enumerate(flowfile_tables):
            filename = f"main_{idx}.parquet"
            local_path = os.path.join(input_dir, filename)
            fetcher = ExternalDfFetcher(
                flow_id=flow_id,
                node_id=node_id,
                lf=ft.data_frame,
                wait_on_completion=True,
                operation_type="write_parquet",
                kwargs={"output_path": local_path},
            )
            if fetcher.has_error:
                raise RuntimeError(f"Failed to write parquet for input {idx}: {fetcher.error_description}")
            # The kernel is handed the translated path, not the local one.
            main_paths.append(manager.to_kernel_path(local_path))
        input_paths["main"] = main_paths

        # Build kernel code with injected settings
        code = self._build_kernel_code_for_custom_node(custom_node, user_defined_node_settings.settings)

        # Log callback URL
        # NOTE(review): reads the private ``_kernel_volume`` attribute of the
        # manager; presumably it distinguishes a containerised core from a
        # host-run core - confirm and consider a public accessor.
        if manager._kernel_volume:
            log_callback_url = f"http://flowfile-core:{SERVER_PORT}/raw_logs"
        else:
            log_callback_url = f"http://host.docker.internal:{SERVER_PORT}/raw_logs"

        # Internal auth token (best effort: execution proceeds without it,
        # but the reason is logged instead of being silently discarded).
        internal_token: str | None = None
        try:
            from flowfile_core.auth.jwt import get_internal_token

            internal_token = get_internal_token()
        except (ValueError, ImportError) as exc:
            node_logger.info(f"Internal auth token unavailable; continuing without it: {exc}")

        reg_id = self._flow_settings.source_registration_id
        request = ExecuteRequest(
            node_id=node_id,
            code=code,
            input_paths=input_paths,
            output_dir=manager.to_kernel_path(output_dir),
            flow_id=flow_id,
            source_registration_id=reg_id,
            log_callback_url=log_callback_url,
            internal_token=internal_token,
        )
        result = manager.execute_sync(kernel_id, request, self.flow_logger)

        # Forward stdout / stderr
        if result.stdout:
            for line in result.stdout.strip().splitlines():
                node_logger.info(f"[stdout] {line}")
        if result.stderr:
            for line in result.stderr.strip().splitlines():
                node_logger.warning(f"[stderr] {line}")

        if not result.success:
            raise RuntimeError(f"Kernel execution failed: {result.error}")

        # Read output
        # NOTE(review): the outputs directory is reused between runs and is
        # not cleared here, so a ``main.parquet`` left by an earlier run
        # would be picked up if this run publishes nothing - confirm whether
        # the kernel clears it.
        output_path = os.path.join(output_dir, "main.parquet")
        if os.path.exists(output_path):
            return FlowDataEngine(pl.scan_parquet(output_path))

        # No output published – pass through first input
        return flowfile_tables[0] if flowfile_tables else FlowDataEngine(pl.LazyFrame())

    self.add_node_step(
        node_id=user_defined_node_settings.node_id,
        function=_func,
        setting_input=user_defined_node_settings,
        input_node_ids=user_defined_node_settings.depending_on_ids,
        node_type=custom_node.item,
    )
    if custom_node.number_of_inputs == 0:
        node = self.get_node(user_defined_node_settings.node_id)
        self.add_node_to_starting_list(node)

@with_history_capture(HistoryActionType.UPDATE_SETTINGS)
def add_pivot(self, pivot_settings: input_schema.NodePivot):
"""Adds a pivot node to the graph.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,11 @@ class CustomNodeBase(BaseModel):
number_of_inputs: int = 1
number_of_outputs: int = 1

# Kernel execution configuration
use_kernel: bool = False
required_packages: list[str] = []
kernel_code: str = ""

# Display properties in the UI
node_group: str | None = "custom"
title: str | None = "Custom Node"
Expand Down Expand Up @@ -419,6 +424,9 @@ def get_frontend_schema(self) -> dict:
"node_group": self.node_group,
"title": self.title,
"intro": self.intro,
"use_kernel": self.use_kernel,
"required_packages": self.required_packages,
"kernel_code": self.kernel_code,
}

if self.settings_schema:
Expand Down
44 changes: 33 additions & 11 deletions flowfile_core/flowfile_core/routes/user_defined_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -300,17 +300,39 @@ def get_custom_node(file_name: str) -> dict[str, Any]:
result["metadata"]["number_of_inputs"] = value
elif attr_name == "number_of_outputs":
result["metadata"]["number_of_outputs"] = value

# Extract process method
for item in node.body:
if isinstance(item, ast.FunctionDef) and item.name == "process":
# Get the source code of the process method
start_line = item.lineno - 1
end_line = item.end_lineno if hasattr(item, 'end_lineno') else start_line + 20
lines = content.split('\n')
process_lines = lines[start_line:end_line]
result["processCode"] = '\n'.join(process_lines)
break
elif attr_name == "use_kernel" and isinstance(value, bool):
result["metadata"]["use_kernel"] = value
elif attr_name == "kernel_code" and isinstance(value, str):
result["metadata"]["kernel_code"] = value

# Parse list values (e.g. required_packages)
if attr_name == "required_packages":
list_node = None
if isinstance(item, ast.AnnAssign) and item.value:
list_node = item.value
elif isinstance(item, ast.Assign):
list_node = item.value
if isinstance(list_node, ast.List):
packages = []
for elt in list_node.elts:
if isinstance(elt, ast.Constant) and isinstance(elt.value, str):
packages.append(elt.value)
result["metadata"]["required_packages"] = packages

# Extract process code
if result["metadata"].get("use_kernel"):
# Kernel mode: processCode is the kernel_code attribute
result["processCode"] = result["metadata"].get("kernel_code", "")
else:
# Lazy mode: extract the process method source
for item in node.body:
if isinstance(item, ast.FunctionDef) and item.name == "process":
start_line = item.lineno - 1
end_line = item.end_lineno if hasattr(item, 'end_lineno') else start_line + 20
lines = content.split('\n')
process_lines = lines[start_line:end_line]
result["processCode"] = '\n'.join(process_lines)
break

break

Expand Down
1 change: 1 addition & 0 deletions flowfile_core/flowfile_core/schemas/input_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -1168,3 +1168,4 @@ class UserDefinedNode(NodeMultiInput):
"""Settings for a node that contains the user defined node information"""

settings: Any
kernel_id: str | None = None
Loading