diff --git a/config/plano_config_schema.yaml b/config/plano_config_schema.yaml index 5190fecf7..a1bd15abb 100644 --- a/config/plano_config_schema.yaml +++ b/config/plano_config_schema.yaml @@ -9,6 +9,7 @@ properties: - 0.1-beta - 0.2.0 - v0.3.0 + - v0.4.0 agents: type: array @@ -470,6 +471,107 @@ properties: additionalProperties: false required: - jailbreak + routing_preferences: + type: array + items: + type: object + properties: + name: + type: string + description: + type: string + models: + type: array + items: + type: string + minItems: 1 + selection_policy: + type: object + properties: + prefer: + type: string + enum: + - cheapest + - fastest + - random + - none + additionalProperties: false + required: + - prefer + additionalProperties: false + required: + - name + - description + - models + - selection_policy + + model_metrics_sources: + type: array + items: + oneOf: + - type: object + properties: + type: + type: string + const: cost_metrics + url: + type: string + refresh_interval: + type: integer + minimum: 1 + auth: + type: object + properties: + type: + type: string + enum: + - bearer + token: + type: string + required: + - type + - token + additionalProperties: false + required: + - type + - url + additionalProperties: false + - type: object + properties: + type: + type: string + const: prometheus_metrics + url: + type: string + query: + type: string + refresh_interval: + type: integer + minimum: 1 + description: "Refresh interval in seconds" + required: + - type + - url + - query + additionalProperties: false + - type: object + properties: + type: + type: string + const: digitalocean_pricing + refresh_interval: + type: integer + minimum: 1 + description: "Refresh interval in seconds" + model_aliases: + type: object + description: "Map DO catalog keys (lowercase(creator)/model_id) to Plano model names used in routing_preferences. 
Example: 'openai/openai-gpt-oss-120b: openai/gpt-4o'" + additionalProperties: + type: string + required: + - type + additionalProperties: false + additionalProperties: false required: - version diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 9d4a2dfb3..5b0898bbd 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -119,7 +119,7 @@ async fn llm_chat_inner( temperature, tool_names, user_message_preview, - inline_routing_policy, + inline_routing_preferences, client_api, provider_id, } = parsed; @@ -261,7 +261,7 @@ async fn llm_chat_inner( &traceparent, &request_path, &request_id, - inline_routing_policy, + inline_routing_preferences, ) .await } @@ -323,7 +323,7 @@ struct PreparedRequest { temperature: Option, tool_names: Option>, user_message_preview: Option, - inline_routing_policy: Option>, + inline_routing_preferences: Option>, client_api: Option, provider_id: hermesllm::ProviderId, } @@ -352,16 +352,14 @@ async fn parse_and_validate_request( "request body received" ); - // Extract routing_policy from request body if present - let (chat_request_bytes, inline_routing_policy) = - crate::handlers::routing_service::extract_routing_policy(&raw_bytes, false).map_err( - |err| { - warn!(error = %err, "failed to parse request JSON"); - let mut r = Response::new(full(format!("Failed to parse request: {}", err))); - *r.status_mut() = StatusCode::BAD_REQUEST; - r - }, - )?; + // Extract routing_preferences from request body if present + let (chat_request_bytes, inline_routing_preferences) = + crate::handlers::routing_service::extract_routing_policy(&raw_bytes).map_err(|err| { + warn!(error = %err, "failed to parse request JSON"); + let mut r = Response::new(full(format!("Failed to parse request: {}", err))); + *r.status_mut() = StatusCode::BAD_REQUEST; + r + })?; let api_type = SupportedAPIsFromClient::from_endpoint(request_path).ok_or_else(|| { warn!(path = 
%request_path, "unsupported endpoint"); @@ -439,7 +437,7 @@ async fn parse_and_validate_request( temperature, tool_names, user_message_preview, - inline_routing_policy, + inline_routing_preferences, client_api, provider_id, }) diff --git a/crates/brightstaff/src/handlers/llm/model_selection.rs b/crates/brightstaff/src/handlers/llm/model_selection.rs index 455b7c0e2..1f5aea712 100644 --- a/crates/brightstaff/src/handlers/llm/model_selection.rs +++ b/crates/brightstaff/src/handlers/llm/model_selection.rs @@ -1,6 +1,6 @@ -use common::configuration::ModelUsagePreference; +use common::configuration::TopLevelRoutingPreference; use hermesllm::clients::endpoints::SupportedUpstreamAPIs; -use hermesllm::{ProviderRequest, ProviderRequestType}; +use hermesllm::ProviderRequestType; use hyper::StatusCode; use std::sync::Arc; use tracing::{debug, info, warn}; @@ -10,7 +10,10 @@ use crate::streaming::truncate_message; use crate::tracing::routing; pub struct RoutingResult { + /// Primary model to use (first in the ranked list). pub model_name: String, + /// Full ranked list — use subsequent entries as fallbacks on 429/5xx. 
+ pub models: Vec, pub route_name: Option, } @@ -39,11 +42,8 @@ pub async fn router_chat_get_upstream_model( traceparent: &str, request_path: &str, request_id: &str, - inline_usage_preferences: Option>, + inline_routing_preferences: Option>, ) -> Result { - // Clone metadata for routing before converting (which consumes client_request) - let routing_metadata = client_request.metadata().clone(); - // Convert to ChatCompletionsRequest for routing (regardless of input type) let chat_request = match ProviderRequestType::try_from(( client_request, @@ -78,22 +78,6 @@ pub async fn router_chat_get_upstream_model( "router request" ); - // Use inline preferences if provided, otherwise fall back to metadata extraction - let usage_preferences: Option> = if inline_usage_preferences.is_some() - { - inline_usage_preferences - } else { - let usage_preferences_str: Option = - routing_metadata.as_ref().and_then(|metadata| { - metadata - .get("plano_preference_config") - .map(|value| value.to_string()) - }); - usage_preferences_str - .as_ref() - .and_then(|s| serde_yaml::from_str(s).ok()) - }; - // Prepare log message with latest message from chat request let latest_message_for_log = chat_request .messages @@ -107,7 +91,6 @@ pub async fn router_chat_get_upstream_model( let latest_message_for_log = truncate_message(&latest_message_for_log, 50); info!( - has_usage_preferences = usage_preferences.is_some(), path = %request_path, latest_message = %latest_message_for_log, "processing router request" @@ -121,7 +104,7 @@ pub async fn router_chat_get_upstream_model( .determine_route( &chat_request.messages, traceparent, - usage_preferences, + inline_routing_preferences, request_id, ) .await; @@ -132,10 +115,12 @@ pub async fn router_chat_get_upstream_model( match routing_result { Ok(route) => match route { - Some((route_name, model_name)) => { + Some((route_name, ranked_models)) => { + let model_name = ranked_models.first().cloned().unwrap_or_default(); 
current_span.record("route.selected_model", model_name.as_str()); Ok(RoutingResult { model_name, + models: ranked_models, route_name: Some(route_name), }) } @@ -147,6 +132,7 @@ pub async fn router_chat_get_upstream_model( Ok(RoutingResult { model_name: "none".to_string(), + models: vec!["none".to_string()], route_name: None, }) } diff --git a/crates/brightstaff/src/handlers/routing_service.rs b/crates/brightstaff/src/handlers/routing_service.rs index ec09f06fb..0411f90a4 100644 --- a/crates/brightstaff/src/handlers/routing_service.rs +++ b/crates/brightstaff/src/handlers/routing_service.rs @@ -1,5 +1,5 @@ use bytes::Bytes; -use common::configuration::{ModelUsagePreference, SpanAttributes}; +use common::configuration::{SpanAttributes, TopLevelRoutingPreference}; use common::consts::REQUEST_ID_HEADER; use common::errors::BrightStaffError; use hermesllm::clients::SupportedAPIsFromClient; @@ -15,56 +15,42 @@ use crate::handlers::llm::model_selection::router_chat_get_upstream_model; use crate::router::llm::RouterService; use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name}; -const ROUTING_POLICY_SIZE_WARNING_BYTES: usize = 5120; - -/// Extracts `routing_policy` from a JSON body, returning the cleaned body bytes -/// and parsed preferences. The `routing_policy` field is removed from the JSON -/// before re-serializing so downstream parsers don't see the non-standard field. -/// -/// If `warn_on_size` is true, logs a warning when the serialized policy exceeds 5KB. +/// Extracts `routing_preferences` from a JSON body, returning the cleaned body bytes +/// and the parsed preferences. The field is removed from the JSON before re-serializing +/// so downstream parsers don't see it. 
pub fn extract_routing_policy( raw_bytes: &[u8], - warn_on_size: bool, -) -> Result<(Bytes, Option>), String> { +) -> Result<(Bytes, Option>), String> { let mut json_body: serde_json::Value = serde_json::from_slice(raw_bytes) .map_err(|err| format!("Failed to parse JSON: {}", err))?; - let preferences = json_body + let routing_preferences = json_body .as_object_mut() - .and_then(|obj| obj.remove("routing_policy")) - .and_then(|policy_value| { - if warn_on_size { - let policy_str = serde_json::to_string(&policy_value).unwrap_or_default(); - if policy_str.len() > ROUTING_POLICY_SIZE_WARNING_BYTES { - warn!( - size_bytes = policy_str.len(), - limit_bytes = ROUTING_POLICY_SIZE_WARNING_BYTES, - "routing_policy exceeds recommended size limit" - ); - } - } - match serde_json::from_value::>(policy_value) { + .and_then(|o| o.remove("routing_preferences")) + .and_then( + |value| match serde_json::from_value::>(value) { Ok(prefs) => { info!( - num_models = prefs.len(), - "using inline routing_policy from request body" + num_routes = prefs.len(), + "using inline routing_preferences from request body" ); Some(prefs) } Err(err) => { - warn!(error = %err, "failed to parse routing_policy"); + warn!(error = %err, "failed to parse routing_preferences"); None } - } - }); + }, + ); let bytes = Bytes::from(serde_json::to_vec(&json_body).unwrap()); - Ok((bytes, preferences)) + Ok((bytes, routing_preferences)) } #[derive(serde::Serialize)] struct RoutingDecisionResponse { - model: String, + /// Ranked model list — use first, fall back to next on 429/5xx. 
+ models: Vec, route: Option, trace_id: String, } @@ -136,8 +122,9 @@ async fn routing_decision_inner( "routing decision request body received" ); - // Extract routing_policy from request body before parsing as ProviderRequestType - let (chat_request_bytes, inline_preferences) = match extract_routing_policy(&raw_bytes, true) { + // Extract routing_preferences from body before parsing as ProviderRequestType + let (chat_request_bytes, inline_routing_preferences) = match extract_routing_policy(&raw_bytes) + { Ok(result) => result, Err(err) => { warn!(error = %err, "failed to parse request JSON"); @@ -164,27 +151,27 @@ async fn routing_decision_inner( } }; - // Call the existing routing logic with inline preferences let routing_result = router_chat_get_upstream_model( router_service, client_request, &traceparent, &request_path, &request_id, - inline_preferences, + inline_routing_preferences, ) .await; match routing_result { Ok(result) => { let response = RoutingDecisionResponse { - model: result.model_name, + models: result.models, route: result.route_name, trace_id, }; info!( - model = %response.model, + primary_model = %response.models.first().map(|s| s.as_str()).unwrap_or("none"), + total_models = response.models.len(), route = ?response.route, "routing decision completed" ); @@ -227,101 +214,70 @@ mod tests { #[test] fn extract_routing_policy_no_policy() { let body = make_chat_body(""); - let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); + let (cleaned, prefs) = extract_routing_policy(&body).unwrap(); assert!(prefs.is_none()); let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); assert_eq!(cleaned_json["model"], "gpt-4o-mini"); - assert!(cleaned_json.get("routing_policy").is_none()); - } - - #[test] - fn extract_routing_policy_valid_policy() { - let policy = r#""routing_policy": [ - { - "model": "openai/gpt-4o", - "routing_preferences": [ - {"name": "coding", "description": "code generation tasks"} - ] - }, - { - 
"model": "openai/gpt-4o-mini", - "routing_preferences": [ - {"name": "general", "description": "general questions"} - ] - } - ]"#; - let body = make_chat_body(policy); - let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); - - let prefs = prefs.expect("should have parsed preferences"); - assert_eq!(prefs.len(), 2); - assert_eq!(prefs[0].model, "openai/gpt-4o"); - assert_eq!(prefs[0].routing_preferences[0].name, "coding"); - assert_eq!(prefs[1].model, "openai/gpt-4o-mini"); - assert_eq!(prefs[1].routing_preferences[0].name, "general"); - - // routing_policy should be stripped from cleaned body - let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); - assert!(cleaned_json.get("routing_policy").is_none()); - assert_eq!(cleaned_json["model"], "gpt-4o-mini"); - } - - #[test] - fn extract_routing_policy_invalid_policy_returns_none() { - // routing_policy is present but has wrong shape - let policy = r#""routing_policy": "not-an-array""#; - let body = make_chat_body(policy); - let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); - - // Invalid policy should be ignored (returns None), not error - assert!(prefs.is_none()); - // routing_policy should still be stripped from cleaned body - let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); - assert!(cleaned_json.get("routing_policy").is_none()); } #[test] fn extract_routing_policy_invalid_json_returns_error() { let body = b"not valid json"; - let result = extract_routing_policy(body, false); + let result = extract_routing_policy(body); assert!(result.is_err()); assert!(result.unwrap_err().contains("Failed to parse JSON")); } #[test] - fn extract_routing_policy_empty_array() { - let policy = r#""routing_policy": []"#; + fn extract_routing_policy_routing_preferences() { + let policy = r#""routing_preferences": [ + { + "name": "code generation", + "description": "generate new code", + "models": ["openai/gpt-4o", "openai/gpt-4o-mini"], + 
"selection_policy": {"prefer": "fastest"} + } + ]"#; let body = make_chat_body(policy); - let (_, prefs) = extract_routing_policy(&body, false).unwrap(); + let (cleaned, prefs) = extract_routing_policy(&body).unwrap(); + + let prefs = prefs.expect("should have parsed routing_preferences"); + assert_eq!(prefs.len(), 1); + assert_eq!(prefs[0].name, "code generation"); + assert_eq!(prefs[0].models, vec!["openai/gpt-4o", "openai/gpt-4o-mini"]); - let prefs = prefs.expect("empty array is valid"); - assert_eq!(prefs.len(), 0); + let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); + assert!(cleaned_json.get("routing_preferences").is_none()); } #[test] fn extract_routing_policy_preserves_other_fields() { - let policy = r#""routing_policy": [{"model": "gpt-4o", "routing_preferences": [{"name": "test", "description": "test"}]}], "temperature": 0.5, "max_tokens": 100"#; + let policy = r#""routing_preferences": [{"name": "test", "description": "test", "models": ["gpt-4o"], "selection_policy": {"prefer": "none"}}], "temperature": 0.5, "max_tokens": 100"#; let body = make_chat_body(policy); - let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); + let (cleaned, prefs) = extract_routing_policy(&body).unwrap(); assert!(prefs.is_some()); let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); assert_eq!(cleaned_json["temperature"], 0.5); assert_eq!(cleaned_json["max_tokens"], 100); - assert!(cleaned_json.get("routing_policy").is_none()); + assert!(cleaned_json.get("routing_preferences").is_none()); } #[test] fn routing_decision_response_serialization() { let response = RoutingDecisionResponse { - model: "openai/gpt-4o".to_string(), + models: vec![ + "openai/gpt-4o-mini".to_string(), + "openai/gpt-4o".to_string(), + ], route: Some("code_generation".to_string()), trace_id: "abc123".to_string(), }; let json = serde_json::to_string(&response).unwrap(); let parsed: serde_json::Value = 
serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["model"], "openai/gpt-4o"); + assert_eq!(parsed["models"][0], "openai/gpt-4o-mini"); + assert_eq!(parsed["models"][1], "openai/gpt-4o"); assert_eq!(parsed["route"], "code_generation"); assert_eq!(parsed["trace_id"], "abc123"); } @@ -329,13 +285,13 @@ mod tests { #[test] fn routing_decision_response_serialization_no_route() { let response = RoutingDecisionResponse { - model: "none".to_string(), + models: vec!["none".to_string()], route: None, trace_id: "abc123".to_string(), }; let json = serde_json::to_string(&response).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); - assert_eq!(parsed["model"], "none"); + assert_eq!(parsed["models"][0], "none"); assert!(parsed["route"].is_null()); } } diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 60a69bca6..19ef9efbc 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -6,6 +6,7 @@ use brightstaff::handlers::llm::llm_chat; use brightstaff::handlers::models::list_models; use brightstaff::handlers::routing_service::routing_decision; use brightstaff::router::llm::RouterService; +use brightstaff::router::model_metrics::ModelMetricsService; use brightstaff::router::orchestrator::OrchestratorService; use brightstaff::state::memory::MemoryConversationalStorage; use brightstaff::state::postgresql::PostgreSQLConversationStorage; @@ -40,6 +41,17 @@ const DEFAULT_ROUTING_MODEL_NAME: &str = "Arch-Router"; const DEFAULT_ORCHESTRATOR_LLM_PROVIDER: &str = "plano-orchestrator"; const DEFAULT_ORCHESTRATOR_MODEL_NAME: &str = "Plano-Orchestrator"; +/// Parse a version string like `v0.4.0`, `v0.3.0`, `0.2.0` into a `(major, minor, patch)` tuple. +/// Missing parts default to 0. Non-numeric parts are treated as 0. 
+fn parse_semver(version: &str) -> (u32, u32, u32) { + let v = version.trim_start_matches('v'); + let mut parts = v.splitn(3, '.').map(|p| p.parse::().unwrap_or(0)); + let major = parts.next().unwrap_or(0); + let minor = parts.next().unwrap_or(0); + let patch = parts.next().unwrap_or(0); + (major, minor, patch) +} + /// CORS pre-flight response for the models endpoint. fn cors_preflight() -> Result>, hyper::Error> { let mut response = Response::new(empty()); @@ -162,8 +174,150 @@ async fn init_app_state( .map(|p| p.name.clone()) .unwrap_or_else(|| DEFAULT_ROUTING_LLM_PROVIDER.to_string()); + // Validate that top-level routing_preferences requires v0.4.0+. + let config_version = parse_semver(&config.version); + let is_v040_plus = config_version >= (0, 4, 0); + + if !is_v040_plus && config.routing_preferences.is_some() { + return Err( + "top-level routing_preferences requires version v0.4.0 or above. \ + Update the version field or remove routing_preferences." + .into(), + ); + } + + // Validate that all models referenced in top-level routing_preferences exist in model_providers. + // The CLI renders model_providers with `name` = "openai/gpt-4o" and `model` = "gpt-4o", + // so we accept a match against either field. + if let Some(ref route_prefs) = config.routing_preferences { + let provider_model_names: std::collections::HashSet<&str> = config + .model_providers + .iter() + .flat_map(|p| std::iter::once(p.name.as_str()).chain(p.model.as_deref())) + .collect(); + for pref in route_prefs { + for model in &pref.models { + if !provider_model_names.contains(model.as_str()) { + return Err(format!( + "routing_preferences route '{}' references model '{}' \ + which is not declared in model_providers", + pref.name, model + ) + .into()); + } + } + } + } + + // Validate and initialize ModelMetricsService if model_metrics_sources is configured. 
+ let metrics_service: Option> = if let Some(ref sources) = + config.model_metrics_sources + { + use common::configuration::MetricsSource; + let cost_count = sources + .iter() + .filter(|s| matches!(s, MetricsSource::CostMetrics { .. })) + .count(); + let prom_count = sources + .iter() + .filter(|s| matches!(s, MetricsSource::PrometheusMetrics { .. })) + .count(); + let do_count = sources + .iter() + .filter(|s| matches!(s, MetricsSource::DigitalOceanPricing { .. })) + .count(); + if cost_count > 1 { + return Err("model_metrics_sources: only one cost_metrics source is allowed".into()); + } + if prom_count > 1 { + return Err( + "model_metrics_sources: only one prometheus_metrics source is allowed".into(), + ); + } + if do_count > 1 { + return Err( + "model_metrics_sources: only one digitalocean_pricing source is allowed".into(), + ); + } + if cost_count > 0 && do_count > 0 { + return Err( + "model_metrics_sources: cost_metrics and digitalocean_pricing cannot both be configured — use one or the other".into(), + ); + } + let svc = ModelMetricsService::new(sources, reqwest::Client::new()).await; + Some(Arc::new(svc)) + } else { + None + }; + + // Validate that selection_policy.prefer is compatible with the configured metric sources. + if let Some(ref prefs) = config.routing_preferences { + use common::configuration::{MetricsSource, SelectionPreference}; + + let has_cost_source = config + .model_metrics_sources + .as_deref() + .unwrap_or_default() + .iter() + .any(|s| { + matches!( + s, + MetricsSource::CostMetrics { .. } | MetricsSource::DigitalOceanPricing { .. } + ) + }); + let has_prometheus = config + .model_metrics_sources + .as_deref() + .unwrap_or_default() + .iter() + .any(|s| matches!(s, MetricsSource::PrometheusMetrics { .. 
})); + + for pref in prefs { + if pref.selection_policy.prefer == SelectionPreference::Cheapest && !has_cost_source { + return Err(format!( + "routing_preferences route '{}' uses prefer: cheapest but no cost data source is configured — \ + add cost_metrics or digitalocean_pricing to model_metrics_sources", + pref.name + ) + .into()); + } + if pref.selection_policy.prefer == SelectionPreference::Fastest && !has_prometheus { + return Err(format!( + "routing_preferences route '{}' uses prefer: fastest but no prometheus_metrics source is configured — \ + add prometheus_metrics to model_metrics_sources", + pref.name + ) + .into()); + } + } + } + + // Warn about models in routing_preferences that have no matching pricing/latency data. + if let (Some(ref prefs), Some(ref svc)) = (&config.routing_preferences, &metrics_service) { + let cost_data = svc.cost_snapshot().await; + let latency_data = svc.latency_snapshot().await; + for pref in prefs { + use common::configuration::SelectionPreference; + for model in &pref.models { + let missing = match pref.selection_policy.prefer { + SelectionPreference::Cheapest => !cost_data.contains_key(model.as_str()), + SelectionPreference::Fastest => !latency_data.contains_key(model.as_str()), + _ => false, + }; + if missing { + warn!( + model = %model, + route = %pref.name, + "model has no metric data — will be ranked last" + ); + } + } + } + } + let router_service = Arc::new(RouterService::new( - config.model_providers.clone(), + config.routing_preferences.clone(), + metrics_service, format!("{llm_provider_url}{CHAT_COMPLETIONS_PATH}"), routing_model_name, routing_llm_provider, diff --git a/crates/brightstaff/src/router/llm.rs b/crates/brightstaff/src/router/llm.rs index 7d27e80a2..305c548aa 100644 --- a/crates/brightstaff/src/router/llm.rs +++ b/crates/brightstaff/src/router/llm.rs @@ -1,15 +1,18 @@ use std::{collections::HashMap, sync::Arc}; use common::{ - configuration::{LlmProvider, ModelUsagePreference, RoutingPreference}, + 
configuration::TopLevelRoutingPreference, consts::{ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER}, }; + +use super::router_model::{ModelUsagePreference, RoutingPreference}; use hermesllm::apis::openai::Message; use hyper::header; use thiserror::Error; use tracing::{debug, info}; use super::http::{self, post_and_extract_content}; +use super::model_metrics::ModelMetricsService; use super::router_model::RouterModel; use crate::router::router_model_v1; @@ -19,7 +22,8 @@ pub struct RouterService { client: reqwest::Client, router_model: Arc, routing_provider_name: String, - llm_usage_defined: bool, + top_level_preferences: HashMap, + metrics_service: Option>, } #[derive(Debug, Error)] @@ -35,29 +39,37 @@ pub type Result = std::result::Result; impl RouterService { pub fn new( - providers: Vec, + top_level_prefs: Option>, + metrics_service: Option>, router_url: String, routing_model_name: String, routing_provider_name: String, ) -> Self { - let providers_with_usage = providers - .iter() - .filter(|provider| provider.routing_preferences.is_some()) - .cloned() - .collect::>(); - - let llm_routes: HashMap> = providers_with_usage + let top_level_preferences: HashMap = top_level_prefs + .map_or_else(HashMap::new, |prefs| { + prefs.into_iter().map(|p| (p.name.clone(), p)).collect() + }); + + // Build sentinel routes for RouterModelV1: route_name → first model. + // RouterModelV1 uses this to build its prompt; RouterService overrides + // the model selection via rank_models() after the route is determined. 
+ let sentinel_routes: HashMap> = top_level_preferences .iter() - .filter_map(|provider| { - provider - .routing_preferences - .as_ref() - .map(|prefs| (provider.name.clone(), prefs.clone())) + .filter_map(|(name, pref)| { + pref.models.first().map(|first_model| { + ( + first_model.clone(), + vec![RoutingPreference { + name: name.clone(), + description: pref.description.clone(), + }], + ) + }) }) .collect(); let router_model = Arc::new(router_model_v1::RouterModelV1::new( - llm_routes, + sentinel_routes, routing_model_name, router_model_v1::MAX_TOKEN_LEN, )); @@ -67,7 +79,8 @@ impl RouterService { client: reqwest::Client::new(), router_model, routing_provider_name, - llm_usage_defined: !providers_with_usage.is_empty(), + top_level_preferences, + metrics_service, } } @@ -75,24 +88,43 @@ impl RouterService { &self, messages: &[Message], traceparent: &str, - usage_preferences: Option>, + inline_routing_preferences: Option>, request_id: &str, - ) -> Result> { + ) -> Result)>> { if messages.is_empty() { return Ok(None); } - if usage_preferences - .as_ref() - .is_none_or(|prefs| prefs.len() < 2) - && !self.llm_usage_defined - { + // Build inline top-level map from request if present (inline overrides config). + let inline_top_map: Option> = + inline_routing_preferences + .map(|prefs| prefs.into_iter().map(|p| (p.name.clone(), p)).collect()); + + // No routing defined — skip the router call entirely. + if inline_top_map.is_none() && self.top_level_preferences.is_empty() { return Ok(None); } + // For inline overrides, build synthetic ModelUsagePreference list so RouterModelV1 + // generates the correct prompt (route name + description pairs). + // For config-level prefs the sentinel routes are already baked into RouterModelV1. 
+ let effective_usage_preferences: Option> = + inline_top_map.as_ref().map(|inline_map| { + inline_map + .values() + .map(|p| ModelUsagePreference { + model: p.models.first().cloned().unwrap_or_default(), + routing_preferences: vec![RoutingPreference { + name: p.name.clone(), + description: p.description.clone(), + }], + }) + .collect() + }); + let router_request = self .router_model - .generate_request(messages, &usage_preferences); + .generate_request(messages, &effective_usage_preferences); debug!( model = %self.router_model.get_model_name(), @@ -132,17 +164,37 @@ impl RouterService { return Ok(None); }; + // Parse the route name from the router response. let parsed = self .router_model - .parse_response(&content, &usage_preferences)?; + .parse_response(&content, &effective_usage_preferences)?; + + let result = if let Some((route_name, _sentinel)) = parsed { + let top_pref = inline_top_map + .as_ref() + .and_then(|m| m.get(&route_name)) + .or_else(|| self.top_level_preferences.get(&route_name)); + + if let Some(pref) = top_pref { + let ranked = match &self.metrics_service { + Some(svc) => svc.rank_models(&pref.models, &pref.selection_policy).await, + None => pref.models.clone(), + }; + Some((route_name, ranked)) + } else { + None + } + } else { + None + }; info!( content = %content.replace("\n", "\\n"), - selected_model = ?parsed, + selected_model = ?result, response_time_ms = elapsed.as_millis(), "arch-router determined route" ); - Ok(parsed) + Ok(result) } } diff --git a/crates/brightstaff/src/router/mod.rs b/crates/brightstaff/src/router/mod.rs index b010d80c9..2d9d00a75 100644 --- a/crates/brightstaff/src/router/mod.rs +++ b/crates/brightstaff/src/router/mod.rs @@ -1,5 +1,6 @@ pub(crate) mod http; pub mod llm; +pub mod model_metrics; pub mod orchestrator; pub mod orchestrator_model; pub mod orchestrator_model_v1; diff --git a/crates/brightstaff/src/router/model_metrics.rs b/crates/brightstaff/src/router/model_metrics.rs new file mode 100644 index 
000000000..75b10b7cc --- /dev/null +++ b/crates/brightstaff/src/router/model_metrics.rs @@ -0,0 +1,438 @@ +use std::collections::HashMap; +use std::sync::Arc; +use std::time::Duration; + +use common::configuration::{MetricsSource, SelectionPolicy, SelectionPreference}; +use tokio::sync::RwLock; +use tracing::{info, warn}; + +const DO_PRICING_URL: &str = "https://api.digitalocean.com/v2/gen-ai/models/catalog"; + +pub struct ModelMetricsService { + cost: Arc>>, + latency: Arc>>, +} + +impl ModelMetricsService { + pub async fn new(sources: &[MetricsSource], client: reqwest::Client) -> Self { + let cost_data = Arc::new(RwLock::new(HashMap::new())); + let latency_data = Arc::new(RwLock::new(HashMap::new())); + + for source in sources { + match source { + MetricsSource::CostMetrics { + url, + refresh_interval, + auth, + } => { + let data = fetch_cost_metrics(url, auth.as_ref(), &client).await; + info!(models = data.len(), url = %url, "fetched cost metrics"); + *cost_data.write().await = data; + + if let Some(interval_secs) = refresh_interval { + let cost_clone = Arc::clone(&cost_data); + let client_clone = client.clone(); + let url = url.clone(); + let auth = auth.clone(); + let interval = Duration::from_secs(*interval_secs); + tokio::spawn(async move { + loop { + tokio::time::sleep(interval).await; + let data = + fetch_cost_metrics(&url, auth.as_ref(), &client_clone).await; + info!(models = data.len(), url = %url, "refreshed cost metrics"); + *cost_clone.write().await = data; + } + }); + } + } + MetricsSource::PrometheusMetrics { + url, + query, + refresh_interval, + } => { + let data = fetch_prometheus_metrics(url, query, &client).await; + info!(models = data.len(), url = %url, "fetched prometheus latency metrics"); + *latency_data.write().await = data; + + if let Some(interval_secs) = refresh_interval { + let latency_clone = Arc::clone(&latency_data); + let client_clone = client.clone(); + let url = url.clone(); + let query = query.clone(); + let interval = 
Duration::from_secs(*interval_secs); + tokio::spawn(async move { + loop { + tokio::time::sleep(interval).await; + let data = + fetch_prometheus_metrics(&url, &query, &client_clone).await; + info!(models = data.len(), url = %url, "refreshed prometheus latency metrics"); + *latency_clone.write().await = data; + } + }); + } + } + MetricsSource::DigitalOceanPricing { + refresh_interval, + model_aliases, + } => { + let aliases = model_aliases.clone().unwrap_or_default(); + let data = fetch_do_pricing(&client, &aliases).await; + info!(models = data.len(), "fetched digitalocean pricing"); + *cost_data.write().await = data; + + if let Some(interval_secs) = refresh_interval { + let cost_clone = Arc::clone(&cost_data); + let client_clone = client.clone(); + let interval = Duration::from_secs(*interval_secs); + tokio::spawn(async move { + loop { + tokio::time::sleep(interval).await; + let data = fetch_do_pricing(&client_clone, &aliases).await; + info!(models = data.len(), "refreshed digitalocean pricing"); + *cost_clone.write().await = data; + } + }); + } + } + } + } + + ModelMetricsService { + cost: cost_data, + latency: latency_data, + } + } + + /// Rank `models` by `policy`, returning them in preference order. + /// Models with no metric data are appended at the end in their original order. 
+ pub async fn rank_models(&self, models: &[String], policy: &SelectionPolicy) -> Vec { + match policy.prefer { + SelectionPreference::Cheapest => { + let data = self.cost.read().await; + for m in models { + if !data.contains_key(m.as_str()) { + warn!(model = %m, "no cost data for model — ranking last (prefer: cheapest)"); + } + } + rank_by_ascending_metric(models, &data) + } + SelectionPreference::Fastest => { + let data = self.latency.read().await; + for m in models { + if !data.contains_key(m.as_str()) { + warn!(model = %m, "no latency data for model — ranking last (prefer: fastest)"); + } + } + rank_by_ascending_metric(models, &data) + } + SelectionPreference::Random => shuffle(models), + SelectionPreference::None => models.to_vec(), + } + } + + /// Returns a snapshot of the current cost data. Used at startup to warn about unmatched models. + pub async fn cost_snapshot(&self) -> HashMap { + self.cost.read().await.clone() + } + + /// Returns a snapshot of the current latency data. Used at startup to warn about unmatched models. 
+ pub async fn latency_snapshot(&self) -> HashMap { + self.latency.read().await.clone() + } +} + +fn rank_by_ascending_metric(models: &[String], data: &HashMap) -> Vec { + let mut with_data: Vec<(&String, f64)> = models + .iter() + .filter_map(|m| data.get(m.as_str()).map(|v| (m, *v))) + .collect(); + with_data.sort_by(|a, b| a.1.partial_cmp(&b.1).unwrap_or(std::cmp::Ordering::Equal)); + + let without_data: Vec<&String> = models + .iter() + .filter(|m| !data.contains_key(m.as_str())) + .collect(); + + with_data + .iter() + .map(|(m, _)| (*m).clone()) + .chain(without_data.iter().map(|m| (*m).clone())) + .collect() +} + +fn shuffle(models: &[String]) -> Vec { + use std::time::{SystemTime, UNIX_EPOCH}; + let seed = SystemTime::now() + .duration_since(UNIX_EPOCH) + .map(|d| d.subsec_nanos() as usize) + .unwrap_or(0); + let mut result = models.to_vec(); + let mut state = seed; + for i in (1..result.len()).rev() { + state = state + .wrapping_mul(6364136223846793005) + .wrapping_add(1442695040888963407); + let j = state % (i + 1); + result.swap(i, j); + } + result +} + +#[derive(serde::Deserialize)] +struct CostEntry { + input_per_million: f64, + output_per_million: f64, +} + +async fn fetch_cost_metrics( + url: &str, + auth: Option<&common::configuration::MetricsAuth>, + client: &reqwest::Client, +) -> HashMap { + let mut req = client.get(url); + if let Some(auth) = auth { + if auth.auth_type == "bearer" { + req = req.header("Authorization", format!("Bearer {}", auth.token)); + } else { + warn!(auth_type = %auth.auth_type, "unsupported auth type for cost_metrics, skipping auth"); + } + } + match req.send().await { + Ok(resp) => match resp.json::>().await { + Ok(data) => data + .into_iter() + .map(|(k, v)| (k, v.input_per_million + v.output_per_million)) + .collect(), + Err(err) => { + warn!(error = %err, url = %url, "failed to parse cost metrics response"); + HashMap::new() + } + }, + Err(err) => { + warn!(error = %err, url = %url, "failed to fetch cost metrics"); + 
HashMap::new() + } + } +} + +#[derive(serde::Deserialize)] +struct DoModelList { + data: Vec, +} + +#[derive(serde::Deserialize)] +struct DoModel { + model_id: String, + pricing: Option, +} + +#[derive(serde::Deserialize)] +struct DoPricing { + input_price_per_million: Option, + output_price_per_million: Option, +} + +async fn fetch_do_pricing( + client: &reqwest::Client, + aliases: &HashMap, +) -> HashMap { + match client.get(DO_PRICING_URL).send().await { + Ok(resp) => match resp.json::().await { + Ok(list) => list + .data + .into_iter() + .filter_map(|m| { + let pricing = m.pricing?; + let raw_key = m.model_id.clone(); + let key = aliases.get(&raw_key).cloned().unwrap_or(raw_key); + let cost = pricing.input_price_per_million.unwrap_or(0.0) + + pricing.output_price_per_million.unwrap_or(0.0); + Some((key, cost)) + }) + .collect(), + Err(err) => { + warn!(error = %err, url = DO_PRICING_URL, "failed to parse digitalocean pricing response"); + HashMap::new() + } + }, + Err(err) => { + warn!(error = %err, url = DO_PRICING_URL, "failed to fetch digitalocean pricing"); + HashMap::new() + } + } +} + +#[derive(serde::Deserialize)] +struct PrometheusResponse { + data: PrometheusData, +} + +#[derive(serde::Deserialize)] +struct PrometheusData { + result: Vec, +} + +#[derive(serde::Deserialize)] +struct PrometheusResult { + metric: HashMap, + value: (f64, String), // (timestamp, value_str) +} + +async fn fetch_prometheus_metrics( + url: &str, + query: &str, + client: &reqwest::Client, +) -> HashMap { + let query_url = format!("{}/api/v1/query", url.trim_end_matches('/')); + match client + .get(&query_url) + .query(&[("query", query)]) + .send() + .await + { + Ok(resp) => match resp.json::().await { + Ok(prom) => prom + .data + .result + .into_iter() + .filter_map(|r| { + let model_name = r.metric.get("model_name")?.clone(); + let value: f64 = r.value.1.parse().ok()?; + Some((model_name, value)) + }) + .collect(), + Err(err) => { + warn!(error = %err, url = %query_url, 
"failed to parse prometheus response"); + HashMap::new() + } + }, + Err(err) => { + warn!(error = %err, url = %query_url, "failed to fetch prometheus metrics"); + HashMap::new() + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use common::configuration::SelectionPreference; + + fn make_policy(prefer: SelectionPreference) -> SelectionPolicy { + SelectionPolicy { prefer } + } + + #[test] + fn test_rank_by_ascending_metric_picks_lowest_first() { + let models = vec!["a".to_string(), "b".to_string(), "c".to_string()]; + let mut data = HashMap::new(); + data.insert("a".to_string(), 0.01); + data.insert("b".to_string(), 0.005); + data.insert("c".to_string(), 0.02); + assert_eq!( + rank_by_ascending_metric(&models, &data), + vec!["b", "a", "c"] + ); + } + + #[test] + fn test_rank_by_ascending_metric_no_data_preserves_order() { + let models = vec!["x".to_string(), "y".to_string()]; + let data = HashMap::new(); + assert_eq!(rank_by_ascending_metric(&models, &data), vec!["x", "y"]); + } + + #[test] + fn test_rank_by_ascending_metric_partial_data() { + let models = vec!["a".to_string(), "b".to_string()]; + let mut data = HashMap::new(); + data.insert("b".to_string(), 100.0); + assert_eq!(rank_by_ascending_metric(&models, &data), vec!["b", "a"]); + } + + #[tokio::test] + async fn test_rank_models_cheapest() { + let service = ModelMetricsService { + cost: Arc::new(RwLock::new({ + let mut m = HashMap::new(); + m.insert("gpt-4o".to_string(), 0.005); + m.insert("gpt-4o-mini".to_string(), 0.0001); + m + })), + latency: Arc::new(RwLock::new(HashMap::new())), + }; + let models = vec!["gpt-4o".to_string(), "gpt-4o-mini".to_string()]; + let result = service + .rank_models(&models, &make_policy(SelectionPreference::Cheapest)) + .await; + assert_eq!(result, vec!["gpt-4o-mini", "gpt-4o"]); + } + + #[tokio::test] + async fn test_rank_models_fastest() { + let service = ModelMetricsService { + cost: Arc::new(RwLock::new(HashMap::new())), + latency: Arc::new(RwLock::new({ + let mut m 
= HashMap::new(); + m.insert("gpt-4o".to_string(), 200.0); + m.insert("claude-sonnet".to_string(), 120.0); + m + })), + }; + let models = vec!["gpt-4o".to_string(), "claude-sonnet".to_string()]; + let result = service + .rank_models(&models, &make_policy(SelectionPreference::Fastest)) + .await; + assert_eq!(result, vec!["claude-sonnet", "gpt-4o"]); + } + + #[tokio::test] + async fn test_rank_models_fallback_no_metrics() { + let service = ModelMetricsService { + cost: Arc::new(RwLock::new(HashMap::new())), + latency: Arc::new(RwLock::new(HashMap::new())), + }; + let models = vec!["model-a".to_string(), "model-b".to_string()]; + let result = service + .rank_models(&models, &make_policy(SelectionPreference::Cheapest)) + .await; + assert_eq!(result, vec!["model-a", "model-b"]); + } + + #[tokio::test] + async fn test_rank_models_partial_data_appended_last() { + let service = ModelMetricsService { + cost: Arc::new(RwLock::new({ + let mut m = HashMap::new(); + m.insert("gpt-4o".to_string(), 0.005); + m + })), + latency: Arc::new(RwLock::new(HashMap::new())), + }; + let models = vec!["gpt-4o-mini".to_string(), "gpt-4o".to_string()]; + let result = service + .rank_models(&models, &make_policy(SelectionPreference::Cheapest)) + .await; + assert_eq!(result, vec!["gpt-4o", "gpt-4o-mini"]); + } + + #[tokio::test] + async fn test_rank_models_none_preserves_order() { + let service = ModelMetricsService { + cost: Arc::new(RwLock::new({ + let mut m = HashMap::new(); + m.insert("gpt-4o-mini".to_string(), 0.0001); + m.insert("gpt-4o".to_string(), 0.005); + m + })), + latency: Arc::new(RwLock::new(HashMap::new())), + }; + let models = vec!["gpt-4o".to_string(), "gpt-4o-mini".to_string()]; + let result = service + .rank_models(&models, &make_policy(SelectionPreference::None)) + .await; + // none → original order, despite gpt-4o-mini being cheaper + assert_eq!(result, vec!["gpt-4o", "gpt-4o-mini"]); + } +} diff --git a/crates/brightstaff/src/router/router_model.rs 
b/crates/brightstaff/src/router/router_model.rs index 372907af2..4fe023a3f 100644 --- a/crates/brightstaff/src/router/router_model.rs +++ b/crates/brightstaff/src/router/router_model.rs @@ -1,5 +1,5 @@ -use common::configuration::ModelUsagePreference; use hermesllm::apis::openai::{ChatCompletionsRequest, Message}; +use serde::{Deserialize, Serialize}; use thiserror::Error; #[derive(Debug, Error)] @@ -10,6 +10,20 @@ pub enum RoutingModelError { pub type Result = std::result::Result; +/// Internal route descriptor passed to the router model to build its prompt. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RoutingPreference { + pub name: String, + pub description: String, +} + +/// Groups a model with its routing preferences (used internally by RouterModelV1). +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ModelUsagePreference { + pub model: String, + pub routing_preferences: Vec, +} + pub trait RouterModel: Send + Sync { fn generate_request( &self, diff --git a/crates/brightstaff/src/router/router_model_v1.rs b/crates/brightstaff/src/router/router_model_v1.rs index 430b4f8e3..82a187fdf 100644 --- a/crates/brightstaff/src/router/router_model_v1.rs +++ b/crates/brightstaff/src/router/router_model_v1.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use common::configuration::{ModelUsagePreference, RoutingPreference}; +use super::router_model::{ModelUsagePreference, RoutingPreference}; use hermesllm::apis::openai::{ChatCompletionsRequest, Message, MessageContent, Role}; use hermesllm::transforms::lib::ExtractText; use serde::{Deserialize, Serialize}; diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index df1790594..b3a42d643 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -104,6 +104,58 @@ pub enum StateStorageType { Postgres, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "lowercase")] +pub enum SelectionPreference { + 
Cheapest, + Fastest, + Random, + /// Return models in the same order they were defined — no reordering. + None, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SelectionPolicy { + pub prefer: SelectionPreference, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct TopLevelRoutingPreference { + pub name: String, + pub description: String, + pub models: Vec, + pub selection_policy: SelectionPolicy, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MetricsAuth { + #[serde(rename = "type")] + pub auth_type: String, // only "bearer" supported + pub token: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum MetricsSource { + CostMetrics { + url: String, + refresh_interval: Option, + auth: Option, + }, + PrometheusMetrics { + url: String, + query: String, + refresh_interval: Option, + }, + #[serde(rename = "digitalocean_pricing")] + DigitalOceanPricing { + refresh_interval: Option, + /// Map DO catalog keys (`lowercase(creator)/model_id`) to Plano model names. 
+ /// Example: `openai/openai-gpt-oss-120b: openai/gpt-4o` + model_aliases: Option>, + }, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Configuration { pub version: String, @@ -122,6 +174,8 @@ pub struct Configuration { pub filters: Option>, pub listeners: Vec, pub state_storage: Option, + pub routing_preferences: Option>, + pub model_metrics_sources: Option>, } #[derive(Debug, Clone, Serialize, Deserialize, Default)] @@ -317,18 +371,6 @@ impl LlmProviderType { } } -#[derive(Serialize, Deserialize, Debug)] -pub struct ModelUsagePreference { - pub model: String, - pub routing_preferences: Vec, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct RoutingPreference { - pub name: String, - pub description: String, -} - #[derive(Serialize, Deserialize, Debug)] pub struct AgentUsagePreference { pub model: String, @@ -378,7 +420,6 @@ pub struct LlmProvider { pub port: Option, pub rate_limits: Option, pub usage: Option, - pub routing_preferences: Option>, pub cluster_name: Option, pub base_url_path_prefix: Option, pub internal: Option, @@ -422,7 +463,6 @@ impl Default for LlmProvider { port: None, rate_limits: None, usage: None, - routing_preferences: None, cluster_name: None, base_url_path_prefix: None, internal: None, diff --git a/crates/common/src/llm_providers.rs b/crates/common/src/llm_providers.rs index 3c9d1d68d..b5c03b30d 100644 --- a/crates/common/src/llm_providers.rs +++ b/crates/common/src/llm_providers.rs @@ -274,7 +274,6 @@ mod tests { port: None, rate_limits: None, usage: None, - routing_preferences: None, internal: None, stream: None, passthrough_auth: None, diff --git a/demos/llm_routing/model_routing_service/README.md b/demos/llm_routing/model_routing_service/README.md index 72b672f32..85306c3bf 100644 --- a/demos/llm_routing/model_routing_service/README.md +++ b/demos/llm_routing/model_routing_service/README.md @@ -13,42 +13,60 @@ Plano is an AI-native proxy and data plane for agentic apps — with built-in or - **One 
endpoint, many models** — apps call Plano using standard OpenAI/Anthropic APIs; Plano handles provider selection, keys, and failover - **Intelligent routing** — a lightweight 1.5B router model classifies user intent and picks the best model per request +- **Cost & latency ranking** — models are ranked by live cost (DigitalOcean pricing API) or latency (Prometheus) before returning the fallback list - **Platform governance** — centralize API keys, rate limits, guardrails, and observability without touching app code - **Runs anywhere** — single binary; self-host the router for full data privacy ## How Routing Works -The entire routing configuration is plain YAML — no code: +Routing is configured in top-level `routing_preferences` (requires `version: v0.4.0`): ```yaml -model_providers: - - model: openai/gpt-4o-mini - default: true # fallback for unmatched requests +version: v0.4.0 - - model: openai/gpt-4o - routing_preferences: - - name: complex_reasoning - description: complex reasoning tasks, multi-step analysis +routing_preferences: + - name: complex_reasoning + description: complex reasoning tasks, multi-step analysis, or detailed explanations + models: + - openai/gpt-4o + - openai/gpt-4o-mini + selection_policy: + prefer: cheapest # rank by live cost data - - model: anthropic/claude-sonnet-4-20250514 - routing_preferences: - - name: code_generation - description: generating new code, writing functions + - name: code_generation + description: generating new code, writing functions, or creating boilerplate + models: + - anthropic/claude-sonnet-4-20250514 + - openai/gpt-4o + selection_policy: + prefer: fastest # rank by Prometheus p95 latency ``` -When a request arrives, Plano sends the conversation and routing preferences to Arch-Router, which classifies the intent and returns the matching route: +### `selection_policy.prefer` values + +| Value | Behavior | +|---|---| +| `cheapest` | Sort models by ascending cost. 
Requires `cost_metrics` or `digitalocean_pricing` in `model_metrics_sources`. | +| `fastest` | Sort models by ascending P95 latency. Requires `prometheus_metrics` in `model_metrics_sources`. | +| `random` | Shuffle the model list on each request. | +| `none` | Return models in definition order — no reordering. | + +When a request arrives, Plano: + +1. Sends the conversation + route descriptions to Arch-Router for intent classification +2. Looks up the matched route and ranks its candidate models by cost or latency +3. Returns an ordered list — client uses `models[0]`, falls back to `models[1]` on 429/5xx ``` 1. Request arrives → "Write binary search in Python" -2. Preferences serialized → [{"name":"code_generation", ...}, {"name":"complex_reasoning", ...}] -3. Arch-Router classifies → {"route": "code_generation"} -4. Route → Model lookup → code_generation → anthropic/claude-sonnet-4-20250514 -5. Request forwarded → Claude generates the response +2. Arch-Router classifies → route: "code_generation" +3. Rank by latency → claude-sonnet (0.85s) < gpt-4o (1.2s) +4. Response → models: ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o"] ``` -No match? Arch-Router returns `other` → Plano falls back to the default model. +No match? Arch-Router returns `null` route → client falls back to the model in the original request. -The `/routing/v1/*` endpoints return the routing decision **without** forwarding to the LLM — useful for testing and validating routing behavior before going to production. +The `/routing/v1/*` endpoints return the routing decision **without** forwarding to the LLM — useful for testing routing behavior before going to production. 
## Setup @@ -59,12 +77,28 @@ export OPENAI_API_KEY= export ANTHROPIC_API_KEY= ``` -Start Plano: +Start Prometheus and the mock latency metrics server: + ```bash cd demos/llm_routing/model_routing_service +docker compose up -d +``` + +Then start Plano: + +```bash planoai up config.yaml ``` +On startup you should see logs like: + +``` +fetched digitalocean pricing: N models +fetched prometheus latency metrics: 3 models +``` + +If a model in `routing_preferences` has no matching pricing or latency data, Plano logs a warning at startup — the model is still included but ranked last. + ## Run the demo ```bash @@ -95,13 +129,65 @@ curl http://localhost:12000/routing/v1/chat/completions \ Response: ```json { - "model": "anthropic/claude-sonnet-4-20250514", + "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o"], "route": "code_generation", "trace_id": "c16d1096c1af4a17abb48fb182918a88" } ``` -The response tells you which model would handle this request and which route was matched, without actually making the LLM call. +The response contains the ranked model list — your client should try `models[0]` first and fall back to `models[1]` on 429 or 5xx errors. + +## Metrics Sources + +### DigitalOcean Pricing (`digitalocean_pricing`) + +Fetches public model pricing from the DigitalOcean Gen-AI catalog (no auth required). Model IDs are normalized as `lowercase(creator)/model_id`. Cost scalar = `input_price_per_million + output_price_per_million`. + +```yaml +model_metrics_sources: + - type: digitalocean_pricing + refresh_interval: 3600 # re-fetch every hour +``` + +### Prometheus Latency (`prometheus_metrics`) + +Queries a Prometheus instance for P95 latency. The PromQL expression must return an instant vector with a `model_name` label matching the model names in `routing_preferences`. 
+ +```yaml +model_metrics_sources: + - type: prometheus_metrics + url: http://localhost:9090 + query: model_latency_p95_seconds + refresh_interval: 60 +``` + +The demo's `metrics_server.py` exposes mock latency data; `docker compose up -d` starts it alongside Prometheus. + +### Custom Cost Endpoint (`cost_metrics`) + +```yaml +model_metrics_sources: + - type: cost_metrics + url: https://my-internal-pricing-api/costs + auth: + type: bearer + token: $PRICING_TOKEN + refresh_interval: 300 +``` + +Expected response format: +```json +{ + "anthropic/claude-sonnet-4-20250514": { + "input_per_million": 3.0, + "output_per_million": 15.0 + }, + "openai/gpt-4o": { + "input_per_million": 5.0, + "output_per_million": 20.0 + } +} +``` ## Kubernetes Deployment (Self-hosted Arch-Router on GPU) @@ -119,7 +205,6 @@ GPU nodes commonly have a `nvidia.com/gpu:NoSchedule` taint — `vllm-deployment **1. Deploy Arch-Router and Plano:** ```bash - # arch-router deployment kubectl apply -f vllm-deployment.yaml @@ -165,39 +250,3 @@ kubectl create configmap plano-config \ --dry-run=client -o yaml | kubectl apply -f - kubectl rollout restart deployment/plano ``` - -## Demo Output - -``` -=== Model Routing Service Demo === - ---- 1. Code generation query (OpenAI format) --- -{ - "model": "anthropic/claude-sonnet-4-20250514", - "route": "code_generation", - "trace_id": "c16d1096c1af4a17abb48fb182918a88" -} - ---- 2. Complex reasoning query (OpenAI format) --- -{ - "model": "openai/gpt-4o", - "route": "complex_reasoning", - "trace_id": "30795e228aff4d7696f082ed01b75ad4" -} - ---- 3. Simple query - no routing match (OpenAI format) --- -{ - "model": "none", - "route": null, - "trace_id": "ae0b6c3b220d499fb5298ac63f4eac0e" -} - ---- 4. 
Code generation query (Anthropic format) --- -{ - "model": "anthropic/claude-sonnet-4-20250514", - "route": "code_generation", - "trace_id": "26be822bbdf14a3ba19fe198e55ea4a9" -} - -=== Demo Complete === -``` diff --git a/demos/llm_routing/model_routing_service/config.yaml b/demos/llm_routing/model_routing_service/config.yaml index 7b98b25b7..6f20134be 100644 --- a/demos/llm_routing/model_routing_service/config.yaml +++ b/demos/llm_routing/model_routing_service/config.yaml @@ -1,4 +1,4 @@ -version: v0.3.0 +version: v0.4.0 listeners: - type: model @@ -6,22 +6,48 @@ listeners: port: 12000 model_providers: - - model: openai/gpt-4o-mini access_key: $OPENAI_API_KEY default: true - model: openai/gpt-4o access_key: $OPENAI_API_KEY - routing_preferences: - - name: complex_reasoning - description: complex reasoning tasks, multi-step analysis, or detailed explanations - model: anthropic/claude-sonnet-4-20250514 access_key: $ANTHROPIC_API_KEY - routing_preferences: - - name: code_generation - description: generating new code, writing functions, or creating boilerplate -tracing: - random_sampling: 100 +routing_preferences: + - name: complex_reasoning + description: complex reasoning tasks, multi-step analysis, or detailed explanations + models: + - openai/gpt-4o + - openai/gpt-4o-mini + selection_policy: + prefer: cheapest + + - name: code_generation + description: generating new code, writing functions, or creating boilerplate + models: + - anthropic/claude-sonnet-4-20250514 + - openai/gpt-4o + selection_policy: + prefer: fastest + +model_metrics_sources: + - type: digitalocean_pricing + refresh_interval: 3600 + model_aliases: + openai-gpt-4o: openai/gpt-4o + openai-gpt-4o-mini: openai/gpt-4o-mini + anthropic-claude-sonnet-4: anthropic/claude-sonnet-4-20250514 + + # Use cost_metrics instead of digitalocean_pricing to supply your own pricing data. + # The demo metrics_server.py exposes /costs with OpenAI and Anthropic pricing. 
+ # - type: cost_metrics + # url: http://localhost:8080/costs + # refresh_interval: 300 + + - type: prometheus_metrics + url: http://localhost:9090 + query: model_latency_p95_seconds + refresh_interval: 60 diff --git a/demos/llm_routing/model_routing_service/demo.sh b/demos/llm_routing/model_routing_service/demo.sh index 0c3fdc5d6..3ad102f11 100755 --- a/demos/llm_routing/model_routing_service/demo.sh +++ b/demos/llm_routing/model_routing_service/demo.sh @@ -8,9 +8,12 @@ echo "" echo "This demo shows how to use the /routing/v1/* endpoints to get" echo "routing decisions without actually proxying the request to an LLM." echo "" +echo "The response includes a ranked 'models' list — use models[0] as the" +echo "primary and fall back to models[1] on 429/5xx errors." +echo "" -# --- Example 1: OpenAI Chat Completions format --- -echo "--- 1. Code generation query (OpenAI format) ---" +# --- Example 1: Code generation (ranked by fastest) --- +echo "--- 1. Code generation query (prefer: fastest) ---" echo "" curl -s "$PLANO_URL/routing/v1/chat/completions" \ -H "Content-Type: application/json" \ @@ -22,8 +25,8 @@ curl -s "$PLANO_URL/routing/v1/chat/completions" \ }' | python3 -m json.tool echo "" -# --- Example 2: Complex reasoning query --- -echo "--- 2. Complex reasoning query (OpenAI format) ---" +# --- Example 2: Complex reasoning (ranked by cheapest) --- +echo "--- 2. Complex reasoning query (prefer: cheapest) ---" echo "" curl -s "$PLANO_URL/routing/v1/chat/completions" \ -H "Content-Type: application/json" \ @@ -36,7 +39,7 @@ curl -s "$PLANO_URL/routing/v1/chat/completions" \ echo "" # --- Example 3: Simple query (no routing match) --- -echo "--- 3. Simple query - no routing match (OpenAI format) ---" +echo "--- 3. 
Simple query - no routing match (falls back to request model) ---" echo "" curl -s "$PLANO_URL/routing/v1/chat/completions" \ -H "Content-Type: application/json" \ @@ -62,56 +65,45 @@ curl -s "$PLANO_URL/routing/v1/messages" \ }' | python3 -m json.tool echo "" -# --- Example 5: Inline routing policy in request body --- -echo "--- 5. Inline routing_policy (no config needed) ---" +# --- Example 5: Inline routing_preferences with prefer:cheapest --- +echo "--- 5. Inline routing_preferences (prefer: cheapest) ---" +echo " models[] will be sorted by ascending cost from DigitalOcean pricing" echo "" curl -s "$PLANO_URL/routing/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-4o-mini", "messages": [ - {"role": "user", "content": "Write a quicksort implementation in Go"} + {"role": "user", "content": "Summarize the key differences between TCP and UDP"} ], - "routing_policy": [ + "routing_preferences": [ { - "model": "openai/gpt-4o", - "routing_preferences": [ - {"name": "coding", "description": "code generation, writing functions, debugging"} - ] - }, - { - "model": "openai/gpt-4o-mini", - "routing_preferences": [ - {"name": "general", "description": "general questions, simple lookups, casual conversation"} - ] + "name": "general", + "description": "general questions, explanations, and summaries", + "models": ["openai/gpt-4o", "openai/gpt-4o-mini"], + "selection_policy": {"prefer": "cheapest"} } ] }' | python3 -m json.tool echo "" -# --- Example 6: Inline routing policy with Anthropic format --- -echo "--- 6. Inline routing_policy (Anthropic format) ---" +# --- Example 6: Inline routing_preferences with prefer:fastest --- +echo "--- 6. 
Inline routing_preferences (prefer: fastest) ---" +echo " models[] will be sorted by ascending P95 latency from Prometheus" echo "" -curl -s "$PLANO_URL/routing/v1/messages" \ +curl -s "$PLANO_URL/routing/v1/chat/completions" \ -H "Content-Type: application/json" \ -d '{ "model": "gpt-4o-mini", - "max_tokens": 1024, "messages": [ - {"role": "user", "content": "What is the weather like today?"} + {"role": "user", "content": "Write a quicksort implementation in Go"} ], - "routing_policy": [ - { - "model": "openai/gpt-4o", - "routing_preferences": [ - {"name": "coding", "description": "code generation, writing functions, debugging"} - ] - }, + "routing_preferences": [ { - "model": "openai/gpt-4o-mini", - "routing_preferences": [ - {"name": "general", "description": "general questions, simple lookups, casual conversation"} - ] + "name": "coding", + "description": "code generation, writing functions, debugging", + "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o", "openai/gpt-4o-mini"], + "selection_policy": {"prefer": "fastest"} } ] }' | python3 -m json.tool diff --git a/demos/llm_routing/model_routing_service/docker-compose.yaml b/demos/llm_routing/model_routing_service/docker-compose.yaml new file mode 100644 index 000000000..0f058e7be --- /dev/null +++ b/demos/llm_routing/model_routing_service/docker-compose.yaml @@ -0,0 +1,17 @@ +services: + prometheus: + image: prom/prometheus:latest + ports: + - "9090:9090" + volumes: + - ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro + depends_on: + - model-metrics + + model-metrics: + image: python:3.11-slim + ports: + - "8080:8080" + volumes: + - ./metrics_server.py:/metrics_server.py:ro + command: python /metrics_server.py diff --git a/demos/llm_routing/model_routing_service/metrics_server.py b/demos/llm_routing/model_routing_service/metrics_server.py new file mode 100644 index 000000000..65a5a0a3b --- /dev/null +++ b/demos/llm_routing/model_routing_service/metrics_server.py @@ -0,0 +1,51 @@ +""" +Demo 
metrics server. + +Exposes two endpoints: + GET /metrics — Prometheus text format, P95 latency per model (scraped by Prometheus) + GET /costs — JSON cost data per model, compatible with cost_metrics source +""" +import json +from http.server import HTTPServer, BaseHTTPRequestHandler + +PROMETHEUS_METRICS = """\ +# HELP model_latency_p95_seconds P95 request latency in seconds per model +# TYPE model_latency_p95_seconds gauge +model_latency_p95_seconds{model_name="anthropic/claude-sonnet-4-20250514"} 0.85 +model_latency_p95_seconds{model_name="openai/gpt-4o"} 1.20 +model_latency_p95_seconds{model_name="openai/gpt-4o-mini"} 0.40 +""".encode() + +COST_DATA = { + "anthropic/claude-sonnet-4-20250514": { + "input_per_million": 3.0, + "output_per_million": 15.0, + }, + "openai/gpt-4o": {"input_per_million": 5.0, "output_per_million": 20.0}, + "openai/gpt-4o-mini": {"input_per_million": 0.15, "output_per_million": 0.6}, +} + + +class MetricsHandler(BaseHTTPRequestHandler): + def do_GET(self): + if self.path == "/costs": + body = json.dumps(COST_DATA).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(body) + else: + # /metrics and everything else → Prometheus format + self.send_response(200) + self.send_header("Content-Type", "text/plain; version=0.0.4; charset=utf-8") + self.end_headers() + self.wfile.write(PROMETHEUS_METRICS) + + def log_message(self, fmt, *args): + pass # suppress access logs + + +if __name__ == "__main__": + server = HTTPServer(("", 8080), MetricsHandler) + print("metrics server listening on :8080 (/metrics, /costs)", flush=True) + server.serve_forever() diff --git a/demos/llm_routing/model_routing_service/prometheus.yaml b/demos/llm_routing/model_routing_service/prometheus.yaml new file mode 100644 index 000000000..6b0091fc0 --- /dev/null +++ b/demos/llm_routing/model_routing_service/prometheus.yaml @@ -0,0 +1,8 @@ +global: + scrape_interval: 15s + +scrape_configs: + - 
job_name: model_latency + static_configs: + - targets: + - model-metrics:8080 diff --git a/docs/routing-api.md b/docs/routing-api.md new file mode 100644 index 000000000..84575e27d --- /dev/null +++ b/docs/routing-api.md @@ -0,0 +1,246 @@ +# Plano Routing API — Request & Response Format + +## Overview + +Plano intercepts LLM requests and routes them to the best available model based on semantic intent and live cost/latency data. The developer sends a standard OpenAI-compatible request with an optional `routing_preferences` field. Plano returns an ordered list of candidate models; the client uses the first and falls back to the next on 429 or 5xx errors. + +--- + +## Request Format + +Standard OpenAI chat completion body. The only addition is the optional `routing_preferences` field, which is stripped before the request is forwarded upstream. + +```json +POST /v1/chat/completions +{ + "model": "openai/gpt-4o-mini", + "messages": [ + {"role": "user", "content": "write a sorting algorithm in Python"} + ], + "routing_preferences": [ + { + "name": "code generation", + "description": "generating new code snippets", + "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o", "openai/gpt-4o-mini"], + "selection_policy": {"prefer": "fastest"} + }, + { + "name": "general questions", + "description": "casual conversation and simple queries", + "models": ["openai/gpt-4o-mini"], + "selection_policy": {"prefer": "cheapest"} + } + ] +} +``` + +### `routing_preferences` fields + +| Field | Type | Required | Description | +|---|---|---|---| +| `name` | string | yes | Route identifier. Must match the LLM router's route classification. | +| `description` | string | yes | Natural language description used by the router to match user intent. | +| `models` | string[] | yes | Ordered candidate pool. At least one entry required. Must be declared in `model_providers`. | +| `selection_policy.prefer` | enum | yes | How to rank models: `cheapest`, `fastest`, `random`, or `none`. 
| + +### `selection_policy.prefer` values + +| Value | Behavior | +|---|---| +| `cheapest` | Sort by ascending cost from the metrics endpoint. Models with no data are appended last. | +| `fastest` | Sort by ascending latency from the metrics endpoint. Models with no data are appended last. | +| `random` | Shuffle the model list randomly on each request. | +| `none` | Return models in the order they were defined — no reordering. | + +### Notes + +- `routing_preferences` is **optional**. If omitted, the config-defined preferences are used. +- If provided in the request body, it **overrides** the config for that single request only. +- `model` is still required and is used as the fallback if no route is matched. + +--- + +## Response Format + +```json +{ + "models": [ + "anthropic/claude-sonnet-4-20250514", + "openai/gpt-4o", + "openai/gpt-4o-mini" + ], + "route": "code generation", + "trace_id": "4bf92f3577b34da6a3ce929d0e0e4736" +} +``` + +### Fields + +| Field | Type | Description | +|---|---|---| +| `models` | string[] | Ranked model list. Use `models[0]` as primary; retry with `models[1]` on 429/5xx, and so on. | +| `route` | string \| null | Name of the matched route. `null` if no route matched — client should use the original request `model`. | +| `trace_id` | string | Trace ID for distributed tracing and observability. | + +--- + +## Client Usage Pattern + +```python +response = plano.routing_decision(request) +models = response["models"] + +for model in models: + try: + result = call_llm(model, messages) + break # success — stop trying + except (RateLimitError, ServerError): + continue # try next model in the ranked list +``` + +--- + +## Configuration (set by platform/ops team) + +Requires `version: v0.4.0` or above. Models listed under `routing_preferences` must be declared in `model_providers`.
+ +```yaml +version: v0.4.0 + +model_providers: + - model: anthropic/claude-sonnet-4-20250514 + access_key: $ANTHROPIC_API_KEY + - model: openai/gpt-4o + access_key: $OPENAI_API_KEY + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + +routing_preferences: + - name: code generation + description: generating new code snippets or boilerplate + models: + - anthropic/claude-sonnet-4-20250514 + - openai/gpt-4o + selection_policy: + prefer: fastest + + - name: general questions + description: casual conversation and simple queries + models: + - openai/gpt-4o-mini + - openai/gpt-4o + selection_policy: + prefer: cheapest + +# Optional: live cost and latency data sources (max one per type) +model_metrics_sources: + # Option A: DigitalOcean public pricing (no auth required) + - type: digitalocean_pricing + refresh_interval: 3600 + + # Option B: custom cost endpoint (mutually exclusive with digitalocean_pricing) + # - type: cost_metrics + # url: https://internal-cost-api/models + # refresh_interval: 300 # seconds; omit for fetch-once on startup + # auth: + # type: bearer + # token: $COST_API_TOKEN + + - type: prometheus_metrics + url: https://internal-prometheus/ + query: histogram_quantile(0.95, sum by (model_name, le) (rate(model_latency_seconds_bucket[5m]))) + refresh_interval: 60 +``` + +### Startup validation + +Plano validates metric source configuration at startup and exits with a clear error if: + +| Condition | Error | +|---|---| +| `prefer: cheapest` with no cost source | `prefer: cheapest requires a cost data source — add cost_metrics or digitalocean_pricing` | +| `prefer: fastest` with no `prometheus_metrics` | `prefer: fastest requires a prometheus_metrics source` | +| Two `cost_metrics` entries | `only one cost_metrics source is allowed` | +| Two `prometheus_metrics` entries | `only one prometheus_metrics source is allowed` | +| Two `digitalocean_pricing` entries | `only one digitalocean_pricing source is allowed` | +| `cost_metrics` and 
`digitalocean_pricing` both present | `cannot both be configured — use one or the other` | + +If a model listed in `routing_preferences` has no matching entry in the fetched pricing or latency data, Plano logs a `WARN` at startup — the model is still included but ranked last. The same warning is emitted per routing request when a model has no data in cache at decision time (relevant for inline `routing_preferences` overrides that reference models not covered by the configured metrics sources). + +### cost_metrics endpoint + +Plano GETs `url` on startup (and on each `refresh_interval`). Expected response — a JSON object mapping model name to an object with `input_per_million` and `output_per_million` fields: + +```json +{ + "anthropic/claude-sonnet-4-20250514": { + "input_per_million": 3.0, + "output_per_million": 15.0 + }, + "openai/gpt-4o": { + "input_per_million": 5.0, + "output_per_million": 20.0 + }, + "openai/gpt-4o-mini": { + "input_per_million": 0.15, + "output_per_million": 0.6 + } +} +``` + +- `auth.type: bearer` adds `Authorization: Bearer <token>` to the request +- Plano combines the two fields as `input_per_million + output_per_million` to produce a single cost scalar used for ranking +- Only relative order matters — the unit (e.g. USD per million tokens) is consistent so ranking is correct + +### digitalocean_pricing source + +Fetches public model pricing from the DigitalOcean Gen-AI catalog. No authentication required. + +```yaml +model_metrics_sources: + - type: digitalocean_pricing + refresh_interval: 3600 # re-fetch every hour; omit to fetch once on startup + model_aliases: + openai-gpt-4o: openai/gpt-4o + openai-gpt-4o-mini: openai/gpt-4o-mini + anthropic-claude-sonnet-4: anthropic/claude-sonnet-4-20250514 +``` + +DO catalog entries are stored by their `model_id` field (e.g. `openai-gpt-4o`). The cost scalar is `input_price_per_million + output_price_per_million`. + +**`model_aliases`** — optional.
Maps DO `model_id` values to the model names used in `routing_preferences`. Without aliases, cost data is stored under the DO model_id (e.g. `openai-gpt-4o`), which won't match models configured as `openai/gpt-4o`. Aliases let you bridge the naming gap without changing your routing config. + +**Constraints:** +- `cost_metrics` and `digitalocean_pricing` cannot both be configured — use one or the other. +- Only one `digitalocean_pricing` entry is allowed. + +### prometheus_metrics endpoint + +Plano queries `{url}/api/v1/query?query={query}` on startup and each `refresh_interval`. The PromQL expression must return an instant vector with a `model_name` label: + +```json +{ + "status": "success", + "data": { + "resultType": "vector", + "result": [ + {"metric": {"model_name": "anthropic/claude-sonnet-4-20250514"}, "value": [1234567890, "120.5"]}, + {"metric": {"model_name": "openai/gpt-4o"}, "value": [1234567890, "200.3"]} + ] + } +} +``` + +- The PromQL query is responsible for computing the percentile (e.g. `histogram_quantile(0.95, ...)`) +- Latency units are arbitrary — only relative order matters +- Models missing from the result are appended at the end of the ranked list + +--- + +## Version Requirements + +| Version | Top-level `routing_preferences` | +|---|---| +| `< v0.4.0` | Not allowed — startup error if present | +| `v0.4.0+` | Supported (required for model routing) |