Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions flowfile_core/flowfile_core/flowfile/code_generator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,23 @@
UnsupportedNodeError,
export_flow_to_polars,
)
from flowfile_core.flowfile.code_generator.python_script_rewriter import (
FlowfileUsageAnalysis,
analyze_flowfile_usage,
build_function_code,
extract_imports,
get_required_packages,
rewrite_flowfile_calls,
)

__all__ = [
"FlowGraphToPolarsConverter",
"UnsupportedNodeError",
"export_flow_to_polars",
"FlowfileUsageAnalysis",
"analyze_flowfile_usage",
"build_function_code",
"extract_imports",
"get_required_packages",
"rewrite_flowfile_calls",
]
110 changes: 110 additions & 0 deletions flowfile_core/flowfile_core/flowfile/code_generator/code_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ def __init__(self, flow_graph: FlowGraph):
self.last_node_var = None
self.unsupported_nodes = []
self.custom_node_classes = {}
# Track which artifacts have been published: (kernel_id, artifact_name) → node_id
self._published_artifacts: dict[tuple[str, str], int] = {}
# Track if any python_script nodes exist (to emit _artifacts = {} once)
self._has_python_script_nodes: bool = False
# Track which kernel IDs are used (for initializing per-kernel sub-dicts)
self._kernel_ids_used: list[str] = []

def convert(self) -> str:
"""
Expand Down Expand Up @@ -1118,6 +1124,104 @@ def _handle_polars_code(
self._add_code(f"{var_name} = _polars_code_{var_name.replace('df_', '')}({args})")
self._add_code("")

def _handle_python_script(
self, settings: input_schema.NodePythonScript, var_name: str, input_vars: dict[str, str]
) -> None:
"""Handle python_script nodes by rewriting flowfile.* calls to plain Python."""
from flowfile_core.flowfile.code_generator.python_script_rewriter import (
analyze_flowfile_usage,
build_function_code,
extract_imports,
rewrite_flowfile_calls,
)

code = settings.python_script_input.code.strip()
kernel_id = settings.python_script_input.kernel_id
node_id = settings.node_id

# Handle empty code — pass through input
if not code:
if input_vars:
self._add_code(f"{var_name} = {list(input_vars.values())[0]}")
else:
self._add_code(f"{var_name} = pl.LazyFrame()")
return

# 1. Analyze flowfile usage
try:
analysis = analyze_flowfile_usage(code)
except SyntaxError as e:
self.unsupported_nodes.append((
node_id,
"python_script",
f"Syntax error in python_script code: {e}"
))
return
self._has_python_script_nodes = True
effective_kernel_id = kernel_id or "_default"
if effective_kernel_id not in self._kernel_ids_used:
self._kernel_ids_used.append(effective_kernel_id)

# 2. Validate artifact dependencies are available (same kernel only)
for artifact_name in analysis.artifacts_consumed:
if (effective_kernel_id, artifact_name) not in self._published_artifacts:
self.unsupported_nodes.append((
node_id,
"python_script",
f"Artifact '{artifact_name}' is consumed but not published by any "
f"upstream node on kernel '{effective_kernel_id}'"
))
return

# 4. Extract and register imports
user_imports = extract_imports(code)
for imp in user_imports:
self.imports.add(imp)

# 5. Add kernel package requirements as comments
if kernel_id:
self._add_kernel_requirements(kernel_id, user_imports)

# 6. Rewrite the code (kernel_id scopes artifact access)
rewritten, unsupported_markers = rewrite_flowfile_calls(code, analysis, kernel_id=kernel_id)

# 7. Build and emit the function
func_def, call_code = build_function_code(
node_id, rewritten, analysis, input_vars,
kernel_id=kernel_id, unsupported_markers=unsupported_markers,
)

self._add_code(f"# --- Node {node_id}: python_script ---")
for line in func_def.split("\n"):
self._add_code(line)
self._add_code("")
self._add_code(call_code)

# 8. Track published artifacts for validation of downstream nodes
for artifact_name, _ in analysis.artifacts_published:
self._published_artifacts[(effective_kernel_id, artifact_name)] = node_id

self._add_code("")

def _add_kernel_requirements(self, kernel_id: str, user_imports: list[str]) -> None:
"""Add a comment block with required packages from kernel config."""
try:
from flowfile_core.flowfile.code_generator.python_script_rewriter import get_required_packages
from flowfile_core.kernel.manager import get_kernel_manager

manager = get_kernel_manager()
kernel = manager._kernels.get(kernel_id)
if not kernel or not kernel.packages:
return

required = get_required_packages(user_imports, kernel.packages)
if required:
self._add_code(f"# Required packages: {', '.join(required)}")
self._add_code(f"# Install with: pip install {' '.join(required)}")
self._add_code("")
except Exception:
pass # Kernel manager not available; skip requirements comment

# Handlers for unsupported node types - these add nodes to the unsupported list

def _handle_explore_data(
Expand Down Expand Up @@ -1639,6 +1743,12 @@ def _build_final_code(self) -> str:
lines.append(f" ETL Pipeline: {self.flow_graph.__name__}")
lines.append(" Generated from Flowfile")
lines.append(' """')

# Artifact store — one sub-dict per kernel, matching runtime isolation
if self._has_python_script_nodes or self._published_artifacts:
kernel_init = ", ".join(f'"{kid}": {{}}' for kid in self._kernel_ids_used)
lines.append(f" _artifacts = {{{kernel_init}}} # Artifact store (per kernel)")

lines.append(" ")

# Add the generated code
Expand Down
Loading