Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/brightstaff/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
tokio-stream = "0.1"
time = { version = "0.3", features = ["formatting", "macros"] }
tracing = "0.1"
urlencoding = "2.1.3"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
Expand Down
107 changes: 94 additions & 13 deletions crates/brightstaff/src/handlers/routing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,43 @@ use tracing::{debug, info, info_span, warn, Instrument};
use super::extract_or_generate_traceparent;
use crate::handlers::llm::model_selection::router_chat_get_upstream_model;
use crate::router::llm::RouterService;
use crate::router::policy_provider::PolicyProvider;
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};

const ROUTING_POLICY_SIZE_WARNING_BYTES: usize = 5120;

/// Extracts `routing_policy` from a JSON body, returning the cleaned body bytes
/// and parsed preferences. The `routing_policy` field is removed from the JSON
/// before re-serializing so downstream parsers don't see the non-standard field.
/// Extracted routing metadata from a request body.
#[derive(Debug, Default)]
pub struct RoutingMetadata {
/// Inline routing policy (highest priority).
pub inline_policy: Option<Vec<ModelUsagePreference>>,
/// Policy ID for external policy provider lookup.
pub policy_id: Option<String>,
/// Revision for revision-aware caching.
pub revision: Option<u64>,
}

/// Extracts routing metadata from a JSON body, returning the cleaned body bytes
/// and parsed metadata.
///
/// Fields removed from the JSON before re-serializing:
/// - `routing_policy`: Inline routing preferences
/// - `policy_id`: External policy identifier
/// - `revision`: Policy revision for cache invalidation
///
/// If `warn_on_size` is true, logs a warning when the serialized policy exceeds 5KB.
pub fn extract_routing_policy(
pub fn extract_routing_metadata(
raw_bytes: &[u8],
warn_on_size: bool,
) -> Result<(Bytes, Option<Vec<ModelUsagePreference>>), String> {
) -> Result<(Bytes, RoutingMetadata), String> {
let mut json_body: serde_json::Value = serde_json::from_slice(raw_bytes)
.map_err(|err| format!("Failed to parse JSON: {}", err))?;

let preferences = json_body
.as_object_mut()
.and_then(|obj| obj.remove("routing_policy"))
.and_then(|policy_value| {
let mut metadata = RoutingMetadata::default();

if let Some(obj) = json_body.as_object_mut() {
// Extract inline routing_policy (highest priority)
if let Some(policy_value) = obj.remove("routing_policy") {
if warn_on_size {
let policy_str = serde_json::to_string(&policy_value).unwrap_or_default();
if policy_str.len() > ROUTING_POLICY_SIZE_WARNING_BYTES {
Expand All @@ -49,17 +66,81 @@ pub fn extract_routing_policy(
num_models = prefs.len(),
"using inline routing_policy from request body"
);
Some(prefs)
metadata.inline_policy = Some(prefs);
}
Err(err) => {
warn!(error = %err, "failed to parse routing_policy");
None
}
}
});
}

// Extract policy_id for external policy provider
if let Some(policy_id_value) = obj.remove("policy_id") {
if let Some(policy_id) = policy_id_value.as_str() {
debug!(policy_id = %policy_id, "extracted policy_id from request");
metadata.policy_id = Some(policy_id.to_string());
}
}

// Extract revision for revision-aware caching
if let Some(revision_value) = obj.remove("revision") {
if let Some(revision) = revision_value.as_u64() {
debug!(revision = revision, "extracted revision from request");
metadata.revision = Some(revision);
}
}
}

let bytes = Bytes::from(serde_json::to_vec(&json_body).unwrap());
Ok((bytes, preferences))
Ok((bytes, metadata))
}

/// Resolves routing preferences using the following priority:
/// 1. Inline `routing_policy` in request payload (highest priority)
/// 2. `policy_id` + `revision` → HTTP policy provider (with cache)
/// 3. None (fallback to default routing)
pub async fn resolve_routing_preferences(
metadata: RoutingMetadata,
policy_provider: Option<&PolicyProvider>,
) -> Option<Vec<ModelUsagePreference>> {
// Priority 1: Inline policy
if let Some(inline) = metadata.inline_policy {
return Some(inline);
}

// Priority 2: External policy provider
if let (Some(provider), Some(policy_id)) = (policy_provider, &metadata.policy_id) {
match provider.get_policy(policy_id, metadata.revision).await {
Ok(Some(policy)) => {
info!(
policy_id = %policy_id,
num_models = policy.len(),
"using policy from external provider"
);
return Some(policy);
}
Ok(None) => {
warn!(policy_id = %policy_id, "policy not found from external provider");
}
Err(err) => {
warn!(error = %err, policy_id = %policy_id, "failed to fetch policy from external provider");
}
}
}

// Priority 3: No preferences (fallback to default)
None
}

/// Backward-compatible function that only extracts inline routing_policy.
/// Deprecated: Use `extract_routing_metadata` instead.
#[deprecated(note = "Use extract_routing_metadata instead")]
pub fn extract_routing_policy(
raw_bytes: &[u8],
warn_on_size: bool,
) -> Result<(Bytes, Option<Vec<ModelUsagePreference>>), String> {
let (bytes, metadata) = extract_routing_metadata(raw_bytes, warn_on_size)?;
Ok((bytes, metadata.inline_policy))
}

#[derive(serde::Serialize)]
Expand Down
1 change: 1 addition & 0 deletions crates/brightstaff/src/router/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,6 @@ pub mod llm;
pub mod orchestrator;
pub mod orchestrator_model;
pub mod orchestrator_model_v1;
pub mod policy_provider;
pub mod router_model;
pub mod router_model_v1;
Loading