Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 41 additions & 23 deletions dynamiq/callbacks/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
from dynamiq.callbacks import BaseCallbackHandler
from dynamiq.callbacks.base import get_run_id
from dynamiq.types.streaming import (
AgentReasoningEventMessageData,
AgentToolData,
AgentToolInputDeltaData,
AgentToolInputStartData,
StreamingEntitySource,
StreamingEventMessage,
StreamingMode,
Expand Down Expand Up @@ -607,12 +607,10 @@ def _emit(self, content: str, step: str, force: bool = False) -> None:
def _emit_tool_input_start(self) -> None:
"""Emit a tool_input start event with full metadata before the first delta."""
tool_data = self._resolve_tool_data()
start_model = AgentReasoningEventMessageData(
start_model = AgentToolInputStartData(
tool_run_id=self.agent._streaming_tool_run_id or "",
thought="",
action=self._current_action_name or "",
tool=tool_data,
action_input="",
loop_num=self.loop_num,
)
self.agent.stream_content(
Expand Down Expand Up @@ -1030,21 +1028,16 @@ def _initialize_json_object_field_state(self, buf: str, field_name: str, state:
return True
return False

def _process_json_mode(self, final_answer_only: bool) -> None:
"""
Processing for function calling mode.
def _try_initialize_next_json_field(self, buf: str, final_answer_only: bool) -> None:
"""Try to initialize the next JSON field state (thought, answer, or action_input).

Supports multiple tool calls (parallel function calling) — no single-cycle
constraint is enforced here, unlike structured output mode.

Args:
final_answer_only: Whether to stream only final answers
Each initializer is a no-op when _current_state is already set, so this is safe
to call multiple times within a single chunk processing cycle.
"""
buf = self._buffer

self._initialize_json_field_state(
buf, JSONStreamingField.THOUGHT.value, StreamingState.REASONING, final_answer_only
)
if not self._state_has_emitted.get(StreamingState.REASONING, False):
self._initialize_json_field_state(
buf, JSONStreamingField.THOUGHT.value, StreamingState.REASONING, final_answer_only
)

if self._answer_started:
self._initialize_json_field_state(buf, JSONStreamingField.ANSWER.value, StreamingState.ANSWER)
Expand All @@ -1058,15 +1051,38 @@ def _process_json_mode(self, final_answer_only: bool) -> None:
buf, JSONStreamingField.ACTION_INPUT.value, StreamingState.TOOL_INPUT
)

def _emit_tool_input_state(self, buf: str) -> None:
    """Emit buffered content for the active TOOL_INPUT streaming state.

    Dispatches to the object-field emitter when the tool input is a JSON
    object (function-calling object mode), otherwise to the plain string
    field emitter.
    """
    # Select the emitter once, then invoke it — same branch semantics as
    # an explicit if/else on self._fc_object_tool_input.
    emitter = (
        self._emit_json_object_field_content
        if self._fc_object_tool_input
        else self._emit_json_field_content
    )
    emitter(buf, StreamingState.TOOL_INPUT)

def _process_json_mode(self, final_answer_only: bool) -> None:
"""
Processing for function calling mode.

Supports multiple tool calls (parallel function calling) — no single-cycle
constraint is enforced here, unlike structured output mode.

Args:
final_answer_only: Whether to stream only final answers
"""
buf = self._buffer

self._try_initialize_next_json_field(buf, final_answer_only)

if self._current_state == StreamingState.REASONING:
self._emit_json_field_content(buf, StreamingState.REASONING)
field_complete = self._emit_json_field_content(buf, StreamingState.REASONING)
if field_complete:
# Reasoning completed — the buffer may already contain the next field
# (e.g. action_input). Re-run to detect and process it in the same chunk,
# before _reset_tool_call_state clears the buffer on the next parallel call.
self._process_json_mode(final_answer_only)
elif self._current_state == StreamingState.ANSWER:
self._emit_json_field_content(buf, StreamingState.ANSWER)
elif self._current_state == StreamingState.TOOL_INPUT:
if self._fc_object_tool_input:
self._emit_json_object_field_content(buf, StreamingState.TOOL_INPUT)
else:
self._emit_json_field_content(buf, StreamingState.TOOL_INPUT)
self._emit_tool_input_state(buf)

def _skip_whitespace(self, text: str, start: int) -> int:
"""Skip whitespace characters starting from the given position."""
Expand Down Expand Up @@ -1096,7 +1112,9 @@ def _emit_json_field_content(self, buf: str, step: str) -> bool:
self._emit(buf[segment_start:segment_end], step=step)
segment_start = segment_end
self._state_last_emit_index = end_quote
# Reset the state
# Mark the field as emitted and reset the state
if step in self._state_has_emitted:
self._state_has_emitted[step] = True
self._current_state = None
return True

Expand Down
35 changes: 19 additions & 16 deletions dynamiq/nodes/agents/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,7 @@ def _stream_batch_reasoning_event(
per_tool_reasoning.append(
AgentReasoningEventMessageData(
tool_run_id=tid,
thought="",
thought=tp.get("thought", ""),
action=tp["name"],
tool=tool_data,
action_input=tp["input"],
Expand Down Expand Up @@ -585,7 +585,7 @@ def _handle_function_calling_mode(
tc_input = tc_args["action_input"]
if not isinstance(tc_input, dict):
tc_input = {"input": tc_input}
tool_items.append(ToolCallItem(name=tc_name, input=tc_input))
tool_items.append(ToolCallItem(name=tc_name, input=tc_input, thought=tc_args.get("thought", "")))

validated = ParallelToolCallsInputSchema(tools=tool_items)
action_input = validated.model_dump()
Expand Down Expand Up @@ -850,19 +850,20 @@ def _execute_single_tool(
action_type=tool.action_type.value if tool.action_type else None,
)

self._stream_agent_event(
AgentReasoningEventMessageData(
tool_run_id=tool_run_id,
thought=thought or "",
action=action,
tool=tool_data,
action_input=action_input,
loop_num=loop_num,
),
"reasoning",
config,
**kwargs,
)
if not is_parallel:
self._stream_agent_event(
AgentReasoningEventMessageData(
tool_run_id=tool_run_id,
thought=thought or "",
action=action,
tool=tool_data,
action_input=action_input,
loop_num=loop_num,
),
"reasoning",
config,
**kwargs,
)
try:
if isinstance(tool, ContextManagerTool):
tool_result = None
Expand Down Expand Up @@ -1669,7 +1670,9 @@ def _execute_tools(
}
)
continue
prepared_tools.append({"order": idx, "name": tool_name, "input": tool_input})
prepared_tools.append(
{"order": idx, "name": tool_name, "input": tool_input, "thought": td.get("thought", "")}
)

def _execute_single_tool_to_result(tool_payload: dict[str, Any], **extra) -> dict[str, Any]:
"""Execute a single tool and wrap the result as a dict."""
Expand Down
1 change: 1 addition & 0 deletions dynamiq/nodes/tools/parallel_tool_calls.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ class ToolCallItem(BaseModel):
default_factory=dict,
description="Input parameters for the tool as key-value pairs",
)
thought: str = Field(default="", description="Reasoning for this tool call.")

model_config = ConfigDict(extra="forbid")

Expand Down
43 changes: 30 additions & 13 deletions dynamiq/types/streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,43 @@ class AgentReasoningEventMessageData(BaseModel):
loop_num: int


# ---------------------------------------------------------------------------
# Tool input streaming models
# ---------------------------------------------------------------------------


class AgentToolInputStartData(BaseModel):
    """Emitted once when tool_input streaming begins for a tool call.

    Carries the full tool metadata up front so that subsequent deltas
    (see ``AgentToolInputDeltaData``) can stay lean and omit it.
    """

    # Correlates all events belonging to one tool invocation.
    tool_run_id: str
    # Name of the action/tool being invoked.
    action: str
    # Full tool metadata; presumably includes name/type — confirm against AgentToolData.
    tool: AgentToolData
    # Agent reasoning-loop iteration this call belongs to.
    loop_num: int


class AgentToolInputDeltaData(BaseModel):
    """Lean delta for tool_input streaming. Only tool_run_id and action_input change."""

    # Correlates the delta with its start event's tool invocation.
    tool_run_id: str
    # Incremental chunk of the tool's input; Any because inputs may be
    # partial JSON strings or structured fragments.
    action_input: Any


class AgentToolInputErrorEventMessageData(BaseModel):
    """Emitted when action parsing fails after tool input was already
    partially streamed, so consumers can discard the invalid chunks.
    """

    # Identifies which in-flight tool invocation's streamed chunks to discard.
    tool_run_id: str
    # Name of the tool/action whose input failed to parse.
    name: str
    # Human-readable parse-failure description.
    error: str
    # Agent reasoning-loop iteration in which the failure occurred.
    loop_num: int


# ---------------------------------------------------------------------------
# Tool result streaming model
# ---------------------------------------------------------------------------


class AgentToolResultEventMessageData(BaseModel):
"""Model for agent tool result streaming event data."""

Expand Down Expand Up @@ -144,19 +174,6 @@ def to_dict(self, **kwargs) -> dict:
return data


class AgentToolInputErrorEventMessageData(BaseModel):
"""Model for agent tool input error streaming event data.

Emitted when action parsing fails after tool input was already
partially streamed, so consumers can discard the invalid chunks.
"""

tool_run_id: str
name: str
error: str
loop_num: int


class StreamingConfig(BaseModel):
"""Configuration for streaming.

Expand Down
Loading
Loading