Skip to content

Commit b16abe4

Browse files
committed
fix: consolidated protocol robustness improvements
## Summary This PR consolidates **2 protocol robustness improvements**. ### Included PRs: - #84: Add buffer size limit to StreamProcessor - #85: Add ToolState transition validation ### Key Changes: - Added MAX_BUFFER_SIZE constant (10,000 events) for StreamProcessor - Modified process() to drop oldest events when buffer is full - Pre-allocated buffer capacity in new() for better performance - Added can_transition_to() method to ToolState enum - Updated update_tool_state to log warnings on invalid transitions - Documented valid state machine transitions ### Files Modified: - src/cortex-engine/src/streaming.rs - src/cortex-protocol/src/protocol/message_parts.rs Closes #84, #85
1 parent c398212 commit b16abe4

File tree

4 files changed

+60
-1
lines changed

4 files changed

+60
-1
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/cortex-engine/src/streaming.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@ use futures::Stream;
1515
use serde::{Deserialize, Serialize};
1616
use tokio::sync::mpsc;
1717

18+
/// Maximum number of events to buffer before dropping old ones.
19+
/// Prevents unbounded memory growth if drain_events() is not called regularly.
20+
const MAX_BUFFER_SIZE: usize = 10_000;
21+
1822
/// Token usage for streaming.
1923
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
2024
pub struct StreamTokenUsage {
@@ -213,7 +217,7 @@ impl StreamProcessor {
213217
Self {
214218
state: StreamState::Idle,
215219
content: StreamContent::new(),
216-
buffer: VecDeque::new(),
220+
buffer: VecDeque::with_capacity(1024), // Pre-allocate reasonable capacity
217221
start_time: None,
218222
first_token_time: None,
219223
last_event_time: None,
@@ -284,6 +288,10 @@ impl StreamProcessor {
284288
}
285289
}
286290

291+
// Enforce buffer size limit to prevent unbounded memory growth
292+
if self.buffer.len() >= MAX_BUFFER_SIZE {
293+
self.buffer.pop_front();
294+
}
287295
self.buffer.push_back(event);
288296
}
289297

src/cortex-protocol/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ uuid = { workspace = true, features = ["serde", "v4"] }
2020
chrono = { workspace = true }
2121
strum_macros = "0.27"
2222
base64 = { workspace = true }
23+
tracing = { workspace = true }
2324

2425
[dev-dependencies]
2526
pretty_assertions = { workspace = true }

src/cortex-protocol/src/protocol/message_parts.rs

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,45 @@ pub enum ToolState {
182182
},
183183
}
184184

185+
186+
impl ToolState {
187+
/// Check if transitioning to the given state is valid.
188+
///
189+
/// Valid transitions:
190+
/// - Pending -> Running, Completed, Error
191+
/// - Running -> Completed, Error
192+
/// - Completed -> (terminal, no transitions)
193+
/// - Error -> (terminal, no transitions)
194+
///
195+
/// State machine:
196+
/// ```text
197+
/// Pending -> Running -> Completed
198+
/// | |
199+
/// | +-> Error
200+
/// +-> Completed
201+
/// +-> Error
202+
/// ```
203+
pub fn can_transition_to(&self, target: &ToolState) -> bool {
204+
match (self, target) {
205+
// From Pending, can go to any non-Pending state
206+
(ToolState::Pending { .. }, ToolState::Running { .. }) => true,
207+
(ToolState::Pending { .. }, ToolState::Completed { .. }) => true,
208+
(ToolState::Pending { .. }, ToolState::Error { .. }) => true,
209+
210+
// From Running, can go to Completed or Error
211+
(ToolState::Running { .. }, ToolState::Completed { .. }) => true,
212+
(ToolState::Running { .. }, ToolState::Error { .. }) => true,
213+
214+
// Terminal states cannot transition
215+
(ToolState::Completed { .. }, _) => false,
216+
(ToolState::Error { .. }, _) => false,
217+
218+
// Any other transition is invalid
219+
_ => false,
220+
}
221+
}
222+
}
223+
185224
/// Subtask execution status.
186225
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, JsonSchema)]
187226
#[serde(rename_all = "snake_case")]
@@ -552,6 +591,8 @@ impl MessageWithParts {
552591
}
553592

554593
/// Update a tool state by call ID.
594+
///
595+
/// Logs a warning if the state transition is invalid (e.g., from a terminal state).
555596
pub fn update_tool_state(&mut self, call_id: &str, new_state: ToolState) -> bool {
556597
for part in &mut self.parts {
557598
if let MessagePart::Tool {
@@ -561,6 +602,14 @@ impl MessageWithParts {
561602
} = &mut part.part
562603
{
563604
if cid == call_id {
605+
if !state.can_transition_to(&new_state) {
606+
tracing::warn!(
607+
"Invalid ToolState transition from {:?} to {:?} for call_id {}",
608+
state,
609+
new_state,
610+
call_id
611+
);
612+
}
564613
*state = new_state;
565614
return true;
566615
}

0 commit comments

Comments
 (0)