Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions flow-examples/blocks/inputs/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from oocana import Context

def main(params, context: Context):
return {
"nullable_output": params.get("nullable_input"),
"default_output": params.get("default_input")
}
15 changes: 15 additions & 0 deletions flow-examples/blocks/inputs/block.oo.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
executor:
name: python
options:
entry: __init__.py
inputs_def:
- handle: nullable_input
nullable: true
- handle: default_input
value: "default_value"
- handle: schema_input
json_schema:
type: "string"
outputs_def:
- handle: nullable_output
- handle: default_output
13 changes: 11 additions & 2 deletions flow-examples/flows/run-block/a/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@ async def main(inputs, context: Context):
run_res.add_event_callback(lambda payload: print("event callback", payload))
run_res.add_output_callback(lambda handle, value: print("output callback", handle, value))
res = await run_res.finish()
assert res.get("error") is None
assert res.get("error") is None, "Expected no error, got: {}".format(res.get("error"))

run_res = context.run_block("blk_bccc", inputs={"my_input": "111"})
res = await run_res.finish()
assert res.get("error") is not None
assert res.get("error") is not None, "Expected error, got: {}".format(res.get("error"))


run_res = context.run_block("self::inputs", inputs={"schema_input": 111, "nullable_input": None, "default_input": "default_value"}, strict=True)
res = await run_res.finish()
assert res.get("error") is not None, "Expected error, got: {}".format(res.get("error"))

run_res = context.run_block("self::inputs", inputs={"schema_input": "aaa"})
res = await run_res.finish()
assert res.get("error") is None, "Expected no error, got: {}".format(res.get("error"))

return {"a": "a"}
10 changes: 8 additions & 2 deletions oocana/oocana/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -679,12 +679,17 @@ def response_callback(payload: Dict[str, Any]):
return await f


def run_block(self, block: str, *, inputs: Dict[str, Any], additional_inputs_def: list[HandleDefDict] | None = None, additional_outputs_def: list[HandleDefDict] | None = None) -> RunResponse:
def run_block(self, block: str, *, inputs: Dict[str, Any], additional_inputs_def: list[HandleDefDict] | None = None, additional_outputs_def: list[HandleDefDict] | None = None, strict: bool = False) -> RunResponse:
"""
:param block: the id of the block to run. format: `self::<block_name>` or `<package_name>::<block_name>`.
:param inputs: the inputs of the block. if the block has no inputs, this parameter can be dict. If the inputs missing some required inputs, the response's finish future will send {"error": "error message" }.
:param inputs: the inputs of the block. if the block has no inputs, this parameter can be dict.
If the inputs missing some required inputs, the response's finish future will send {"error": "<error message>" }.
some missing inputs will be filled:
1. with the default value if the block's input_def has a default value (which is defined in the value field).
2. input_def's nullable is true, the missing input will be filled with Null.
:param additional_inputs_def: additional inputs definitions, this is a list of dicts, each dict should contain the handle(required), description, json_schema, kind, nullable and is_additional fields. This is used to define additional inputs that are not defined in the block schema.
:param additional_outputs_def: additional outputs definitions, this is a list of dicts, each dict should contain the handle(required), description, json_schema, kind, nullable and is_additional fields. This is used to define additional outputs that are not defined in the block schema.
:param strict: if True, oocana will use input_def's json_schema(only when it exists) to validate the inputs and if they are not valid oocana will reject to run this block, return error message. otherwise it will run with the inputs as they are.
:return: a RunResponse object, which contains the event callbacks and output callbacks. You can use the `add_event_callback` and `add_output_callback` methods to register callbacks for the events and outputs of the block. You can also use the `finish` method to wait for the block to finish and get the result.
Notice do not call any context.send_message or context.report_progress or context.preview and other context methods(which will send message) directly in the callbacks, it may cause deadlock.

Expand Down Expand Up @@ -721,6 +726,7 @@ def run_block(self, block: str, *, inputs: Dict[str, Any], additional_inputs_def
"additional_outputs_def": additional_outputs_def,
},
"stacks": self.__block_info.stacks,
"strict": strict,
"request_id": request_id,
})

Expand Down
Loading