Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions flowfile_core/flowfile_core/flowfile/flow_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -1218,6 +1218,15 @@ def _func(*flowfile_tables: FlowDataEngine) -> FlowDataEngine:
for line in result.stderr.strip().splitlines():
node_logger.warning(f"[stderr] {line}")

# Store display outputs on the node so the frontend can retrieve them
if result.display_outputs:
node = self.get_node(node_id)
if node is not None:
node.results.display_outputs = [
{"mime_type": d.mime_type, "data": d.data, "title": d.title}
for d in result.display_outputs
]

if not result.success:
raise RuntimeError(f"Kernel execution failed: {result.error}")

Expand Down
3 changes: 3 additions & 0 deletions flowfile_core/flowfile_core/flowfile/flow_node/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ class NodeResults:
errors: str | None = None
warnings: str | None = None
analysis_data_generator: Callable[[], pa.Table] | None = None
display_outputs: list[dict] | None = None

def __init__(self):
self._resulting_data = None
Expand All @@ -265,6 +266,7 @@ def __init__(self):
self.warnings = None
self.example_data_generator = None
self.analysis_data_generator = None
self.display_outputs = None

def get_example_data(self) -> pa.Table | None:
"""
Expand Down Expand Up @@ -294,3 +296,4 @@ def reset(self):
"""Resets all result attributes to their default, empty state."""
self._resulting_data = None
self.run_time = -1
self.display_outputs = None
15 changes: 15 additions & 0 deletions flowfile_core/flowfile_core/routes/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,21 @@ def get_node(flow_id: int, node_id: int, get_data: bool = False):
return v


@router.get("/node/display_outputs", tags=["editor"])
def get_node_display_outputs(flow_id: int, node_id: int):
"""Retrieves display outputs (images, HTML, text) from the last flow execution of a python_script node."""
flow = flow_file_handler.get_flow(flow_id)
if not flow:
raise HTTPException(status_code=404, detail="Flow not found")
node = flow.get_node(node_id)
if node is None:
raise HTTPException(status_code=404, detail="Node not found")
display_outputs = node.results.display_outputs
if display_outputs is None:
return []
return display_outputs


@router.post("/node/description/", tags=["editor"])
def update_description_node(flow_id: int, node_id: int, description: str = Body(...)):
"""Updates the description text for a specific node."""
Expand Down
16 changes: 15 additions & 1 deletion flowfile_frontend/src/renderer/app/api/node.api.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Node API Service - Handles all node-related HTTP requests
import axios from "../services/axios.config";
import type { NodeData, TableExample, NodeDescriptionResponse } from "../types";
import type { NodeData, TableExample, NodeDescriptionResponse, DisplayOutput } from "../types";

export class NodeApi {
/**
Expand Down Expand Up @@ -116,6 +116,20 @@ export class NodeApi {
return response.data;
}

/**
* Get display outputs from the last flow execution of a python_script node
*/
static async getDisplayOutputs(flowId: number, nodeId: number): Promise<DisplayOutput[]> {
try {
const response = await axios.get<DisplayOutput[]>("/node/display_outputs", {
params: { flow_id: flowId, node_id: nodeId },
});
return response.data;
} catch {
return [];
}
}

/**
* Update user-defined node settings
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,18 @@
</template>

<script lang="ts" setup>
import { ref, computed, onUnmounted } from "vue";
import { ref, computed, watch, onUnmounted } from "vue";
import { CodeLoader } from "vue-content-loader";

import { useNodeStore } from "../../../../../stores/node-store";
import { useEditorStore } from "../../../../../stores/editor-store";
import { useNodeSettings } from "../../../../../composables/useNodeSettings";
import type { NodePythonScript, NotebookCell } from "../../../../../types/node.types";
import type { NodeData } from "../../../baseNode/nodeInterfaces";
import type { KernelInfo, KernelMemoryInfo } from "../../../../../types/kernel.types";
import { KernelApi } from "../../../../../api/kernel.api";
import { FlowApi } from "../../../../../api/flow.api";
import { NodeApi } from "../../../../../api/node.api";
import GenericNodeSettings from "../../../baseNode/genericNodeSettings.vue";
import FlowfileApiHelp from "./FlowfileApiHelp.vue";
import NotebookEditor from "./NotebookEditor.vue";
Expand All @@ -194,6 +196,7 @@ import { createPythonScriptNode, DEFAULT_PYTHON_SCRIPT_CODE } from "./utils";
// ─── State ──────────────────────────────────────────────────────────────────

const nodeStore = useNodeStore();
const editorStore = useEditorStore();
const dataLoaded = ref(false);
const showEditor = ref(false);
const showHelp = ref(false);
Expand Down Expand Up @@ -382,6 +385,50 @@ const loadArtifacts = async () => {
}
};

// ─── Flow run display outputs ────────────────────────────────────────────────

const loadFlowRunDisplayOutputs = async () => {
const flowId = nodePythonScript.value?.flow_id;
const nodeId = nodePythonScript.value?.node_id;
if (flowId == null || nodeId == null) return;

try {
const outputs = await NodeApi.getDisplayOutputs(Number(flowId), nodeId);
if (outputs.length > 0 && cells.value.length > 0) {
// Attach display outputs to the last cell
const lastCell = cells.value[cells.value.length - 1];
cells.value = cells.value.map(c =>
c.id === lastCell.id
? {
...c,
output: {
stdout: "",
stderr: "",
display_outputs: outputs,
error: null,
execution_time_ms: 0,
execution_count: 0,
},
}
: c,
);
}
} catch {
// Silently ignore — display outputs are best-effort
}
};

// Watch for flow run completion to refresh display outputs
watch(
() => editorStore.isRunning,
(running, wasRunning) => {
if (wasRunning && !running && dataLoaded.value) {
loadFlowRunDisplayOutputs();
loadArtifacts();
}
},
);

// ─── Cell sync ──────────────────────────────────────────────────────────────

const handleCellsUpdate = (updatedCells: NotebookCell[]) => {
Expand Down Expand Up @@ -457,10 +504,11 @@ const loadNodeData = async (nodeId: number) => {
showEditor.value = true;
dataLoaded.value = true;

// Load kernels, artifacts, and start memory polling
// Load kernels, artifacts, display outputs, and start memory polling
await loadKernels();
startKernelPolling();
loadArtifacts();
loadFlowRunDisplayOutputs();
if (selectedKernelId.value) {
startMemoryPolling();
}
Expand Down
Loading