diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 641b28cad..823504920 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -840,10 +840,24 @@ def add_user_defined_node( ): """Adds a user-defined custom node to the graph. + Routes to kernel execution when the node declares ``use_kernel=True`` + and a ``kernel_id`` is provided at runtime; otherwise falls back to the + standard lazy-plan execution path. + Args: custom_node: The custom node instance to add. user_defined_node_settings: The settings for the user-defined node. """ + kernel_id = user_defined_node_settings.kernel_id + if custom_node.use_kernel and kernel_id: + self._add_user_defined_node_kernel(custom_node, user_defined_node_settings, kernel_id) + else: + self._add_user_defined_node_lazy(custom_node, user_defined_node_settings) + + def _add_user_defined_node_lazy( + self, custom_node: CustomNodeBase, user_defined_node_settings: input_schema.UserDefinedNode + ): + """Standard lazy-plan execution for custom nodes.""" def _func(*flow_data_engine: FlowDataEngine) -> FlowDataEngine | None: user_id = user_defined_node_settings.user_id @@ -872,6 +886,153 @@ def _func(*flow_data_engine: FlowDataEngine) -> FlowDataEngine | None: node = self.get_node(user_defined_node_settings.node_id) self.add_node_to_starting_list(node) + # ------------------------------------------------------------------ + # Kernel-based execution for custom nodes + # ------------------------------------------------------------------ + + @staticmethod + def _build_kernel_code_for_custom_node( + custom_node: CustomNodeBase, + settings: Any | None, + ) -> str: + """Build the Python source sent to the kernel. + + Injects ``import`` statements, a ``self`` namespace that mirrors + the node's settings schema (so ``self.settings_schema.section.field.value`` + works identically to lazy mode), flattened convenience variables, + and finally the user's ``kernel_code``. + """ + lines: list[str] = ["import polars as pl", "import flowfile", ""] + + # Inject settings as plain Python variables + a self namespace + if settings and isinstance(settings, dict): + lines.append("# Settings (injected from node configuration)") + lines.append("from types import SimpleNamespace as _NS") + lines.append("class _V:") + lines.append(" def __init__(self, v): self.value = v; self.secret_value = v") + + section_parts: list[str] = [] + for section_name, section_values in settings.items(): + if isinstance(section_values, dict): + field_parts: list[str] = [] + for key, value in section_values.items(): + lines.append(f"{key} = {repr(value)}") + field_parts.append(f"{key}=_V({repr(value)})") + section_parts.append(f"{section_name}=_NS({', '.join(field_parts)})") + + lines.append(f"self = _NS(settings_schema=_NS({', '.join(section_parts)}))") + lines.append("") + + lines.append("# User code") + lines.append(custom_node.kernel_code) + return "\n".join(lines) + + def _add_user_defined_node_kernel( + self, + custom_node: CustomNodeBase, + user_defined_node_settings: input_schema.UserDefinedNode, + kernel_id: str, + ): + """Execute a custom node inside a kernel container. + + Follows the same pattern as ``add_python_script``: materialise inputs + to Parquet via the worker, send the code to the kernel for execution, + then read the output Parquet back. + """ + + def _func(*flowfile_tables: FlowDataEngine) -> FlowDataEngine: + manager = get_kernel_manager() + node_id = user_defined_node_settings.node_id + flow_id = self.flow_id + node_logger = self.flow_logger.get_node_logger(node_id) + + shared_base = manager.shared_volume_path + input_dir = os.path.join(shared_base, str(flow_id), str(node_id), "inputs") + output_dir = os.path.join(shared_base, str(flow_id), str(node_id), "outputs") + os.makedirs(input_dir, exist_ok=True) + os.makedirs(output_dir, exist_ok=True) + self.flow_logger.info(f"Prepared shared directories for kernel execution: {input_dir}, {output_dir}") + + # Materialise inputs to Parquet + input_paths: dict[str, list[str]] = {} + main_paths: list[str] = [] + for idx, ft in enumerate(flowfile_tables): + filename = f"main_{idx}.parquet" + local_path = os.path.join(input_dir, filename) + fetcher = ExternalDfFetcher( + flow_id=flow_id, + node_id=node_id, + lf=ft.data_frame, + wait_on_completion=True, + operation_type="write_parquet", + kwargs={"output_path": local_path}, + ) + if fetcher.has_error: + raise RuntimeError(f"Failed to write parquet for input {idx}: {fetcher.error_description}") + main_paths.append(manager.to_kernel_path(local_path)) + input_paths["main"] = main_paths + + # Build kernel code with injected settings + code = self._build_kernel_code_for_custom_node(custom_node, user_defined_node_settings.settings) + + # Log callback URL + if manager._kernel_volume: + log_callback_url = f"http://flowfile-core:{SERVER_PORT}/raw_logs" + else: + log_callback_url = f"http://host.docker.internal:{SERVER_PORT}/raw_logs" + + # Internal auth token + internal_token: str | None = None + try: + from flowfile_core.auth.jwt import get_internal_token + + internal_token = get_internal_token() + except (ValueError, ImportError): + pass + + reg_id = self._flow_settings.source_registration_id + request = ExecuteRequest( + node_id=node_id, + code=code, + input_paths=input_paths, + output_dir=manager.to_kernel_path(output_dir), + flow_id=flow_id, + source_registration_id=reg_id, + log_callback_url=log_callback_url, + internal_token=internal_token, + ) + result = manager.execute_sync(kernel_id, request, self.flow_logger) + + # Forward stdout / stderr + if result.stdout: + for line in result.stdout.strip().splitlines(): + node_logger.info(f"[stdout] {line}") + if result.stderr: + for line in result.stderr.strip().splitlines(): + node_logger.warning(f"[stderr] {line}") + + if not result.success: + raise RuntimeError(f"Kernel execution failed: {result.error}") + + # Read output + output_path = os.path.join(output_dir, "main.parquet") + if os.path.exists(output_path): + return FlowDataEngine(pl.scan_parquet(output_path)) + + # No output published – pass through first input + return flowfile_tables[0] if flowfile_tables else FlowDataEngine(pl.LazyFrame()) + + self.add_node_step( + node_id=user_defined_node_settings.node_id, + function=_func, + setting_input=user_defined_node_settings, + input_node_ids=user_defined_node_settings.depending_on_ids, + node_type=custom_node.item, + ) + if custom_node.number_of_inputs == 0: + node = self.get_node(user_defined_node_settings.node_id) + self.add_node_to_starting_list(node) + @with_history_capture(HistoryActionType.UPDATE_SETTINGS) def add_pivot(self, pivot_settings: input_schema.NodePivot): """Adds a pivot node to the graph. diff --git a/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py b/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py index e8e0ff565..2362a318b 100644 --- a/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py +++ b/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py @@ -336,6 +336,11 @@ class CustomNodeBase(BaseModel): number_of_inputs: int = 1 number_of_outputs: int = 1 + # Kernel execution configuration + use_kernel: bool = False + required_packages: list[str] = [] + kernel_code: str = "" + # Display properties in the UI node_group: str | None = "custom" title: str | None = "Custom Node" @@ -419,6 +424,9 @@ def get_frontend_schema(self) -> dict: "node_group": self.node_group, "title": self.title, "intro": self.intro, + "use_kernel": self.use_kernel, + "required_packages": self.required_packages, + "kernel_code": self.kernel_code, } if self.settings_schema: diff --git a/flowfile_core/flowfile_core/routes/user_defined_components.py b/flowfile_core/flowfile_core/routes/user_defined_components.py index bc67addaa..6433bab4d 100644 --- a/flowfile_core/flowfile_core/routes/user_defined_components.py +++ b/flowfile_core/flowfile_core/routes/user_defined_components.py @@ -300,17 +300,39 @@ def get_custom_node(file_name: str) -> dict[str, Any]: result["metadata"]["number_of_inputs"] = value elif attr_name == "number_of_outputs": result["metadata"]["number_of_outputs"] = value - - # Extract process method - for item in node.body: - if isinstance(item, ast.FunctionDef) and item.name == "process": - # Get the source code of the process method - start_line = item.lineno - 1 - end_line = item.end_lineno if hasattr(item, 'end_lineno') else start_line + 20 - lines = content.split('\n') - process_lines = lines[start_line:end_line] - result["processCode"] = '\n'.join(process_lines) - break + elif attr_name == "use_kernel" and isinstance(value, bool): + result["metadata"]["use_kernel"] = value + elif attr_name == "kernel_code" and isinstance(value, str): + result["metadata"]["kernel_code"] = value + + # Parse list values (e.g. required_packages) + if attr_name == "required_packages": + list_node = None + if isinstance(item, ast.AnnAssign) and item.value: + list_node = item.value + elif isinstance(item, ast.Assign): + list_node = item.value + if isinstance(list_node, ast.List): + packages = [] + for elt in list_node.elts: + if isinstance(elt, ast.Constant) and isinstance(elt.value, str): + packages.append(elt.value) + result["metadata"]["required_packages"] = packages + + # Extract process code + if result["metadata"].get("use_kernel"): + # Kernel mode: processCode is the kernel_code attribute + result["processCode"] = result["metadata"].get("kernel_code", "") + else: + # Lazy mode: extract the process method source + for item in node.body: + if isinstance(item, ast.FunctionDef) and item.name == "process": + start_line = item.lineno - 1 + end_line = item.end_lineno if hasattr(item, 'end_lineno') else start_line + 20 + lines = content.split('\n') + process_lines = lines[start_line:end_line] + result["processCode"] = '\n'.join(process_lines) + break break diff --git a/flowfile_core/flowfile_core/schemas/input_schema.py b/flowfile_core/flowfile_core/schemas/input_schema.py index e244bad18..9333b9d49 100644 --- a/flowfile_core/flowfile_core/schemas/input_schema.py +++ b/flowfile_core/flowfile_core/schemas/input_schema.py @@ -1168,3 +1168,4 @@ class UserDefinedNode(NodeMultiInput): """Settings for a node that contains the user defined node information""" settings: Any + kernel_id: str | None = None diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue index c5103cdf2..9d2d09f1e 100644 --- a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue +++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue @@ -9,6 +9,67 @@ {{ schema.intro }} + +
+
Kernel
+
+ + + + + {{ kernel.name }} + ({{ kernel.state }}) + + missing packages + + + + + + Manage Kernels + +
+
+ Required: + {{ pkg }} +
+
+ + No kernel selected. A kernel is required to run this node. +
+
+ + Kernel is {{ selectedKernelState }}. + + + +
+
+