Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions flow-examples/flows/query-downstream/flow.oo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
nodes:
- node_id: +python#1
title: "Python #1"
icon: ":logos:python:"
task:
ui:
default_width: 450
inputs_def:
[]
outputs_def:
- handle: output1
json_schema:
{}
executor:
name: python
options:
entry: scriptlets/+scriptlet#1.py
- node_id: end
title: "Python #2"
icon: ":logos:python:"
task:
ui:
default_width: 450
inputs_def:
- handle: output1
json_schema:
{}
nullable: false
description: this is a description for handle
outputs_def:
[]
executor:
name: python
options:
entry: scriptlets/+scriptlet#2.py
inputs_from:
- handle: output1
from_node:
- node_id: +python#1
output_handle: output1
description: this is a downstream node
34 changes: 34 additions & 0 deletions flow-examples/flows/query-downstream/scriptlets/+scriptlet#1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from oocana import Context

#region generated meta
import typing
Inputs = typing.Dict[str, typing.Any]
class Outputs(typing.TypedDict):
output: typing.Any
#endregion

async def main(params: Inputs, context: Context) -> Outputs:

result = await context.query_downstream()

print("query result:", result)

assert isinstance(result, dict), "Expected result to be a dictionary"
assert "output1" in result, "Expected 'output1' key to be present in the result"
output1 = result["output1"]
assert "to_node" in output1, "Expected 'to_node' key to be present in the result"
node_upstream = output1["to_node"]
assert isinstance(node_upstream, list), "Expected 'to_node' to be a list"
assert len(node_upstream) > 0, "Expected 'to_node' list to contain at least one node"

first_upstream_node = node_upstream[0]
assert isinstance(first_upstream_node, dict), "Expected first upstream node to be a dictionary"
assert first_upstream_node.get("node_id") == "end", "Expected first upstream node to have node_id 'end'"
assert first_upstream_node.get("description") == "this is a downstream node", "Expected first upstream node to have description"
assert first_upstream_node.get("input_handle") == "output1", "Expected first upstream node to have input_handle 'output1'"
input_handle_def = first_upstream_node.get("input_handle_def")
assert isinstance(input_handle_def, dict), "Expected input_handle_def to be a dictionary"
assert input_handle_def.get("handle") == "output1", "Expected input_handle_def to have handle 'output1'"
assert input_handle_def.get("description") == "this is a description for handle", "Expected input_handle_def to have description"

return { "output": "output_value" }
14 changes: 14 additions & 0 deletions flow-examples/flows/query-downstream/scriptlets/+scriptlet#2.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from oocana import Context

#region generated meta
import typing
class Inputs(typing.TypedDict):
output1: typing.Any
Outputs = typing.Dict[str, typing.Any]
#endregion

def main(params: Inputs, context: Context) -> Outputs:

# your code

return { "output1": "output_value" }
6 changes: 6 additions & 0 deletions flow-examples/test/flow.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,12 @@ describe(
expect(code).toBe(0);
});

it("run query-downstream flow", async () => {
files.delete("query-downstream");
const { code } = await run("query-downstream");
expect(code).toBe(0);
});

it("run var flow", async () => {
files.delete("var");
const { code } = await run("var");
Expand Down
48 changes: 48 additions & 0 deletions oocana/oocana/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,22 @@ class QueryBlockResponse(TypedDict):
"""if the block has additional inputs, this field should be True, otherwise False.
"""


class FlowDownstream(TypedDict):
output_handle: str
output_handle_def: HandleDefDict | None

class NodeDownstream(TypedDict):
node_id: str
description: str | None
"""node description"""
input_handle: str
input_handle_def: HandleDefDict | None

class Downstream(TypedDict):
to_flow: list[FlowDownstream]
to_node: list[NodeDownstream]

class OnlyEqualSelf:
def __eq__(self, value: object) -> bool:
return self is value
Expand Down Expand Up @@ -581,6 +597,38 @@ def send_error(self, error: str):

def error(self, error: str):
self.__mainframe.send(self.job_info, {"type": "BlockError", "error": error})

async def query_downstream(self, handles: list[str] | None = None) -> Dict[str, Downstream]:
"""
query the downstream nodes of the given output handles.
:param handle: the handle of the output, should be defined in the block schema output defs. If None means query all handles.
:return: a dict that contains the downstream nodes, including the node id and input handle.
"""
request_id = random_string(16)
loop = asyncio.get_running_loop()
f: asyncio.Future[Dict[str, Any]] = loop.create_future()

def response_callback(payload: Dict[str, Any]):
if payload.get("request_id") != request_id:
return
self.__mainframe.remove_request_response_callback(self.session_id, request_id, response_callback)
if payload.get("result") is not None:
loop.call_soon_threadsafe(lambda: f.set_result(payload.get("result", {})))
elif payload.get("error") is not None:
loop.call_soon_threadsafe(lambda: f.set_exception(ValueError(payload.get("error", "Unknown error occurred while querying the downstream."))))

self.__mainframe.add_request_response_callback(self.session_id, request_id, response_callback)

self.__mainframe.send(self.job_info, {
"type": "BlockRequest",
"action": "QueryDownstream",
"handles": handles,
"session_id": self.session_id,
"job_id": self.job_id,
"request_id": request_id,
})

return await f

async def query_block(self, block: str) -> QueryBlockResponse:
"""
Expand Down
Loading