Skip to content

Commit b223979

Browse files
committed
fix: Add SSE streaming support to bridge proxy
Detects 'stream: true' in request body and forwards response as a stream instead of buffering the entire response. This fixes 502 errors for LLM streaming requests through the bridge.
1 parent 371bec7 commit b223979

File tree

3 files changed

+66
-7
lines changed

3 files changed

+66
-7
lines changed

Cargo.lock

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/platform-server/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ sentry = { workspace = true }
5353
clap = { workspace = true }
5454

5555
# HTTP client (for challenge proxy)
56-
reqwest = { version = "0.12", features = ["json"] }
56+
reqwest = { version = "0.12", features = ["json", "stream"] }
5757

5858
# Concurrency
5959
parking_lot = { workspace = true }

crates/platform-server/src/api/bridge.rs

Lines changed: 50 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,17 @@ fn get_challenge_url(state: &AppState, challenge_name: &str) -> Option<String> {
6262
None
6363
}
6464

65+
/// Check if request body has "stream": true (SSE streaming request)
66+
fn is_streaming_request(body: &[u8]) -> bool {
67+
if let Ok(json) = serde_json::from_slice::<serde_json::Value>(body) {
68+
json.get("stream")
69+
.and_then(|v| v.as_bool())
70+
.unwrap_or(false)
71+
} else {
72+
false
73+
}
74+
}
75+
6576
/// Generic proxy to any challenge
6677
async fn proxy_to_challenge(
6778
state: &AppState,
@@ -106,10 +117,6 @@ async fn proxy_to_challenge(
106117
path.trim_start_matches('/')
107118
),
108119
};
109-
debug!(
110-
"Proxying to challenge '{}': {} -> {}",
111-
challenge_name, path, url
112-
);
113120

114121
let method = request.method().clone();
115122
let headers = request.headers().clone();
@@ -122,8 +129,23 @@ async fn proxy_to_challenge(
122129
}
123130
};
124131

125-
// Use shared HTTP client from AppState (avoids creating new client per request)
126-
let mut req_builder = state.http_client.request(method, &url);
132+
let is_streaming = is_streaming_request(&body_bytes);
133+
debug!(
134+
"Proxying to challenge '{}': {} -> {} (streaming: {})",
135+
challenge_name, path, url, is_streaming
136+
);
137+
138+
// For streaming endpoints, create a client without timeout
139+
let client = if is_streaming {
140+
reqwest::Client::builder()
141+
.timeout(std::time::Duration::from_secs(300)) // 5 min timeout for streaming
142+
.build()
143+
.unwrap_or_else(|_| state.http_client.clone())
144+
} else {
145+
state.http_client.clone()
146+
};
147+
148+
let mut req_builder = client.request(method, &url);
127149
for (key, value) in headers.iter() {
128150
if key != "host" && key != "content-length" {
129151
req_builder = req_builder.header(key, value);
@@ -139,6 +161,28 @@ async fn proxy_to_challenge(
139161
let status = resp.status();
140162
let headers = resp.headers().clone();
141163

164+
// For streaming endpoints, forward the response body as a stream
165+
if is_streaming && status.is_success() {
166+
debug!("Streaming response for {}", path);
167+
let stream = resp.bytes_stream();
168+
let body = Body::from_stream(stream);
169+
170+
let mut response = Response::builder().status(status);
171+
for (key, value) in headers.iter() {
172+
response = response.header(key, value);
173+
}
174+
// Ensure SSE headers are set
175+
response = response
176+
.header("Content-Type", "text/event-stream")
177+
.header("Cache-Control", "no-cache")
178+
.header("Connection", "keep-alive");
179+
180+
return response
181+
.body(body)
182+
.unwrap_or_else(|_| StatusCode::INTERNAL_SERVER_ERROR.into_response());
183+
}
184+
185+
// Non-streaming: read full body
142186
match resp.bytes().await {
143187
Ok(body) => {
144188
let mut response = Response::builder().status(status);

0 commit comments

Comments
 (0)