From fbccf46df909970e9b6d5d9e1aa396acac600b1b Mon Sep 17 00:00:00 2001 From: Claude Date: Wed, 11 Feb 2026 18:07:07 +0000 Subject: [PATCH 1/4] Integrate kernel runtime into custom node designer Add support for custom nodes to execute in kernel containers, enabling use of arbitrary Python packages beyond what Polars provides. Custom nodes can now declare required packages (spec), and at runtime users select a kernel that satisfies those requirements. Backend changes: - Add use_kernel, required_packages, kernel_code fields to CustomNodeBase - Add kernel_id to UserDefinedNode for runtime kernel selection - Split add_user_defined_node() into lazy and kernel execution paths - Add _add_user_defined_node_kernel() following add_python_script() pattern - Add _build_kernel_code_for_custom_node() for settings injection - Parse kernel fields (use_kernel, required_packages, kernel_code) in get_custom_node() route for Node Browser loading Frontend changes: - Add kernel execution toggle and required packages input to NodeDesigner - Add kernel-mode code editor with flowfile API hints - Implement dual-mode code generation (lazy vs kernel) - Add kernel-mode validation rules (flowfile.publish_output check) - Add runtime kernel selector with package matching to CustomNode.vue - Update types: NodeMetadata, CustomNodeSchema, NodeUserDefined https://claude.ai/code/session_01Sody7mJjQrzJvSrmF5HMVe --- .../flowfile_core/flowfile/flow_graph.py | 149 +++++++++++ .../flowfile/node_designer/custom_node.py | 8 + .../routes/user_defined_components.py | 44 ++- .../flowfile_core/schemas/input_schema.py | 1 + .../elements/customNode/CustomNode.vue | 250 +++++++++++++++++- .../elements/customNode/interface.ts | 3 + .../src/renderer/app/pages/NodeDesigner.vue | 102 ++++++- .../pages/nodeDesigner/ProcessCodeEditor.vue | 13 +- .../composables/useCodeGeneration.ts | 82 ++++-- .../composables/useNodeValidation.ts | 19 +- .../app/pages/nodeDesigner/constants.ts | 12 + .../renderer/app/pages/nodeDesigner/types.ts | 2 + .../src/renderer/app/types/node.types.ts | 1 + 13 files changed, 638 insertions(+), 48 deletions(-) diff --git a/flowfile_core/flowfile_core/flowfile/flow_graph.py b/flowfile_core/flowfile_core/flowfile/flow_graph.py index 641b28cad..659248af2 100644 --- a/flowfile_core/flowfile_core/flowfile/flow_graph.py +++ b/flowfile_core/flowfile_core/flowfile/flow_graph.py @@ -840,10 +840,24 @@ def add_user_defined_node( ): """Adds a user-defined custom node to the graph. + Routes to kernel execution when the node declares ``use_kernel=True`` + and a ``kernel_id`` is provided at runtime; otherwise falls back to the + standard lazy-plan execution path. + Args: custom_node: The custom node instance to add. user_defined_node_settings: The settings for the user-defined node. """ + kernel_id = user_defined_node_settings.kernel_id + if custom_node.use_kernel and kernel_id: + self._add_user_defined_node_kernel(custom_node, user_defined_node_settings, kernel_id) + else: + self._add_user_defined_node_lazy(custom_node, user_defined_node_settings) + + def _add_user_defined_node_lazy( + self, custom_node: CustomNodeBase, user_defined_node_settings: input_schema.UserDefinedNode + ): + """Standard lazy-plan execution for custom nodes.""" def _func(*flow_data_engine: FlowDataEngine) -> FlowDataEngine | None: user_id = user_defined_node_settings.user_id @@ -872,6 +886,141 @@ def _func(*flow_data_engine: FlowDataEngine) -> FlowDataEngine | None: node = self.get_node(user_defined_node_settings.node_id) self.add_node_to_starting_list(node) + # ------------------------------------------------------------------ + # Kernel-based execution for custom nodes + # ------------------------------------------------------------------ + + @staticmethod + def _build_kernel_code_for_custom_node( + custom_node: CustomNodeBase, + settings: Any | None, + ) -> str: + """Build the Python source sent to the kernel. + + Injects ``import`` statements, flattened settings values as Python + variables and finally the user's ``kernel_code``. + """ + lines: list[str] = ["import polars as pl", "import flowfile", ""] + + # Inject settings as plain Python variables + if settings and isinstance(settings, dict): + lines.append("# Settings (injected from node configuration)") + for section_values in settings.values(): + if isinstance(section_values, dict): + for key, value in section_values.items(): + lines.append(f"{key} = {repr(value)}") + lines.append("") + + lines.append("# User code") + lines.append(custom_node.kernel_code) + return "\n".join(lines) + + def _add_user_defined_node_kernel( + self, + custom_node: CustomNodeBase, + user_defined_node_settings: input_schema.UserDefinedNode, + kernel_id: str, + ): + """Execute a custom node inside a kernel container. + + Follows the same pattern as ``add_python_script``: materialise inputs + to Parquet via the worker, send the code to the kernel for execution, + then read the output Parquet back. + """ + + def _func(*flowfile_tables: FlowDataEngine) -> FlowDataEngine: + manager = get_kernel_manager() + node_id = user_defined_node_settings.node_id + flow_id = self.flow_id + node_logger = self.flow_logger.get_node_logger(node_id) + + shared_base = manager.shared_volume_path + input_dir = os.path.join(shared_base, str(flow_id), str(node_id), "inputs") + output_dir = os.path.join(shared_base, str(flow_id), str(node_id), "outputs") + os.makedirs(input_dir, exist_ok=True) + os.makedirs(output_dir, exist_ok=True) + self.flow_logger.info(f"Prepared shared directories for kernel execution: {input_dir}, {output_dir}") + + # Materialise inputs to Parquet + input_paths: dict[str, list[str]] = {} + main_paths: list[str] = [] + for idx, ft in enumerate(flowfile_tables): + filename = f"main_{idx}.parquet" + local_path = os.path.join(input_dir, filename) + fetcher = ExternalDfFetcher( + flow_id=flow_id, + node_id=node_id, + lf=ft.data_frame, + wait_on_completion=True, + operation_type="write_parquet", + kwargs={"output_path": local_path}, + ) + if fetcher.has_error: + raise RuntimeError(f"Failed to write parquet for input {idx}: {fetcher.error_description}") + main_paths.append(manager.to_kernel_path(local_path)) + input_paths["main"] = main_paths + + # Build kernel code with injected settings + code = self._build_kernel_code_for_custom_node(custom_node, user_defined_node_settings.settings) + + # Log callback URL + if manager._kernel_volume: + log_callback_url = f"http://flowfile-core:{SERVER_PORT}/raw_logs" + else: + log_callback_url = f"http://host.docker.internal:{SERVER_PORT}/raw_logs" + + # Internal auth token + internal_token: str | None = None + try: + from flowfile_core.auth.jwt import get_internal_token + + internal_token = get_internal_token() + except (ValueError, ImportError): + pass + + reg_id = self._flow_settings.source_registration_id + request = ExecuteRequest( + node_id=node_id, + code=code, + input_paths=input_paths, + output_dir=manager.to_kernel_path(output_dir), + flow_id=flow_id, + source_registration_id=reg_id, + log_callback_url=log_callback_url, + internal_token=internal_token, + ) + result = manager.execute_sync(kernel_id, request, self.flow_logger) + + # Forward stdout / stderr + if result.stdout: + for line in result.stdout.strip().splitlines(): + node_logger.info(f"[stdout] {line}") + if result.stderr: + for line in result.stderr.strip().splitlines(): + node_logger.warning(f"[stderr] {line}") + + if not result.success: + raise RuntimeError(f"Kernel execution failed: {result.error}") + + # Read output + output_path = os.path.join(output_dir, "main.parquet") + if os.path.exists(output_path): + return FlowDataEngine(pl.scan_parquet(output_path)) + + # No output published – pass through first input + return flowfile_tables[0] if flowfile_tables else FlowDataEngine(pl.LazyFrame()) + + self.add_node_step( + node_id=user_defined_node_settings.node_id, + function=_func, + setting_input=user_defined_node_settings, + input_node_ids=user_defined_node_settings.depending_on_ids, + node_type=custom_node.item, + ) + if custom_node.number_of_inputs == 0: + node = self.get_node(user_defined_node_settings.node_id) + self.add_node_to_starting_list(node) + @with_history_capture(HistoryActionType.UPDATE_SETTINGS) def add_pivot(self, pivot_settings: input_schema.NodePivot): """Adds a pivot node to the graph. diff --git a/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py b/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py index e8e0ff565..2362a318b 100644 --- a/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py +++ b/flowfile_core/flowfile_core/flowfile/node_designer/custom_node.py @@ -336,6 +336,11 @@ class CustomNodeBase(BaseModel): number_of_inputs: int = 1 number_of_outputs: int = 1 + # Kernel execution configuration + use_kernel: bool = False + required_packages: list[str] = [] + kernel_code: str = "" + # Display properties in the UI node_group: str | None = "custom" title: str | None = "Custom Node" @@ -419,6 +424,9 @@ def get_frontend_schema(self) -> dict: "node_group": self.node_group, "title": self.title, "intro": self.intro, + "use_kernel": self.use_kernel, + "required_packages": self.required_packages, + "kernel_code": self.kernel_code, } if self.settings_schema: diff --git a/flowfile_core/flowfile_core/routes/user_defined_components.py b/flowfile_core/flowfile_core/routes/user_defined_components.py index bc67addaa..6433bab4d 100644 --- a/flowfile_core/flowfile_core/routes/user_defined_components.py +++ b/flowfile_core/flowfile_core/routes/user_defined_components.py @@ -300,17 +300,39 @@ def get_custom_node(file_name: str) -> dict[str, Any]: result["metadata"]["number_of_inputs"] = value elif attr_name == "number_of_outputs": result["metadata"]["number_of_outputs"] = value - - # Extract process method - for item in node.body: - if isinstance(item, ast.FunctionDef) and item.name == "process": - # Get the source code of the process method - start_line = item.lineno - 1 - end_line = item.end_lineno if hasattr(item, 'end_lineno') else start_line + 20 - lines = content.split('\n') - process_lines = lines[start_line:end_line] - result["processCode"] = '\n'.join(process_lines) - break + elif attr_name == "use_kernel" and isinstance(value, bool): + result["metadata"]["use_kernel"] = value + elif attr_name == "kernel_code" and isinstance(value, str): + result["metadata"]["kernel_code"] = value + + # Parse list values (e.g. required_packages) + if attr_name == "required_packages": + list_node = None + if isinstance(item, ast.AnnAssign) and item.value: + list_node = item.value + elif isinstance(item, ast.Assign): + list_node = item.value + if isinstance(list_node, ast.List): + packages = [] + for elt in list_node.elts: + if isinstance(elt, ast.Constant) and isinstance(elt.value, str): + packages.append(elt.value) + result["metadata"]["required_packages"] = packages + + # Extract process code + if result["metadata"].get("use_kernel"): + # Kernel mode: processCode is the kernel_code attribute + result["processCode"] = result["metadata"].get("kernel_code", "") + else: + # Lazy mode: extract the process method source + for item in node.body: + if isinstance(item, ast.FunctionDef) and item.name == "process": + start_line = item.lineno - 1 + end_line = item.end_lineno if hasattr(item, 'end_lineno') else start_line + 20 + lines = content.split('\n') + process_lines = lines[start_line:end_line] + result["processCode"] = '\n'.join(process_lines) + break break diff --git a/flowfile_core/flowfile_core/schemas/input_schema.py b/flowfile_core/flowfile_core/schemas/input_schema.py index e244bad18..9333b9d49 100644 --- a/flowfile_core/flowfile_core/schemas/input_schema.py +++ b/flowfile_core/flowfile_core/schemas/input_schema.py @@ -1168,3 +1168,4 @@ class UserDefinedNode(NodeMultiInput): """Settings for a node that contains the user defined node information""" settings: Any + kernel_id: str | None = None diff --git a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue index c5103cdf2..9d2d09f1e 100644 --- a/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue +++ b/flowfile_frontend/src/renderer/app/components/nodes/node-types/elements/customNode/CustomNode.vue @@ -9,6 +9,67 @@ {{ schema.intro }} + +
+
Kernel
+
+ + + + + {{ kernel.name }} + ({{ kernel.state }}) + + missing packages + + + + + + Manage Kernels + +
+
+ Required: + {{ pkg }} +
+
+ + No kernel selected. A kernel is required to run this node. +
+
+ + Kernel is {{ selectedKernelState }}. + + + +
+
+