Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/plano_config_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ properties:
- streamable-http
tool:
type: string
streaming:
type: boolean
additionalProperties: false
required:
- id
Expand Down
2 changes: 1 addition & 1 deletion crates/brightstaff/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ opentelemetry-stdout = "0.31"
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
pretty_assertions = "1.4.1"
rand = "0.9.2"
reqwest = { version = "0.12.15", features = ["stream"] }
reqwest = { version = "0.12.15", features = ["stream", "http2"] }
serde = { version = "1.0.219", features = ["derive"] }
serde_json = "1.0.140"
serde_with = "3.13.0"
Expand Down
1 change: 1 addition & 0 deletions crates/brightstaff/src/handlers/agent_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ mod tests {
url: "http://localhost:8080".to_string(),
tool: None,
transport: None,
streaming: None,
}
}

Expand Down
2 changes: 2 additions & 0 deletions crates/brightstaff/src/handlers/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,15 @@ mod tests {
url: "http://localhost:8081".to_string(),
tool: None,
transport: None,
streaming: None,
},
Agent {
id: "terminal-agent".to_string(),
agent_type: Some("terminal".to_string()),
url: "http://localhost:8082".to_string(),
tool: None,
transport: None,
streaming: None,
},
];

Expand Down
190 changes: 168 additions & 22 deletions crates/brightstaff/src/handlers/pipeline_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ use hermesllm::{ProviderRequest, ProviderRequestType};
use hyper::header::HeaderMap;
use opentelemetry::global;
use opentelemetry_http::HeaderInjector;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{debug, info, instrument, warn};

use crate::handlers::jsonrpc::{
Expand Down Expand Up @@ -50,6 +53,18 @@ pub enum PipelineError {
},
}

/// A live streaming filter pipeline. LLM chunks go into `input_tx`;
/// processed chunks come out of `output_rx`. Each filter in the chain
/// is connected via a single bidirectional streaming HTTP connection.
///
/// Closing (dropping) `input_tx` ends the first filter's request body
/// stream; each downstream stage then winds down as its upstream
/// response stream finishes.
#[derive(Debug)]
pub struct StreamingFilterPipeline {
    /// Caller-side sender: push raw LLM chunks into the first filter here.
    pub input_tx: mpsc::Sender<Bytes>,
    /// Caller-side receiver: chunks processed by the full chain arrive here.
    pub output_rx: mpsc::Receiver<Bytes>,
    /// One forwarding task per filter stage. NOTE(review): dropping the
    /// struct detaches these tasks rather than aborting them; they exit on
    /// their own once the chained streams/channels close — confirm callers
    /// either await or are fine with detached wind-down.
    pub handles: Vec<tokio::task::JoinHandle<()>>,
}

/// Bounded capacity (in chunks) of each inter-stage channel. The bound
/// provides backpressure from a slow filter back to the chunk producer.
const STREAMING_PIPELINE_BUFFER: usize = 16;

/// Service for processing agent pipelines
pub struct PipelineProcessor {
client: reqwest::Client,
Expand Down Expand Up @@ -429,6 +444,130 @@ impl PipelineProcessor {
session_id
}

/// Build headers for an HTTP raw filter request (shared by the per-chunk
/// and streaming paths).
///
/// Starts from a clone of the incoming request headers, then:
/// - strips `Content-Length`, since the outgoing filter body is re-framed
///   (per-chunk or chunked streaming);
/// - strips any inbound `traceparent` and injects the current span's trace
///   context so the filter call joins this service's trace;
/// - sets `ARCH_UPSTREAM_HOST_HEADER` to the agent id for upstream routing
///   and asks Envoy for up to 3 retries;
/// - advertises JSON/SSE response types and an octet-stream request body.
///
/// # Errors
/// Returns `PipelineError::AgentNotFound` if `agent_id` contains bytes that
/// are not valid in an HTTP header value.
fn build_raw_filter_headers(
    request_headers: &HeaderMap,
    agent_id: &str,
) -> Result<HeaderMap, PipelineError> {
    let mut headers = request_headers.clone();
    headers.remove(hyper::header::CONTENT_LENGTH);

    headers.remove(TRACE_PARENT_HEADER);
    global::get_text_map_propagator(|propagator| {
        let cx =
            tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
        propagator.inject_context(&cx, &mut HeaderInjector(&mut headers));
    });

    headers.insert(
        ARCH_UPSTREAM_HOST_HEADER,
        hyper::header::HeaderValue::from_str(agent_id)
            .map_err(|_| PipelineError::AgentNotFound(agent_id.to_string()))?,
    );
    // `from_static` is infallible for a known-good literal — avoids the
    // fallible `from_str("3").unwrap()` parse at runtime.
    headers.insert(
        ENVOY_RETRY_HEADER,
        hyper::header::HeaderValue::from_static("3"),
    );
    headers.insert(
        hyper::header::ACCEPT,
        hyper::header::HeaderValue::from_static("application/json, text/event-stream"),
    );
    headers.insert(
        hyper::header::CONTENT_TYPE,
        hyper::header::HeaderValue::from_static("application/octet-stream"),
    );

    Ok(headers)
}

/// Set up a bidirectional streaming output filter pipeline.
///
/// Opens one streaming POST per filter (using chunked transfer encoding)
/// and chains them: the response stream of filter N feeds the request body
/// of filter N+1. Returns a pipeline where the caller pushes LLM chunks
/// into `input_tx` and reads processed chunks from `output_rx`.
///
/// Each filter must start responding before its request body completes —
/// `send()` resolves once response headers arrive — so filters are expected
/// to process and emit chunks incrementally.
///
/// # Errors
/// Returns `RequestFailed` when the client cannot be built or a connection
/// fails, and `ClientError`/`ServerError` when a filter answers with a
/// non-success status.
pub async fn start_streaming_output_pipeline(
    agents: &[&Agent],
    request_headers: &HeaderMap,
    request_path: &str,
) -> Result<StreamingFilterPipeline, PipelineError> {
    // NOTE(review): builds a fresh client per pipeline rather than reusing
    // the shared `PipelineProcessor` client (this is an associated fn) —
    // presumably so each pipeline owns its connections; confirm intent.
    let client = reqwest::Client::builder()
        .build()
        .map_err(PipelineError::RequestFailed)?;

    // `input_tx` is handed back to the caller; `current_rx` always holds
    // the read side of the most recently chained stage.
    let (input_tx, first_rx) = mpsc::channel::<Bytes>(STREAMING_PIPELINE_BUFFER);
    let mut current_rx = first_rx;
    let mut handles = Vec::new();

    for agent in agents {
        // agent.url + original request path, mirroring the URL construction
        // used by the per-chunk raw filter path.
        let url = format!("{}{}", agent.url, request_path);
        let headers = Self::build_raw_filter_headers(request_headers, &agent.id)?;

        // The previous stage's output channel becomes this stage's
        // streaming request body.
        let body_stream = ReceiverStream::new(current_rx).map(Ok::<_, std::io::Error>);
        let body = reqwest::Body::wrap_stream(body_stream);

        debug!(agent = %agent.id, url = %url, "opening streaming filter connection");

        let response = client.post(&url).headers(headers).body(body).send().await?;

        let http_status = response.status();
        if !http_status.is_success() {
            // A failed stage aborts construction. Stages spawned so far wind
            // down on their own: returning Err drops `input_tx`, which ends
            // the first body stream and cascades the channel closures.
            let error_body = response
                .text()
                .await
                .unwrap_or_else(|_| "<unreadable>".to_string());
            return Err(if http_status.is_client_error() {
                PipelineError::ClientError {
                    agent: agent.id.clone(),
                    status: http_status.as_u16(),
                    body: error_body,
                }
            } else {
                PipelineError::ServerError {
                    agent: agent.id.clone(),
                    status: http_status.as_u16(),
                    body: error_body,
                }
            });
        }

        let (next_tx, next_rx) = mpsc::channel::<Bytes>(STREAMING_PIPELINE_BUFFER);
        let agent_id = agent.id.clone();

        // Forward this filter's response chunks into the next stage's
        // channel until either side of the stream closes.
        let handle = tokio::spawn(async move {
            let mut resp_stream = response.bytes_stream();
            while let Some(item) = resp_stream.next().await {
                match item {
                    Ok(chunk) => {
                        // A closed receiver means the downstream consumer is
                        // gone — stop forwarding rather than treat it as an
                        // error.
                        if next_tx.send(chunk).await.is_err() {
                            debug!(agent = %agent_id, "streaming pipeline receiver dropped");
                            break;
                        }
                    }
                    Err(e) => {
                        warn!(agent = %agent_id, error = %e, "streaming filter response error");
                        break;
                    }
                }
            }
            debug!(agent = %agent_id, "streaming filter stage completed");
        });

        handles.push(handle);
        current_rx = next_rx;
    }

    info!(
        filter_count = agents.len(),
        "streaming output filter pipeline established"
    );
    Ok(StreamingFilterPipeline {
        input_tx,
        output_rx: current_rx,
        handles,
    })
}

/// Execute a raw bytes filter — POST bytes to agent.url, receive bytes back.
/// Used for input and output filters where the full raw request/response is passed through.
/// No MCP protocol wrapping; agent_type is ignored.
Expand All @@ -454,25 +593,7 @@ impl PipelineProcessor {
span.update_name(format!("execute_raw_filter ({})", agent.id));
});

let mut agent_headers = request_headers.clone();
agent_headers.remove(hyper::header::CONTENT_LENGTH);

agent_headers.remove(TRACE_PARENT_HEADER);
global::get_text_map_propagator(|propagator| {
let cx =
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
propagator.inject_context(&cx, &mut HeaderInjector(&mut agent_headers));
});

agent_headers.insert(
ARCH_UPSTREAM_HOST_HEADER,
hyper::header::HeaderValue::from_str(&agent.id)
.map_err(|_| PipelineError::AgentNotFound(agent.id.clone()))?,
);
agent_headers.insert(
ENVOY_RETRY_HEADER,
hyper::header::HeaderValue::from_str("3").unwrap(),
);
let mut agent_headers = Self::build_raw_filter_headers(request_headers, &agent.id)?;
agent_headers.insert(
"Accept",
hyper::header::HeaderValue::from_static("application/json"),
Expand All @@ -482,9 +603,6 @@ impl PipelineProcessor {
hyper::header::HeaderValue::from_static("application/json"),
);

// Append the original request path so the filter endpoint encodes the API format.
// e.g. agent.url="http://host/anonymize" + request_path="/v1/chat/completions"
// -> POST http://host/anonymize/v1/chat/completions
let url = format!("{}{}", agent.url, request_path);
debug!(agent = %agent.id, url = %url, "sending raw filter request");

Expand Down Expand Up @@ -682,6 +800,7 @@ mod tests {
tool: None,
url: server_url,
agent_type: None,
streaming: None,
};

let body = serde_json::json!({"messages": [{"role": "user", "content": "Hello"}]});
Expand Down Expand Up @@ -722,6 +841,7 @@ mod tests {
tool: None,
url: server_url,
agent_type: None,
streaming: None,
};

let body = serde_json::json!({"messages": [{"role": "user", "content": "Ping"}]});
Expand Down Expand Up @@ -775,6 +895,7 @@ mod tests {
tool: None,
url: server_url,
agent_type: None,
streaming: None,
};

let body = serde_json::json!({"messages": [{"role": "user", "content": "Hi"}]});
Expand All @@ -793,4 +914,29 @@ mod tests {
_ => panic!("Expected client error when isError flag is set"),
}
}

#[tokio::test]
async fn test_streaming_pipeline_connection_refused() {
    // Port 1 on loopback is essentially never listening, so opening the
    // very first filter connection should fail with a transport error.
    let unreachable_agent = Agent {
        id: "unreachable".to_string(),
        transport: None,
        tool: None,
        url: "http://127.0.0.1:1".to_string(),
        agent_type: Some("http".to_string()),
        streaming: Some(true),
    };

    let outcome = PipelineProcessor::start_streaming_output_pipeline(
        &[&unreachable_agent],
        &HeaderMap::new(),
        "/v1/chat/completions",
    )
    .await;

    match outcome {
        Err(PipelineError::RequestFailed(_)) => (),
        Err(_) => panic!("Expected PipelineError::RequestFailed"),
        Ok(_) => panic!("Expected the connection to be refused"),
    }
}
}
Loading
Loading