From 6acd02953d1d7cb262c585248b1e1aed05e284fc Mon Sep 17 00:00:00 2001 From: Noah Martin Date: Tue, 27 Jan 2026 17:54:48 -0800 Subject: [PATCH 1/9] feat(sessions): Add EAP double-write for user sessions When the `UserSessionsEap` feature flag is enabled, session data is sent both through the legacy metrics pipeline and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION TraceItems. This enables migration to the new EAP-based user sessions storage. Includes a Datadog metric `sessions.eap.produced` tagged with `session_type` (update/aggregate) to track EAP writes. Co-Authored-By: Claude Opus 4.5 --- relay-dynamic-config/src/feature.rs | 9 + relay-dynamic-config/src/project.rs | 10 +- relay-server/src/processing/sessions/mod.rs | 126 ++++- .../src/processing/sessions/process.rs | 32 +- relay-server/src/processing/sessions/store.rs | 499 ++++++++++++++++++ relay-server/src/statsd.rs | 12 + 6 files changed, 671 insertions(+), 17 deletions(-) create mode 100644 relay-server/src/processing/sessions/store.rs diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index ab0f9831602..748b7faad55 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -138,6 +138,15 @@ pub enum Feature { /// Enable the experimental Trace Attachment pipeline in Relay. #[serde(rename = "projects:trace-attachment-processing")] TraceAttachmentProcessing, + /// Enable EAP (Event Analytics Platform) double-write for user sessions. + /// + /// When enabled, session data is sent both through the legacy metrics pipeline + /// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION. + /// This enables migration to the new EAP-based user sessions storage. + /// + /// Serialized as `organizations:user-sessions-eap`. + #[serde(rename = "organizations:user-sessions-eap")] + UserSessionsEap, /// Forward compatibility. #[doc(hidden)] #[serde(other)] diff --git a/relay-dynamic-config/src/project.rs b/relay-dynamic-config/src/project.rs index bf1d409d3ee..b47054083e9 100644 --- a/relay-dynamic-config/src/project.rs +++ b/relay-dynamic-config/src/project.rs @@ -262,6 +262,9 @@ pub struct RetentionsConfig { /// Retention settings for attachments. #[serde(skip_serializing_if = "Option::is_none")] pub trace_attachment: Option, + /// Retention settings for user sessions (EAP). 
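+    ///
+    /// Mirrors the other per-item overrides in this struct (`log`, `span`,
+    /// `trace_metric`, `trace_attachment`); when unset, lookups such as
+    /// `ctx.retention(|r| r.session.as_ref())` are expected to fall back to
+    /// the default retention.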
+ #[serde(skip_serializing_if = "Option::is_none")] + pub session: Option, } impl RetentionsConfig { @@ -271,9 +274,14 @@ impl RetentionsConfig { span, trace_metric, trace_attachment, + session, } = self; - log.is_none() && span.is_none() && trace_metric.is_none() && trace_attachment.is_none() + log.is_none() + && span.is_none() + && trace_metric.is_none() + && trace_attachment.is_none() + && session.is_none() } } diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs index 50f2c3204a5..2bd848b19b4 100644 --- a/relay-server/src/processing/sessions/mod.rs +++ b/relay-server/src/processing/sessions/mod.rs @@ -9,9 +9,13 @@ use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities use crate::processing::sessions::process::Expansion; use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter}; use crate::services::outcome::Outcome; +#[cfg(feature = "processing")] +use crate::statsd::RelayCounters; mod filter; mod process; +#[cfg(feature = "processing")] +mod store; type Result = std::result::Result; @@ -91,7 +95,9 @@ impl processing::Processor for SessionsProcessor { ) -> Result, Rejected> { let mut sessions = match process::expand(sessions, ctx) { Expansion::Continue(sessions) => sessions, - Expansion::Forward(sessions) => return Ok(Output::just(SessionsOutput(sessions))), + Expansion::Forward(sessions) => { + return Ok(Output::just(SessionsOutput::Forward(sessions))); + } }; // We can apply filters before normalization here, as our filters currently do not depend @@ -103,31 +109,125 @@ impl processing::Processor for SessionsProcessor { let sessions = self.limiter.enforce_quotas(sessions, ctx).await?; - let sessions = process::extract(sessions, ctx); - Ok(Output::metrics(sessions)) + // Check if EAP user sessions double-write is enabled. + // This feature sends session data to both the legacy metrics pipeline + // and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION. + let eap_enabled = ctx + .project_info + .config + .features + .has(relay_dynamic_config::Feature::UserSessionsEap); + + let (metrics, eap_sessions) = process::extract_with_eap(sessions, ctx, eap_enabled); + + if let Some(eap_sessions) = eap_sessions { + // Return both the EAP sessions for storage and the extracted metrics. + Ok(Output { + main: Some(SessionsOutput::Store(eap_sessions)), + metrics: Some(metrics), + }) + } else { + // Legacy path: only return metrics. + Ok(Output::metrics(metrics)) + } } } /// Output produced by the [`SessionsProcessor`]. #[derive(Debug)] -pub struct SessionsOutput(Managed); +pub enum SessionsOutput { + /// Sessions that should be forwarded (non-processing relay). + Forward(Managed), + /// Sessions that should be stored to EAP (processing relay with feature enabled). + Store(Managed), +} impl Forward for SessionsOutput { fn serialize_envelope( self, _: processing::ForwardContext<'_>, ) -> Result>, Rejected<()>> { - Ok(self.0.map(|sessions, _| sessions.serialize_envelope())) + match self { + Self::Forward(sessions) => { + Ok(sessions.map(|sessions, _| sessions.serialize_envelope())) + } + Self::Store(sessions) => { + // EAP sessions should be stored, not serialized to envelope. 
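+                // Reaching this branch means a `Store` output was routed to a
+                // non-store forwarder, which indicates a routing bug upstream.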
+ Err(sessions + .internal_error("EAP sessions should be stored, not serialized to envelope")) + } + } } #[cfg(feature = "processing")] fn forward_store( self, - _: processing::forward::StoreHandle<'_>, - _: processing::ForwardContext<'_>, + s: processing::forward::StoreHandle<'_>, + ctx: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - let SessionsOutput(sessions) = self; - Err(sessions.internal_error("sessions should always be extracted into metrics")) + match self { + Self::Forward(sessions) => { + // Non-processing relay path - sessions should have been extracted to metrics. + Err(sessions.internal_error("sessions should always be extracted into metrics")) + } + Self::Store(sessions) => { + // EAP double-write path: convert expanded sessions to TraceItems and store. + let store_ctx = store::Context { + received_at: sessions.received_at(), + scoping: sessions.scoping(), + retention: ctx.retention(|r| r.session.as_ref()), + }; + + // Split sessions into updates and aggregates, keeping track of the aggregates + // for later processing. + let (updates_managed, aggregates) = + sessions.split_once(|s, _| (s.updates, s.aggregates)); + + // Convert and store each session update. + for session in updates_managed.split(|updates| updates) { + let item = session.try_map(|session, _| { + Ok::<_, std::convert::Infallible>(store::convert_session_update( + &session, &store_ctx, + )) + }); + if let Ok(item) = item { + s.store(item); + relay_statsd::metric!( + counter(RelayCounters::SessionsEapProduced) += 1, + session_type = "update" + ); + } + } + + // Convert and store each session aggregate. + // Aggregates are expanded into individual session rows to unify the format. + for aggregate_batch in aggregates.split(|aggs| aggs) { + let release = aggregate_batch.attributes.release.clone(); + let environment = aggregate_batch.attributes.environment.clone(); + + for aggregate in aggregate_batch.split(|batch| batch.aggregates) { + // Convert aggregate to multiple individual session items + let items = store::convert_session_aggregate( + &aggregate, + &release, + environment.as_deref(), + &store_ctx, + ); + + for item in items { + let managed_item = aggregate.wrap(item); + s.store(managed_item); + relay_statsd::metric!( + counter(RelayCounters::SessionsEapProduced) += 1, + session_type = "aggregate" + ); + } + } + } + + Ok(()) + } + } } } @@ -163,15 +263,15 @@ impl Counted for SerializedSessions { } } -#[derive(Debug)] +#[derive(Clone, Debug)] pub struct ExpandedSessions { /// Original envelope headers. - headers: EnvelopeHeaders, + pub(crate) headers: EnvelopeHeaders, /// A list of parsed session updates. - updates: Vec, + pub(crate) updates: Vec, /// A list of parsed session aggregates. - aggregates: Vec, + pub(crate) aggregates: Vec, } impl Counted for ExpandedSessions { diff --git a/relay-server/src/processing/sessions/process.rs b/relay-server/src/processing/sessions/process.rs index 8036969a930..f4c07427799 100644 --- a/relay-server/src/processing/sessions/process.rs +++ b/relay-server/src/processing/sessions/process.rs @@ -212,14 +212,38 @@ fn normalize_attributes(attrs: &mut SessionAttributes, ctx: &NormalizeContext<'_ Ok(()) } -pub fn extract(sessions: Managed, ctx: Context<'_>) -> Managed { +/// Extracts session metrics and optionally returns the expanded sessions for EAP storage. 
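+/// Replaces the previous `extract` function; callers derive `eap_enabled` from
+/// the `UserSessionsEap` project feature.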
+/// +/// When `eap_enabled` is true, this function returns both the extracted metrics (for the legacy +/// pipeline) and the expanded sessions (for double-write to the snuba-items topic as +/// TRACE_ITEM_TYPE_USER_SESSION). +/// +/// This enables a gradual migration from the legacy session metrics to the new EAP-based +/// user sessions storage, with both paths running in parallel during the migration period. +pub fn extract_with_eap( + sessions: Managed, + ctx: Context<'_>, + eap_enabled: bool, +) -> (Managed, Option>) { let should_extract_abnormal_mechanism = ctx .project_info .config .session_metrics .should_extract_abnormal_mechanism(); - sessions.map(|sessions, records| { + // If EAP is enabled, we need to clone the sessions before consuming them for metrics. + // Use `wrap` to create a new Managed with the same metadata but cloned data. + let eap_sessions = if eap_enabled { + Some(sessions.wrap(ExpandedSessions { + headers: sessions.headers.clone(), + updates: sessions.updates.clone(), + aggregates: sessions.aggregates.clone(), + })) + } else { + None + }; + + let metrics = sessions.map(|sessions, records| { let mut metrics = Vec::new(); let meta = sessions.headers.meta(); @@ -253,5 +277,7 @@ pub fn extract(sessions: Managed, ctx: Context<'_>) -> Managed project_metrics: metrics, sampling_metrics: Vec::new(), } - }) + }); + + (metrics, eap_sessions) } diff --git a/relay-server/src/processing/sessions/store.rs b/relay-server/src/processing/sessions/store.rs new file mode 100644 index 00000000000..19b72d92584 --- /dev/null +++ b/relay-server/src/processing/sessions/store.rs @@ -0,0 +1,499 @@ +//! Converts session data to EAP TraceItem format for the snuba-items topic. +//! +//! This module implements the double-write path for user sessions, sending session data +//! directly to the snuba-items Kafka topic as `i32::from(TraceItemType::UserSession)` TraceItems, +//! in addition to the legacy metrics extraction path. + +use std::collections::HashMap; + +use chrono::{DateTime, Utc}; +use relay_event_schema::protocol::{ + AbnormalMechanism, SessionAggregateItem, SessionAttributes, SessionStatus, SessionUpdate, +}; +use relay_quotas::{DataCategory, Scoping}; +use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value}; +use uuid::Uuid; + +use crate::processing::Retention; +use crate::processing::utils::store::{proto_timestamp, uuid_to_item_id}; +use crate::services::store::StoreTraceItem; + +/// UUID namespace for user sessions. +/// +/// This must match the `USER_SESSION_NAMESPACE` in sentry's +/// `sentry/user_sessions/eap/constants.py` to ensure consistent trace_id and item_id +/// generation between relay and sentry. +/// +/// Using a fixed namespace ensures: +/// - No collision with other trace types in EAP +/// - Deterministic IDs for ReplacingMergeTree deduplication +const USER_SESSION_NAMESPACE: Uuid = Uuid::from_bytes([ + 0xa1, 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, +]); + +/// Context parameters for converting sessions to TraceItems. +#[derive(Debug, Clone, Copy)] +pub struct Context { + /// Received time. + pub received_at: DateTime, + /// Item scoping. + pub scoping: Scoping, + /// Item retention. + pub retention: Retention, +} + +/// Converts a `SessionUpdate` to a `StoreTraceItem` for the snuba-items topic. +/// +/// The resulting TraceItem uses `i32::from(TraceItemType::UserSession)` and includes all +/// session attributes that are supported by the EAP user sessions schema. 
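+///
+/// Both IDs are deterministic: `trace_id` is a UUIDv5 of the session ID under
+/// `USER_SESSION_NAMESPACE`, and `item_id` additionally folds in the sequence
+/// number, so repeated delivery of the same update deduplicates in
+/// ReplacingMergeTree.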
+pub fn convert_session_update(session: &SessionUpdate, ctx: &Context) -> StoreTraceItem { + let session_id = session.session_id.to_string(); + + // Generate deterministic trace_id from session_id using UUID5 with namespace. + // This ensures the same session always gets the same trace_id. + let trace_id = Uuid::new_v5(&USER_SESSION_NAMESPACE, session_id.as_bytes()).to_string(); + + // Generate deterministic item_id for ReplacingMergeTree deduplication. + // Include sequence to differentiate multiple updates for the same session. + let item_id_input = format!("user_session_{}_{}", session_id, session.sequence); + let item_id = uuid_to_item_id(Uuid::new_v5( + &USER_SESSION_NAMESPACE, + item_id_input.as_bytes(), + )); + + // Determine if the session crashed based on status. + let crashed = matches!(session.status, SessionStatus::Crashed); + + // Build attributes map matching sentry's UserSessionData fields. + let mut attributes = HashMap::new(); + + // Required attributes + insert_string(&mut attributes, "session_id", &session_id); + insert_bool(&mut attributes, "crashed", crashed); + insert_int(&mut attributes, "error_count", session.errors as i64); + + // Optional attributes from session + if let Some(ref distinct_id) = session.distinct_id { + insert_string(&mut attributes, "user_id", distinct_id); + } + + // Attributes from session attributes + insert_string(&mut attributes, "release", &session.attributes.release); + + if let Some(ref environment) = session.attributes.environment { + insert_string(&mut attributes, "environment", environment); + } + + if let Some(ref user_agent) = session.attributes.user_agent { + insert_string(&mut attributes, "user_agent", user_agent); + } + + // Session status as string for debugging/filtering + insert_string(&mut attributes, "status", &session.status.to_string()); + + // Duration if available + if let Some(duration) = session.duration { + insert_double(&mut attributes, "duration", duration); + } + + // Whether this is an init session + insert_bool(&mut attributes, "init", session.init); + + // Abnormal mechanism (ANR info) if present + if !matches!(session.abnormal_mechanism, AbnormalMechanism::None) { + insert_string( + &mut attributes, + "abnormal_mechanism", + &session.abnormal_mechanism.to_string(), + ); + } + + let trace_item = TraceItem { + organization_id: ctx.scoping.organization_id.value(), + project_id: ctx.scoping.project_id.value(), + trace_id, + item_id, + item_type: i32::from(TraceItemType::UserSession), + timestamp: Some(proto_timestamp(session.timestamp)), + received: Some(proto_timestamp(ctx.received_at)), + retention_days: ctx.retention.standard.into(), + downsampled_retention_days: ctx.retention.downsampled.into(), + attributes, + client_sample_rate: 1.0, + server_sample_rate: 1.0, + }; + + StoreTraceItem { + trace_item, + quantities: smallvec::smallvec![(DataCategory::Session, 1)], + } +} + +/// Converts a `SessionAggregateItem` to multiple `StoreTraceItem`s for the snuba-items topic. +/// +/// Session aggregates represent pre-aggregated session data. Each aggregate is expanded +/// into individual session rows to unify the format with regular session updates. +/// For example, an aggregate with `exited: 3, crashed: 2` produces 5 individual TraceItems. +pub fn convert_session_aggregate( + aggregate: &SessionAggregateItem, + release: &str, + environment: Option<&str>, + ctx: &Context, +) -> Vec { + // Expand the aggregate into synthetic SessionUpdate objects, then convert each + // using the same code path as regular session updates. 
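+    // For example, an aggregate with `exited: 3, crashed: 2` expands into five
+    // synthetic updates whose session IDs differ only in the running index.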
+ expand_aggregate_to_sessions( + aggregate, + release, + environment, + ctx.scoping.project_id.value(), + ) + .into_iter() + .map(|session| convert_session_update(&session, ctx)) + .collect() +} + +/// Expands a `SessionAggregateItem` into synthetic `SessionUpdate` objects. +/// +/// Each status count in the aggregate becomes individual session updates. +/// This allows the same TraceItem conversion logic to be used for both +/// regular session updates and aggregates. +fn expand_aggregate_to_sessions( + aggregate: &SessionAggregateItem, + release: &str, + environment: Option<&str>, + project_id: u64, +) -> Vec { + let mut sessions = Vec::new(); + + // Map each status count to (count, SessionStatus) + let status_counts = [ + (aggregate.exited, SessionStatus::Exited), + (aggregate.errored, SessionStatus::Errored), + (aggregate.abnormal, SessionStatus::Abnormal), + (aggregate.unhandled, SessionStatus::Unhandled), + (aggregate.crashed, SessionStatus::Crashed), + ]; + + // Pre-compute values that are shared across all expanded sessions + let distinct_id = aggregate.distinct_id.clone(); + let distinct_id_str = distinct_id.as_deref().unwrap_or("unknown"); + let timestamp_millis = aggregate.started.timestamp_millis(); + + let mut index: u32 = 0; + for (count, status) in status_counts { + for _ in 0..count { + // Generate a unique, deterministic session ID for this expanded row. + let session_id_str = format!( + "aggregate_{}_{}_{}_{}", + timestamp_millis, distinct_id_str, project_id, index + ); + let session_id = Uuid::new_v5(&USER_SESSION_NAMESPACE, session_id_str.as_bytes()); + + let session = SessionUpdate { + session_id, + distinct_id: distinct_id.clone(), + sequence: 0, + init: true, // Aggregates represent completed sessions + timestamp: aggregate.started, + started: aggregate.started, + duration: None, + status: status.clone(), + errors: 0, // Aggregates don't track per-session error counts + attributes: SessionAttributes { + release: release.to_owned(), + environment: environment.map(str::to_owned), + ip_address: None, + user_agent: None, + }, + abnormal_mechanism: AbnormalMechanism::None, + }; + + sessions.push(session); + index += 1; + } + } + + sessions +} + +fn insert_string(attrs: &mut HashMap, key: &str, value: &str) { + let value = AnyValue { + value: Some(any_value::Value::StringValue(value.to_owned())), + }; + attrs.insert(key.to_owned(), value); +} + +fn insert_bool(attrs: &mut HashMap, key: &str, value: bool) { + let value = AnyValue { + value: Some(any_value::Value::BoolValue(value)), + }; + attrs.insert(key.to_owned(), value); +} + +fn insert_int(attrs: &mut HashMap, key: &str, value: i64) { + let value = AnyValue { + value: Some(any_value::Value::IntValue(value)), + }; + attrs.insert(key.to_owned(), value); +} + +fn insert_double(attrs: &mut HashMap, key: &str, value: f64) { + let value = AnyValue { + value: Some(any_value::Value::DoubleValue(value)), + }; + attrs.insert(key.to_owned(), value); +} + +#[cfg(test)] +mod tests { + use chrono::TimeZone; + use relay_base_schema::organization::OrganizationId; + use relay_base_schema::project::ProjectId; + use relay_event_schema::protocol::{AbnormalMechanism, SessionAttributes}; + + use super::*; + + fn test_context() -> Context { + Context { + received_at: Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap(), + scoping: Scoping { + organization_id: OrganizationId::new(1), + project_id: ProjectId::new(42), + project_key: "12333333333333333333333333333333".parse().unwrap(), + key_id: Some(3), + }, + retention: Retention { + 
standard: 90, + downsampled: 90, + }, + } + } + + fn get_str_attr<'a>(item: &'a StoreTraceItem, key: &str) -> Option<&'a str> { + item.trace_item + .attributes + .get(key) + .and_then(|v| match v.value.as_ref()? { + any_value::Value::StringValue(s) => Some(s.as_str()), + _ => None, + }) + } + + fn get_bool_attr(item: &StoreTraceItem, key: &str) -> Option { + item.trace_item + .attributes + .get(key) + .and_then(|v| match v.value.as_ref()? { + any_value::Value::BoolValue(b) => Some(*b), + _ => None, + }) + } + + #[test] + fn test_convert_session_update() { + let session = SessionUpdate { + session_id: Uuid::new_v4(), + distinct_id: Some("user-123".to_owned()), + sequence: 1, + init: true, + timestamp: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), + started: Utc.with_ymd_and_hms(2024, 1, 1, 9, 0, 0).unwrap(), + duration: Some(3600.0), + status: SessionStatus::Ok, + errors: 2, + attributes: SessionAttributes { + release: "1.0.0".to_owned(), + environment: Some("production".to_owned()), + ip_address: None, + user_agent: Some("TestAgent/1.0".to_owned()), + }, + abnormal_mechanism: AbnormalMechanism::None, + }; + + let ctx = test_context(); + let result = convert_session_update(&session, &ctx); + + // Check item_type matches i32::from(TraceItemType::UserSession) (12) + assert_eq!( + result.trace_item.item_type, + i32::from(TraceItemType::UserSession) + ); + assert_eq!(result.trace_item.organization_id, 1); + assert_eq!(result.trace_item.project_id, 42); + assert_eq!(result.trace_item.retention_days, 90); + + // Check attributes + let attrs = &result.trace_item.attributes; + assert!(attrs.contains_key("session_id")); + assert!(attrs.contains_key("crashed")); + assert!(attrs.contains_key("error_count")); + assert!(attrs.contains_key("user_id")); + assert!(attrs.contains_key("release")); + assert!(attrs.contains_key("environment")); + } + + #[test] + fn test_convert_crashed_session() { + let session = SessionUpdate { + session_id: Uuid::new_v4(), + distinct_id: None, + sequence: 0, + init: false, + timestamp: Utc::now(), + started: Utc::now(), + duration: None, + status: SessionStatus::Crashed, + errors: 1, + attributes: SessionAttributes { + release: "1.0.0".to_owned(), + environment: None, + ip_address: None, + user_agent: None, + }, + abnormal_mechanism: AbnormalMechanism::None, + }; + + let ctx = test_context(); + let result = convert_session_update(&session, &ctx); + + assert_eq!(get_bool_attr(&result, "crashed"), Some(true)); + } + + #[test] + fn test_deterministic_ids() { + let session = SessionUpdate { + session_id: Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(), + distinct_id: None, + sequence: 0, + init: true, + timestamp: Utc::now(), + started: Utc::now(), + duration: None, + status: SessionStatus::Ok, + errors: 0, + attributes: SessionAttributes { + release: "1.0.0".to_owned(), + environment: None, + ip_address: None, + user_agent: None, + }, + abnormal_mechanism: AbnormalMechanism::None, + }; + + let ctx = test_context(); + + // Convert the same session twice + let result1 = convert_session_update(&session, &ctx); + let result2 = convert_session_update(&session, &ctx); + + // IDs should be deterministic (same input = same output) + assert_eq!(result1.trace_item.trace_id, result2.trace_item.trace_id); + assert_eq!(result1.trace_item.item_id, result2.trace_item.item_id); + } + + #[test] + fn test_convert_session_aggregate_expands_to_multiple_rows() { + let aggregate = SessionAggregateItem { + started: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), + 
distinct_id: Some("user-456".to_owned()), + exited: 3, + errored: 1, + abnormal: 0, + unhandled: 0, + crashed: 2, + }; + + let ctx = test_context(); + let results = convert_session_aggregate(&aggregate, "1.0.0", Some("production"), &ctx); + + // Should produce 6 rows: 3 exited + 1 errored + 2 crashed + assert_eq!(results.len(), 6); + + // Check status distribution + let statuses: Vec<_> = results + .iter() + .filter_map(|r| get_str_attr(r, "status")) + .collect(); + assert_eq!(statuses.iter().filter(|&&s| s == "exited").count(), 3); + assert_eq!(statuses.iter().filter(|&&s| s == "errored").count(), 1); + assert_eq!(statuses.iter().filter(|&&s| s == "crashed").count(), 2); + + // Check that crashed sessions have crashed=true + for result in &results { + let is_crashed_status = get_str_attr(result, "status") == Some("crashed"); + assert_eq!(get_bool_attr(result, "crashed"), Some(is_crashed_status)); + } + + // All items should have common attributes + for result in &results { + assert_eq!( + result.trace_item.item_type, + i32::from(TraceItemType::UserSession) + ); + assert_eq!(result.trace_item.organization_id, 1); + assert_eq!(result.trace_item.project_id, 42); + assert!(get_str_attr(result, "session_id").is_some()); + assert!(get_str_attr(result, "release").is_some()); + assert!(get_str_attr(result, "environment").is_some()); + assert!(get_str_attr(result, "user_id").is_some()); + assert!(get_bool_attr(result, "init").is_some()); + } + } + + #[test] + fn test_convert_empty_aggregate_produces_no_rows() { + let aggregate = SessionAggregateItem { + started: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), + distinct_id: None, + exited: 0, + errored: 0, + abnormal: 0, + unhandled: 0, + crashed: 0, + }; + + let ctx = test_context(); + let results = convert_session_aggregate(&aggregate, "1.0.0", None, &ctx); + + assert!(results.is_empty()); + } + + #[test] + fn test_aggregate_rows_have_unique_deterministic_ids() { + let aggregate = SessionAggregateItem { + started: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), + distinct_id: Some("user-789".to_owned()), + exited: 2, + errored: 0, + abnormal: 0, + unhandled: 0, + crashed: 1, + }; + + let ctx = test_context(); + + // Convert twice to verify determinism + let results1 = convert_session_aggregate(&aggregate, "1.0.0", None, &ctx); + let results2 = convert_session_aggregate(&aggregate, "1.0.0", None, &ctx); + + assert_eq!(results1.len(), results2.len()); + + // Each row should have a unique item_id + let ids1: Vec<_> = results1 + .iter() + .map(|r| r.trace_item.item_id.clone()) + .collect(); + let ids2: Vec<_> = results2 + .iter() + .map(|r| r.trace_item.item_id.clone()) + .collect(); + + // All IDs should be unique within a result set + let unique_ids1: std::collections::HashSet<_> = ids1.iter().collect(); + assert_eq!(unique_ids1.len(), ids1.len()); + + // IDs should be deterministic across conversions + assert_eq!(ids1, ids2); + } +} diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index d004890bd9c..f27e65e12dc 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -824,6 +824,16 @@ pub enum RelayCounters { /// Number of spans produced in the new format. #[cfg(feature = "processing")] SpanV2Produced, + /// Number of user sessions produced to the EAP snuba-items topic. + /// + /// This metric is only emitted when the `UserSessionsEap` feature flag is enabled, + /// tracking double-writes of session data to the EAP (Event Analytics Platform). 
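+    /// The counter increments once per produced TraceItem, so a single
+    /// aggregate contributes one increment per expanded session row.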
+ /// + /// This metric is tagged with: + /// - `session_type`: Either `"update"` for individual session updates or `"aggregate"` for + /// pre-aggregated session data. + #[cfg(feature = "processing")] + SessionsEapProduced, /// Number of events that hit any of the store-like endpoints: Envelope, Store, Security, /// Minidump, Unreal. /// @@ -991,6 +1001,8 @@ impl CounterMetric for RelayCounters { RelayCounters::ProcessingMessageProduced => "processing.event.produced", #[cfg(feature = "processing")] RelayCounters::SpanV2Produced => "store.produced.span_v2", + #[cfg(feature = "processing")] + RelayCounters::SessionsEapProduced => "sessions.eap.produced", RelayCounters::EventProtocol => "event.protocol", RelayCounters::EventTransaction => "event.transaction", RelayCounters::TransactionNameChanges => "event.transaction_name_changes", From 71a0acfd54f4c745c0aca8d25e1c5434b7b96827 Mon Sep 17 00:00:00 2001 From: Noah Martin Date: Mon, 2 Feb 2026 14:29:58 -0800 Subject: [PATCH 2/9] Produce to EAP after aggregation --- relay-dynamic-config/src/project.rs | 10 +- relay-server/src/processing/sessions/mod.rs | 127 +---- .../src/processing/sessions/process.rs | 31 +- relay-server/src/processing/sessions/store.rs | 499 ------------------ relay-server/src/services/processor.rs | 31 +- relay-server/src/services/store.rs | 178 +++++++ relay-server/src/statsd.rs | 8 +- 7 files changed, 224 insertions(+), 660 deletions(-) delete mode 100644 relay-server/src/processing/sessions/store.rs diff --git a/relay-dynamic-config/src/project.rs b/relay-dynamic-config/src/project.rs index b47054083e9..bf1d409d3ee 100644 --- a/relay-dynamic-config/src/project.rs +++ b/relay-dynamic-config/src/project.rs @@ -262,9 +262,6 @@ pub struct RetentionsConfig { /// Retention settings for attachments. #[serde(skip_serializing_if = "Option::is_none")] pub trace_attachment: Option, - /// Retention settings for user sessions (EAP). 
- #[serde(skip_serializing_if = "Option::is_none")] - pub session: Option, } impl RetentionsConfig { @@ -274,14 +271,9 @@ impl RetentionsConfig { span, trace_metric, trace_attachment, - session, } = self; - log.is_none() - && span.is_none() - && trace_metric.is_none() - && trace_attachment.is_none() - && session.is_none() + log.is_none() && span.is_none() && trace_metric.is_none() && trace_attachment.is_none() } } diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs index 2bd848b19b4..bc5d0533e2f 100644 --- a/relay-server/src/processing/sessions/mod.rs +++ b/relay-server/src/processing/sessions/mod.rs @@ -9,13 +9,9 @@ use crate::managed::{Counted, Managed, ManagedEnvelope, OutcomeError, Quantities use crate::processing::sessions::process::Expansion; use crate::processing::{self, Context, CountRateLimited, Forward, Output, QuotaRateLimiter}; use crate::services::outcome::Outcome; -#[cfg(feature = "processing")] -use crate::statsd::RelayCounters; mod filter; mod process; -#[cfg(feature = "processing")] -mod store; type Result = std::result::Result; @@ -95,9 +91,7 @@ impl processing::Processor for SessionsProcessor { ) -> Result, Rejected> { let mut sessions = match process::expand(sessions, ctx) { Expansion::Continue(sessions) => sessions, - Expansion::Forward(sessions) => { - return Ok(Output::just(SessionsOutput::Forward(sessions))); - } + Expansion::Forward(sessions) => return Ok(Output::just(SessionsOutput(sessions))), }; // We can apply filters before normalization here, as our filters currently do not depend @@ -109,125 +103,32 @@ impl processing::Processor for SessionsProcessor { let sessions = self.limiter.enforce_quotas(sessions, ctx).await?; - // Check if EAP user sessions double-write is enabled. - // This feature sends session data to both the legacy metrics pipeline - // and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION. - let eap_enabled = ctx - .project_info - .config - .features - .has(relay_dynamic_config::Feature::UserSessionsEap); - - let (metrics, eap_sessions) = process::extract_with_eap(sessions, ctx, eap_enabled); - - if let Some(eap_sessions) = eap_sessions { - // Return both the EAP sessions for storage and the extracted metrics. - Ok(Output { - main: Some(SessionsOutput::Store(eap_sessions)), - metrics: Some(metrics), - }) - } else { - // Legacy path: only return metrics. - Ok(Output::metrics(metrics)) - } + let sessions = process::extract_metrics(sessions, ctx); + + Ok(Output::metrics(sessions)) } } /// Output produced by the [`SessionsProcessor`]. #[derive(Debug)] -pub enum SessionsOutput { - /// Sessions that should be forwarded (non-processing relay). - Forward(Managed), - /// Sessions that should be stored to EAP (processing relay with feature enabled). - Store(Managed), -} +pub struct SessionsOutput(Managed); impl Forward for SessionsOutput { fn serialize_envelope( self, _: processing::ForwardContext<'_>, ) -> Result>, Rejected<()>> { - match self { - Self::Forward(sessions) => { - Ok(sessions.map(|sessions, _| sessions.serialize_envelope())) - } - Self::Store(sessions) => { - // EAP sessions should be stored, not serialized to envelope. 
- Err(sessions - .internal_error("EAP sessions should be stored, not serialized to envelope")) - } - } + Ok(self.0.map(|sessions, _| sessions.serialize_envelope())) } #[cfg(feature = "processing")] fn forward_store( self, - s: processing::forward::StoreHandle<'_>, - ctx: processing::ForwardContext<'_>, + _: processing::forward::StoreHandle<'_>, + _: processing::ForwardContext<'_>, ) -> Result<(), Rejected<()>> { - match self { - Self::Forward(sessions) => { - // Non-processing relay path - sessions should have been extracted to metrics. - Err(sessions.internal_error("sessions should always be extracted into metrics")) - } - Self::Store(sessions) => { - // EAP double-write path: convert expanded sessions to TraceItems and store. - let store_ctx = store::Context { - received_at: sessions.received_at(), - scoping: sessions.scoping(), - retention: ctx.retention(|r| r.session.as_ref()), - }; - - // Split sessions into updates and aggregates, keeping track of the aggregates - // for later processing. - let (updates_managed, aggregates) = - sessions.split_once(|s, _| (s.updates, s.aggregates)); - - // Convert and store each session update. - for session in updates_managed.split(|updates| updates) { - let item = session.try_map(|session, _| { - Ok::<_, std::convert::Infallible>(store::convert_session_update( - &session, &store_ctx, - )) - }); - if let Ok(item) = item { - s.store(item); - relay_statsd::metric!( - counter(RelayCounters::SessionsEapProduced) += 1, - session_type = "update" - ); - } - } - - // Convert and store each session aggregate. - // Aggregates are expanded into individual session rows to unify the format. - for aggregate_batch in aggregates.split(|aggs| aggs) { - let release = aggregate_batch.attributes.release.clone(); - let environment = aggregate_batch.attributes.environment.clone(); - - for aggregate in aggregate_batch.split(|batch| batch.aggregates) { - // Convert aggregate to multiple individual session items - let items = store::convert_session_aggregate( - &aggregate, - &release, - environment.as_deref(), - &store_ctx, - ); - - for item in items { - let managed_item = aggregate.wrap(item); - s.store(managed_item); - relay_statsd::metric!( - counter(RelayCounters::SessionsEapProduced) += 1, - session_type = "aggregate" - ); - } - } - } - - Ok(()) - } - } + let SessionsOutput(sessions) = self; + Err(sessions.internal_error("sessions should always be extracted into metrics")) } } @@ -263,15 +164,15 @@ impl Counted for SerializedSessions { } } -#[derive(Clone, Debug)] +#[derive(Debug)] pub struct ExpandedSessions { /// Original envelope headers. - pub(crate) headers: EnvelopeHeaders, + headers: EnvelopeHeaders, /// A list of parsed session updates. - pub(crate) updates: Vec, + updates: Vec, /// A list of parsed session aggregates. - pub(crate) aggregates: Vec, + aggregates: Vec, } impl Counted for ExpandedSessions { diff --git a/relay-server/src/processing/sessions/process.rs b/relay-server/src/processing/sessions/process.rs index f4c07427799..3015c7fa020 100644 --- a/relay-server/src/processing/sessions/process.rs +++ b/relay-server/src/processing/sessions/process.rs @@ -212,38 +212,17 @@ fn normalize_attributes(attrs: &mut SessionAttributes, ctx: &NormalizeContext<'_ Ok(()) } -/// Extracts session metrics and optionally returns the expanded sessions for EAP storage. 
-/// -/// When `eap_enabled` is true, this function returns both the extracted metrics (for the legacy -/// pipeline) and the expanded sessions (for double-write to the snuba-items topic as -/// TRACE_ITEM_TYPE_USER_SESSION). -/// -/// This enables a gradual migration from the legacy session metrics to the new EAP-based -/// user sessions storage, with both paths running in parallel during the migration period. -pub fn extract_with_eap( +pub fn extract_metrics( sessions: Managed, ctx: Context<'_>, - eap_enabled: bool, -) -> (Managed, Option>) { +) -> Managed { let should_extract_abnormal_mechanism = ctx .project_info .config .session_metrics .should_extract_abnormal_mechanism(); - // If EAP is enabled, we need to clone the sessions before consuming them for metrics. - // Use `wrap` to create a new Managed with the same metadata but cloned data. - let eap_sessions = if eap_enabled { - Some(sessions.wrap(ExpandedSessions { - headers: sessions.headers.clone(), - updates: sessions.updates.clone(), - aggregates: sessions.aggregates.clone(), - })) - } else { - None - }; - - let metrics = sessions.map(|sessions, records| { + sessions.map(|sessions, records| { let mut metrics = Vec::new(); let meta = sessions.headers.meta(); @@ -277,7 +256,5 @@ pub fn extract_with_eap( project_metrics: metrics, sampling_metrics: Vec::new(), } - }); - - (metrics, eap_sessions) + }) } diff --git a/relay-server/src/processing/sessions/store.rs b/relay-server/src/processing/sessions/store.rs deleted file mode 100644 index 19b72d92584..00000000000 --- a/relay-server/src/processing/sessions/store.rs +++ /dev/null @@ -1,499 +0,0 @@ -//! Converts session data to EAP TraceItem format for the snuba-items topic. -//! -//! This module implements the double-write path for user sessions, sending session data -//! directly to the snuba-items Kafka topic as `i32::from(TraceItemType::UserSession)` TraceItems, -//! in addition to the legacy metrics extraction path. - -use std::collections::HashMap; - -use chrono::{DateTime, Utc}; -use relay_event_schema::protocol::{ - AbnormalMechanism, SessionAggregateItem, SessionAttributes, SessionStatus, SessionUpdate, -}; -use relay_quotas::{DataCategory, Scoping}; -use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value}; -use uuid::Uuid; - -use crate::processing::Retention; -use crate::processing::utils::store::{proto_timestamp, uuid_to_item_id}; -use crate::services::store::StoreTraceItem; - -/// UUID namespace for user sessions. -/// -/// This must match the `USER_SESSION_NAMESPACE` in sentry's -/// `sentry/user_sessions/eap/constants.py` to ensure consistent trace_id and item_id -/// generation between relay and sentry. -/// -/// Using a fixed namespace ensures: -/// - No collision with other trace types in EAP -/// - Deterministic IDs for ReplacingMergeTree deduplication -const USER_SESSION_NAMESPACE: Uuid = Uuid::from_bytes([ - 0xa1, 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, -]); - -/// Context parameters for converting sessions to TraceItems. -#[derive(Debug, Clone, Copy)] -pub struct Context { - /// Received time. - pub received_at: DateTime, - /// Item scoping. - pub scoping: Scoping, - /// Item retention. - pub retention: Retention, -} - -/// Converts a `SessionUpdate` to a `StoreTraceItem` for the snuba-items topic. -/// -/// The resulting TraceItem uses `i32::from(TraceItemType::UserSession)` and includes all -/// session attributes that are supported by the EAP user sessions schema. 
-pub fn convert_session_update(session: &SessionUpdate, ctx: &Context) -> StoreTraceItem { - let session_id = session.session_id.to_string(); - - // Generate deterministic trace_id from session_id using UUID5 with namespace. - // This ensures the same session always gets the same trace_id. - let trace_id = Uuid::new_v5(&USER_SESSION_NAMESPACE, session_id.as_bytes()).to_string(); - - // Generate deterministic item_id for ReplacingMergeTree deduplication. - // Include sequence to differentiate multiple updates for the same session. - let item_id_input = format!("user_session_{}_{}", session_id, session.sequence); - let item_id = uuid_to_item_id(Uuid::new_v5( - &USER_SESSION_NAMESPACE, - item_id_input.as_bytes(), - )); - - // Determine if the session crashed based on status. - let crashed = matches!(session.status, SessionStatus::Crashed); - - // Build attributes map matching sentry's UserSessionData fields. - let mut attributes = HashMap::new(); - - // Required attributes - insert_string(&mut attributes, "session_id", &session_id); - insert_bool(&mut attributes, "crashed", crashed); - insert_int(&mut attributes, "error_count", session.errors as i64); - - // Optional attributes from session - if let Some(ref distinct_id) = session.distinct_id { - insert_string(&mut attributes, "user_id", distinct_id); - } - - // Attributes from session attributes - insert_string(&mut attributes, "release", &session.attributes.release); - - if let Some(ref environment) = session.attributes.environment { - insert_string(&mut attributes, "environment", environment); - } - - if let Some(ref user_agent) = session.attributes.user_agent { - insert_string(&mut attributes, "user_agent", user_agent); - } - - // Session status as string for debugging/filtering - insert_string(&mut attributes, "status", &session.status.to_string()); - - // Duration if available - if let Some(duration) = session.duration { - insert_double(&mut attributes, "duration", duration); - } - - // Whether this is an init session - insert_bool(&mut attributes, "init", session.init); - - // Abnormal mechanism (ANR info) if present - if !matches!(session.abnormal_mechanism, AbnormalMechanism::None) { - insert_string( - &mut attributes, - "abnormal_mechanism", - &session.abnormal_mechanism.to_string(), - ); - } - - let trace_item = TraceItem { - organization_id: ctx.scoping.organization_id.value(), - project_id: ctx.scoping.project_id.value(), - trace_id, - item_id, - item_type: i32::from(TraceItemType::UserSession), - timestamp: Some(proto_timestamp(session.timestamp)), - received: Some(proto_timestamp(ctx.received_at)), - retention_days: ctx.retention.standard.into(), - downsampled_retention_days: ctx.retention.downsampled.into(), - attributes, - client_sample_rate: 1.0, - server_sample_rate: 1.0, - }; - - StoreTraceItem { - trace_item, - quantities: smallvec::smallvec![(DataCategory::Session, 1)], - } -} - -/// Converts a `SessionAggregateItem` to multiple `StoreTraceItem`s for the snuba-items topic. -/// -/// Session aggregates represent pre-aggregated session data. Each aggregate is expanded -/// into individual session rows to unify the format with regular session updates. -/// For example, an aggregate with `exited: 3, crashed: 2` produces 5 individual TraceItems. -pub fn convert_session_aggregate( - aggregate: &SessionAggregateItem, - release: &str, - environment: Option<&str>, - ctx: &Context, -) -> Vec { - // Expand the aggregate into synthetic SessionUpdate objects, then convert each - // using the same code path as regular session updates. 
- expand_aggregate_to_sessions( - aggregate, - release, - environment, - ctx.scoping.project_id.value(), - ) - .into_iter() - .map(|session| convert_session_update(&session, ctx)) - .collect() -} - -/// Expands a `SessionAggregateItem` into synthetic `SessionUpdate` objects. -/// -/// Each status count in the aggregate becomes individual session updates. -/// This allows the same TraceItem conversion logic to be used for both -/// regular session updates and aggregates. -fn expand_aggregate_to_sessions( - aggregate: &SessionAggregateItem, - release: &str, - environment: Option<&str>, - project_id: u64, -) -> Vec { - let mut sessions = Vec::new(); - - // Map each status count to (count, SessionStatus) - let status_counts = [ - (aggregate.exited, SessionStatus::Exited), - (aggregate.errored, SessionStatus::Errored), - (aggregate.abnormal, SessionStatus::Abnormal), - (aggregate.unhandled, SessionStatus::Unhandled), - (aggregate.crashed, SessionStatus::Crashed), - ]; - - // Pre-compute values that are shared across all expanded sessions - let distinct_id = aggregate.distinct_id.clone(); - let distinct_id_str = distinct_id.as_deref().unwrap_or("unknown"); - let timestamp_millis = aggregate.started.timestamp_millis(); - - let mut index: u32 = 0; - for (count, status) in status_counts { - for _ in 0..count { - // Generate a unique, deterministic session ID for this expanded row. - let session_id_str = format!( - "aggregate_{}_{}_{}_{}", - timestamp_millis, distinct_id_str, project_id, index - ); - let session_id = Uuid::new_v5(&USER_SESSION_NAMESPACE, session_id_str.as_bytes()); - - let session = SessionUpdate { - session_id, - distinct_id: distinct_id.clone(), - sequence: 0, - init: true, // Aggregates represent completed sessions - timestamp: aggregate.started, - started: aggregate.started, - duration: None, - status: status.clone(), - errors: 0, // Aggregates don't track per-session error counts - attributes: SessionAttributes { - release: release.to_owned(), - environment: environment.map(str::to_owned), - ip_address: None, - user_agent: None, - }, - abnormal_mechanism: AbnormalMechanism::None, - }; - - sessions.push(session); - index += 1; - } - } - - sessions -} - -fn insert_string(attrs: &mut HashMap, key: &str, value: &str) { - let value = AnyValue { - value: Some(any_value::Value::StringValue(value.to_owned())), - }; - attrs.insert(key.to_owned(), value); -} - -fn insert_bool(attrs: &mut HashMap, key: &str, value: bool) { - let value = AnyValue { - value: Some(any_value::Value::BoolValue(value)), - }; - attrs.insert(key.to_owned(), value); -} - -fn insert_int(attrs: &mut HashMap, key: &str, value: i64) { - let value = AnyValue { - value: Some(any_value::Value::IntValue(value)), - }; - attrs.insert(key.to_owned(), value); -} - -fn insert_double(attrs: &mut HashMap, key: &str, value: f64) { - let value = AnyValue { - value: Some(any_value::Value::DoubleValue(value)), - }; - attrs.insert(key.to_owned(), value); -} - -#[cfg(test)] -mod tests { - use chrono::TimeZone; - use relay_base_schema::organization::OrganizationId; - use relay_base_schema::project::ProjectId; - use relay_event_schema::protocol::{AbnormalMechanism, SessionAttributes}; - - use super::*; - - fn test_context() -> Context { - Context { - received_at: Utc.with_ymd_and_hms(2024, 1, 1, 12, 0, 0).unwrap(), - scoping: Scoping { - organization_id: OrganizationId::new(1), - project_id: ProjectId::new(42), - project_key: "12333333333333333333333333333333".parse().unwrap(), - key_id: Some(3), - }, - retention: Retention { - 
standard: 90, - downsampled: 90, - }, - } - } - - fn get_str_attr<'a>(item: &'a StoreTraceItem, key: &str) -> Option<&'a str> { - item.trace_item - .attributes - .get(key) - .and_then(|v| match v.value.as_ref()? { - any_value::Value::StringValue(s) => Some(s.as_str()), - _ => None, - }) - } - - fn get_bool_attr(item: &StoreTraceItem, key: &str) -> Option { - item.trace_item - .attributes - .get(key) - .and_then(|v| match v.value.as_ref()? { - any_value::Value::BoolValue(b) => Some(*b), - _ => None, - }) - } - - #[test] - fn test_convert_session_update() { - let session = SessionUpdate { - session_id: Uuid::new_v4(), - distinct_id: Some("user-123".to_owned()), - sequence: 1, - init: true, - timestamp: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), - started: Utc.with_ymd_and_hms(2024, 1, 1, 9, 0, 0).unwrap(), - duration: Some(3600.0), - status: SessionStatus::Ok, - errors: 2, - attributes: SessionAttributes { - release: "1.0.0".to_owned(), - environment: Some("production".to_owned()), - ip_address: None, - user_agent: Some("TestAgent/1.0".to_owned()), - }, - abnormal_mechanism: AbnormalMechanism::None, - }; - - let ctx = test_context(); - let result = convert_session_update(&session, &ctx); - - // Check item_type matches i32::from(TraceItemType::UserSession) (12) - assert_eq!( - result.trace_item.item_type, - i32::from(TraceItemType::UserSession) - ); - assert_eq!(result.trace_item.organization_id, 1); - assert_eq!(result.trace_item.project_id, 42); - assert_eq!(result.trace_item.retention_days, 90); - - // Check attributes - let attrs = &result.trace_item.attributes; - assert!(attrs.contains_key("session_id")); - assert!(attrs.contains_key("crashed")); - assert!(attrs.contains_key("error_count")); - assert!(attrs.contains_key("user_id")); - assert!(attrs.contains_key("release")); - assert!(attrs.contains_key("environment")); - } - - #[test] - fn test_convert_crashed_session() { - let session = SessionUpdate { - session_id: Uuid::new_v4(), - distinct_id: None, - sequence: 0, - init: false, - timestamp: Utc::now(), - started: Utc::now(), - duration: None, - status: SessionStatus::Crashed, - errors: 1, - attributes: SessionAttributes { - release: "1.0.0".to_owned(), - environment: None, - ip_address: None, - user_agent: None, - }, - abnormal_mechanism: AbnormalMechanism::None, - }; - - let ctx = test_context(); - let result = convert_session_update(&session, &ctx); - - assert_eq!(get_bool_attr(&result, "crashed"), Some(true)); - } - - #[test] - fn test_deterministic_ids() { - let session = SessionUpdate { - session_id: Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap(), - distinct_id: None, - sequence: 0, - init: true, - timestamp: Utc::now(), - started: Utc::now(), - duration: None, - status: SessionStatus::Ok, - errors: 0, - attributes: SessionAttributes { - release: "1.0.0".to_owned(), - environment: None, - ip_address: None, - user_agent: None, - }, - abnormal_mechanism: AbnormalMechanism::None, - }; - - let ctx = test_context(); - - // Convert the same session twice - let result1 = convert_session_update(&session, &ctx); - let result2 = convert_session_update(&session, &ctx); - - // IDs should be deterministic (same input = same output) - assert_eq!(result1.trace_item.trace_id, result2.trace_item.trace_id); - assert_eq!(result1.trace_item.item_id, result2.trace_item.item_id); - } - - #[test] - fn test_convert_session_aggregate_expands_to_multiple_rows() { - let aggregate = SessionAggregateItem { - started: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), - 
distinct_id: Some("user-456".to_owned()), - exited: 3, - errored: 1, - abnormal: 0, - unhandled: 0, - crashed: 2, - }; - - let ctx = test_context(); - let results = convert_session_aggregate(&aggregate, "1.0.0", Some("production"), &ctx); - - // Should produce 6 rows: 3 exited + 1 errored + 2 crashed - assert_eq!(results.len(), 6); - - // Check status distribution - let statuses: Vec<_> = results - .iter() - .filter_map(|r| get_str_attr(r, "status")) - .collect(); - assert_eq!(statuses.iter().filter(|&&s| s == "exited").count(), 3); - assert_eq!(statuses.iter().filter(|&&s| s == "errored").count(), 1); - assert_eq!(statuses.iter().filter(|&&s| s == "crashed").count(), 2); - - // Check that crashed sessions have crashed=true - for result in &results { - let is_crashed_status = get_str_attr(result, "status") == Some("crashed"); - assert_eq!(get_bool_attr(result, "crashed"), Some(is_crashed_status)); - } - - // All items should have common attributes - for result in &results { - assert_eq!( - result.trace_item.item_type, - i32::from(TraceItemType::UserSession) - ); - assert_eq!(result.trace_item.organization_id, 1); - assert_eq!(result.trace_item.project_id, 42); - assert!(get_str_attr(result, "session_id").is_some()); - assert!(get_str_attr(result, "release").is_some()); - assert!(get_str_attr(result, "environment").is_some()); - assert!(get_str_attr(result, "user_id").is_some()); - assert!(get_bool_attr(result, "init").is_some()); - } - } - - #[test] - fn test_convert_empty_aggregate_produces_no_rows() { - let aggregate = SessionAggregateItem { - started: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), - distinct_id: None, - exited: 0, - errored: 0, - abnormal: 0, - unhandled: 0, - crashed: 0, - }; - - let ctx = test_context(); - let results = convert_session_aggregate(&aggregate, "1.0.0", None, &ctx); - - assert!(results.is_empty()); - } - - #[test] - fn test_aggregate_rows_have_unique_deterministic_ids() { - let aggregate = SessionAggregateItem { - started: Utc.with_ymd_and_hms(2024, 1, 1, 10, 0, 0).unwrap(), - distinct_id: Some("user-789".to_owned()), - exited: 2, - errored: 0, - abnormal: 0, - unhandled: 0, - crashed: 1, - }; - - let ctx = test_context(); - - // Convert twice to verify determinism - let results1 = convert_session_aggregate(&aggregate, "1.0.0", None, &ctx); - let results2 = convert_session_aggregate(&aggregate, "1.0.0", None, &ctx); - - assert_eq!(results1.len(), results2.len()); - - // Each row should have a unique item_id - let ids1: Vec<_> = results1 - .iter() - .map(|r| r.trace_item.item_id.clone()) - .collect(); - let ids2: Vec<_> = results2 - .iter() - .map(|r| r.trace_item.item_id.clone()) - .collect(); - - // All IDs should be unique within a result set - let unique_ids1: std::collections::HashSet<_> = ids1.iter().collect(); - assert_eq!(unique_ids1.len(), ids1.len()); - - // IDs should be deterministic across conversions - assert_eq!(ids1, ids2); - } -} diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs index e4d99a4a526..f16f9f25fc3 100644 --- a/relay-server/src/services/processor.rs +++ b/relay-server/src/services/processor.rs @@ -2449,10 +2449,8 @@ impl EnvelopeProcessorService { /// Processes metric buckets and sends them to kafka. /// - /// This function runs the following steps: - /// - cardinality limiting - /// - rate limiting - /// - submit to `StoreForwarder` + /// Applies rate limiting and cardinality limiting, then submits to the store. 
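+    /// The store forwarder splits buckets internally, so the full list is
+    /// submitted without batching.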
+ /// If `UserSessionsEap` is enabled, session metrics are also sent to EAP. #[cfg(feature = "processing")] async fn encode_metrics_processing( &self, @@ -2460,7 +2458,9 @@ impl EnvelopeProcessorService { store_forwarder: &Addr, ) { use crate::constants::DEFAULT_EVENT_RETENTION; - use crate::services::store::StoreMetrics; + use crate::services::store::{StoreMetrics, StoreSessionMetricsEap}; + use relay_dynamic_config::Feature; + use relay_metrics::MetricNamespace; for ProjectBuckets { buckets, @@ -2487,13 +2487,28 @@ impl EnvelopeProcessorService { .event_retention .unwrap_or(DEFAULT_EVENT_RETENTION); - // The store forwarder takes care of bucket splitting internally, so we can submit the - // entire list of buckets. There is no batching needed here. + // Always send all buckets to the legacy metrics path. store_forwarder.send(StoreMetrics { - buckets, + buckets: buckets.clone(), scoping, retention, }); + + // If EAP double-write is enabled, also send session metrics to EAP. + if project_info.config.features.has(Feature::UserSessionsEap) { + let session_buckets: Vec<_> = buckets + .into_iter() + .filter(|b| b.name.namespace() == MetricNamespace::Sessions) + .collect(); + + if !session_buckets.is_empty() { + store_forwarder.send(StoreSessionMetricsEap { + buckets: session_buckets, + scoping, + retention, + }); + } + } } } diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index 20ace0733dd..143b7ff93dc 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -103,6 +103,16 @@ pub struct StoreMetrics { pub retention: u16, } +/// Publishes session metric buckets to EAP via the snuba-items topic. +/// +/// Used when the `UserSessionsEap` feature flag is enabled for double-write. +#[derive(Clone, Debug)] +pub struct StoreSessionMetricsEap { + pub buckets: Vec, + pub scoping: Scoping, + pub retention: u16, +} + /// Publishes a log item to the Sentry core application through Kafka. #[derive(Debug)] pub struct StoreTraceItem { @@ -175,6 +185,8 @@ pub enum Store { Envelope(StoreEnvelope), /// Aggregated generic metrics. Metrics(StoreMetrics), + /// Session metrics routed to EAP (Event Analytics Platform). + SessionMetricsEap(StoreSessionMetricsEap), /// A singular [`TraceItem`]. TraceItem(Managed), /// A singular Span. @@ -189,6 +201,7 @@ impl Store { match self { Store::Envelope(_) => "envelope", Store::Metrics(_) => "metrics", + Store::SessionMetricsEap(_) => "session_metrics_eap", Store::TraceItem(_) => "log", Store::Span(_) => "span", Store::ProfileChunk(_) => "profile_chunk", @@ -214,6 +227,14 @@ impl FromMessage for Store { } } +impl FromMessage for Store { + type Response = NoResponse; + + fn from_message(message: StoreSessionMetricsEap, _: ()) -> Self { + Self::SessionMetricsEap(message) + } +} + impl FromMessage> for Store { type Response = NoResponse; @@ -273,6 +294,7 @@ impl StoreService { match message { Store::Envelope(message) => self.handle_store_envelope(message), Store::Metrics(message) => self.handle_store_metrics(message), + Store::SessionMetricsEap(message) => self.handle_store_session_metrics_eap(message), Store::TraceItem(message) => self.handle_store_trace_item(message), Store::Span(message) => self.handle_store_span(message), Store::ProfileChunk(message) => self.handle_store_profile_chunk(message), @@ -592,6 +614,162 @@ impl StoreService { } } + /// Sends session metric buckets to EAP (snuba-items topic) as TraceItems. 
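+    ///
+    /// Counter buckets become one row carrying `session_count`; set buckets fan
+    /// out into one row per hashed member so unique users and errored sessions
+    /// stay countable. Row IDs are UUIDv5 hashes of the bucket key, keeping
+    /// retried produces deterministic.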
+ fn handle_store_session_metrics_eap(&self, message: StoreSessionMetricsEap) { + use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value}; + use std::collections::HashMap; + + const NAMESPACE: uuid::Uuid = uuid::Uuid::from_bytes([ + 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, + 0x90, 0xa1, + ]); + + let StoreSessionMetricsEap { + buckets, + scoping, + retention, + } = message; + let now = UnixTimestamp::now(); + let mut error = None; + let mut row_count: u64 = 0; + + let kafka_headers = BTreeMap::from([ + ("project_id".into(), scoping.project_id.to_string()), + ( + "item_type".into(), + i32::from(TraceItemType::UserSession).to_string(), + ), + ]); + + for bucket in buckets { + let tags_str = bucket + .tags + .iter() + .map(|(k, v)| format!("{k}={v}")) + .collect::>() + .join(","); + + // Build base attributes from tags. + let mut base_attributes: HashMap = HashMap::new(); + for (tag, attr) in [ + ("session.status", "status"), + ("release", "release"), + ("environment", "environment"), + ("sdk", "sdk"), + ("abnormal_mechanism", "abnormal_mechanism"), + ] { + if let Some(v) = bucket.tags.get(tag) { + base_attributes.insert( + attr.into(), + AnyValue { + value: Some(any_value::Value::StringValue(v.into())), + }, + ); + } + } + + let timestamp = Some(prost_types::Timestamp { + seconds: bucket.timestamp.as_secs() as i64, + nanos: 0, + }); + let received = Some(prost_types::Timestamp { + seconds: now.as_secs() as i64, + nanos: 0, + }); + + // Helper to emit a single EAP row. + let mut emit_row = |key_suffix: &str, attributes: HashMap| { + let item_key = format!( + "{}_{}_{}_{}_{}_{}", + scoping.project_id, + bucket.name, + bucket.timestamp.as_secs(), + tags_str, + key_suffix, + scoping.organization_id, + ); + let uuid = uuid::Uuid::new_v5(&NAMESPACE, item_key.as_bytes()); + + let trace_item = TraceItem { + organization_id: scoping.organization_id.value(), + project_id: scoping.project_id.value(), + trace_id: uuid.to_string(), + item_id: uuid.as_bytes()[..16].into(), + item_type: i32::from(TraceItemType::UserSession), + timestamp: timestamp.clone(), + received: received.clone(), + retention_days: retention.into(), + downsampled_retention_days: retention.into(), + attributes, + client_sample_rate: 1.0, + server_sample_rate: 1.0, + }; + + match self.produce( + KafkaTopic::Items, + KafkaMessage::Item { + headers: kafka_headers.clone(), + message: trace_item, + item_type: TraceItemType::UserSession, + }, + ) { + Ok(()) => row_count += 1, + Err(e) => { + error.get_or_insert(e); + } + } + }; + + // For Counter buckets: emit one row with session_count. + // For Set buckets: emit one row per hashed ID for proper unique counting. + match &bucket.value { + relay_metrics::BucketValue::Counter(c) => { + let mut attributes = base_attributes.clone(); + attributes.insert( + "session_count".into(), + AnyValue { + value: Some(any_value::Value::DoubleValue((*c).into())), + }, + ); + emit_row("", attributes); + } + relay_metrics::BucketValue::Set(s) => { + let hash_attr = if bucket.name.to_string().contains("/user@") { + "user_id" + } else { + "errored_session_id" + }; + + for hashed_id in s.iter() { + let mut attributes = base_attributes.clone(); + attributes.insert( + hash_attr.into(), + AnyValue { + value: Some(any_value::Value::IntValue(*hashed_id as i64)), + }, + ); + emit_row(&hashed_id.to_string(), attributes); + } + } + // Distribution and Gauge not used for sessions, skip them. 
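+                // Any distribution or gauge buckets in the sessions namespace
+                // (e.g. session duration) are dropped here rather than
+                // produced to EAP.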
+ _ => {} + } + } + + if row_count > 0 { + metric!( + counter(RelayCounters::SessionsEapProduced) += row_count, + session_type = "metric_bucket" + ); + } + if let Some(e) = error { + relay_log::error!( + error = &e as &dyn std::error::Error, + "failed to produce session metrics to EAP: {e}" + ); + } + } + fn handle_store_trace_item(&self, message: Managed) { let scoping = message.scoping(); let received_at = message.received_at(); diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index f27e65e12dc..9b8803f9cfc 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -824,14 +824,14 @@ pub enum RelayCounters { /// Number of spans produced in the new format. #[cfg(feature = "processing")] SpanV2Produced, - /// Number of user sessions produced to the EAP snuba-items topic. + /// Number of session metric buckets produced to the EAP snuba-items topic. /// /// This metric is only emitted when the `UserSessionsEap` feature flag is enabled, - /// tracking double-writes of session data to the EAP (Event Analytics Platform). + /// routing aggregated session metrics to EAP (Event Analytics Platform) instead of + /// the legacy ingest-metrics topic. /// /// This metric is tagged with: - /// - `session_type`: Either `"update"` for individual session updates or `"aggregate"` for - /// pre-aggregated session data. + /// - `session_type`: Currently always `"metric_bucket"` for aggregated session metrics. #[cfg(feature = "processing")] SessionsEapProduced, /// Number of events that hit any of the store-like endpoints: Envelope, Store, Security, From d56e86734ecfd97df74c6ea931a6512ee30246e4 Mon Sep 17 00:00:00 2001 From: Noah Martin Date: Wed, 4 Feb 2026 12:14:57 -0800 Subject: [PATCH 3/9] Move eap switch to store --- relay-dynamic-config/src/feature.rs | 9 --- relay-dynamic-config/src/global.rs | 14 ++++ relay-server/src/processing/sessions/mod.rs | 1 - relay-server/src/services/processor.rs | 31 +++------ relay-server/src/services/store.rs | 72 ++++++++++----------- relay-server/src/statsd.rs | 4 +- 6 files changed, 59 insertions(+), 72 deletions(-) diff --git a/relay-dynamic-config/src/feature.rs b/relay-dynamic-config/src/feature.rs index 748b7faad55..ab0f9831602 100644 --- a/relay-dynamic-config/src/feature.rs +++ b/relay-dynamic-config/src/feature.rs @@ -138,15 +138,6 @@ pub enum Feature { /// Enable the experimental Trace Attachment pipeline in Relay. #[serde(rename = "projects:trace-attachment-processing")] TraceAttachmentProcessing, - /// Enable EAP (Event Analytics Platform) double-write for user sessions. - /// - /// When enabled, session data is sent both through the legacy metrics pipeline - /// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION. - /// This enables migration to the new EAP-based user sessions storage. - /// - /// Serialized as `organizations:user-sessions-eap`. - #[serde(rename = "organizations:user-sessions-eap")] - UserSessionsEap, /// Forward compatibility. #[doc(hidden)] #[serde(other)] diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs index f8ff4eb9ab1..4a6a20a3244 100644 --- a/relay-dynamic-config/src/global.rs +++ b/relay-dynamic-config/src/global.rs @@ -176,6 +176,20 @@ pub struct Options { )] pub objectstore_attachments_sample_rate: f32, + /// Rollout rate for the EAP (Event Analytics Platform) double-write for user sessions. 
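+ ///
+ /// The gate is evaluated in the store service when metric buckets are
+ /// flushed, so only processing Relays consult this option.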
+ ///
+ /// When rolled out, session data is sent both through the legacy metrics pipeline
+ /// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
+ ///
+ /// Rate needs to be between `0.0` and `1.0`.
+ /// Rollout is determined deterministically per organization ID.
+ #[serde(
+ rename = "relay.sessions-eap.rollout-rate",
+ deserialize_with = "default_on_error",
+ skip_serializing_if = "is_default"
+ )]
+ pub sessions_eap_rollout_rate: f32,
+
 /// All other unknown options.
 #[serde(flatten)]
 other: HashMap<String, Value>,
diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs
index bc5d0533e2f..9e3614b6573 100644
--- a/relay-server/src/processing/sessions/mod.rs
+++ b/relay-server/src/processing/sessions/mod.rs
@@ -104,7 +104,6 @@ impl processing::Processor for SessionsProcessor {
 let sessions = self.limiter.enforce_quotas(sessions, ctx).await?;

 let sessions = process::extract_metrics(sessions, ctx);
-
 Ok(Output::metrics(sessions))
 }
 }
diff --git a/relay-server/src/services/processor.rs b/relay-server/src/services/processor.rs
index f16f9f25fc3..e4d99a4a526 100644
--- a/relay-server/src/services/processor.rs
+++ b/relay-server/src/services/processor.rs
@@ -2449,8 +2449,10 @@ impl EnvelopeProcessorService {
 /// Processes metric buckets and sends them to kafka.
 ///
- /// Applies rate limiting and cardinality limiting, then submits to the store.
- /// If `UserSessionsEap` is enabled, session metrics are also sent to EAP.
+ /// This function runs the following steps:
+ /// - cardinality limiting
+ /// - rate limiting
+ /// - submit to `StoreForwarder`
 #[cfg(feature = "processing")]
 async fn encode_metrics_processing(
 &self,
@@ -2458,9 +2460,7 @@ impl EnvelopeProcessorService {
 store_forwarder: &Addr<Store>,
 ) {
 use crate::constants::DEFAULT_EVENT_RETENTION;
- use crate::services::store::{StoreMetrics, StoreSessionMetricsEap};
- use relay_dynamic_config::Feature;
- use relay_metrics::MetricNamespace;
+ use crate::services::store::StoreMetrics;

 for ProjectBuckets {
 buckets,
@@ -2487,28 +2487,13 @@ impl EnvelopeProcessorService {
 .event_retention
 .unwrap_or(DEFAULT_EVENT_RETENTION);

- // Always send all buckets to the legacy metrics path.
+ // The store forwarder takes care of bucket splitting internally, so we can submit the
+ // entire list of buckets. There is no batching needed here.
 store_forwarder.send(StoreMetrics {
- buckets: buckets.clone(),
+ buckets,
 scoping,
 retention,
 });
-
- // If EAP double-write is enabled, also send session metrics to EAP.
- if project_info.config.features.has(Feature::UserSessionsEap) {
- let session_buckets: Vec<_> = buckets
- .into_iter()
- .filter(|b| b.name.namespace() == MetricNamespace::Sessions)
- .collect();
-
- if !session_buckets.is_empty() {
- store_forwarder.send(StoreSessionMetricsEap {
- buckets: session_buckets,
- scoping,
- retention,
- });
- }
- }
 }
 }

diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 143b7ff93dc..b8e8a344c27 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -103,16 +103,6 @@ pub struct StoreMetrics {
 pub retention: u16,
 }

-/// Publishes session metric buckets to EAP via the snuba-items topic.
-///
-/// Used when the `UserSessionsEap` feature flag is enabled for double-write.
-#[derive(Clone, Debug)]
-pub struct StoreSessionMetricsEap {
- pub buckets: Vec<Bucket>,
- pub scoping: Scoping,
- pub retention: u16,
-}
-
 /// Publishes a log item to the Sentry core application through Kafka.
 #[derive(Debug)]
 pub struct StoreTraceItem {
@@ -185,8 +175,6 @@ pub enum Store {
 Envelope(StoreEnvelope),
 /// Aggregated generic metrics.
 Metrics(StoreMetrics),
- /// Session metrics routed to EAP (Event Analytics Platform).
- SessionMetricsEap(StoreSessionMetricsEap),
 /// A singular [`TraceItem`].
 TraceItem(Managed<StoreTraceItem>),
 /// A singular Span.
@@ -201,7 +189,6 @@ impl Store {
 match self {
 Store::Envelope(_) => "envelope",
 Store::Metrics(_) => "metrics",
- Store::SessionMetricsEap(_) => "session_metrics_eap",
 Store::TraceItem(_) => "log",
 Store::Span(_) => "span",
 Store::ProfileChunk(_) => "profile_chunk",
@@ -227,14 +214,6 @@ impl FromMessage<StoreMetrics> for Store {
 }
 }

-impl FromMessage<StoreSessionMetricsEap> for Store {
- type Response = NoResponse;
-
- fn from_message(message: StoreSessionMetricsEap, _: ()) -> Self {
- Self::SessionMetricsEap(message)
- }
-}
-
 impl FromMessage<Managed<StoreTraceItem>> for Store {
 type Response = NoResponse;

@@ -294,7 +273,6 @@ impl StoreService {
 match message {
 Store::Envelope(message) => self.handle_store_envelope(message),
 Store::Metrics(message) => self.handle_store_metrics(message),
- Store::SessionMetricsEap(message) => self.handle_store_session_metrics_eap(message),
 Store::TraceItem(message) => self.handle_store_trace_item(message),
 Store::Span(message) => self.handle_store_span(message),
 Store::ProfileChunk(message) => self.handle_store_profile_chunk(message),
@@ -543,9 +521,18 @@ impl StoreService {
 let global_config = self.global_config.current();
 let mut encoder = BucketEncoder::new(&global_config);

+ // Check if this organization is rolled out for sessions EAP double-write.
+ let sessions_eap_rollout_rate = global_config.options.sessions_eap_rollout_rate;
+ let emit_sessions_to_eap =
+ utils::is_rolled_out(scoping.organization_id.value(), sessions_eap_rollout_rate)
+ .is_keep();
+
 let now = UnixTimestamp::now();
 let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

+ // Collect session buckets for EAP if rolled out.
+ let mut session_buckets = Vec::new();
+
 for mut bucket in buckets {
 let namespace = encoder.prepare(&mut bucket);

@@ -557,6 +544,11 @@ impl StoreService {
 *max = (*max).max(delay);
 }

+ // Collect session buckets for EAP double-write.
+ if emit_sessions_to_eap && namespace == MetricNamespace::Sessions {
+ session_buckets.push(bucket.clone());
+ }
+
 // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce
 // each bucket separately, we only need to split buckets that exceed the size, but not
 // batches.
@@ -588,6 +580,11 @@ impl StoreService {
 }
 }

+ // Also emit session buckets to EAP if rolled out.
+ if !session_buckets.is_empty() {
+ self.emit_session_metrics_to_eap(&session_buckets, scoping, retention);
+ }
+
 if let Some(error) = error {
 relay_log::error!(
 error = &error as &dyn std::error::Error,
@@ -614,21 +611,21 @@ impl StoreService {
 }
 }

- /// Sends session metric buckets to EAP (snuba-items topic) as TraceItems.
- fn handle_store_session_metrics_eap(&self, message: StoreSessionMetricsEap) {
+ /// Emits session metric buckets to EAP (snuba-items topic) as TraceItems.
+ ///
+ /// This is called when the organization is rolled out for sessions EAP double-write
+ /// via `sessions_eap_rollout_rate`.
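+ ///
+ /// The rollout decision is deterministic per organization ID: at a rate of
+ /// `1.0` every organization is selected, at `0.0` none are, and a given
+ /// organization keeps the same decision across flushes.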
+ fn emit_session_metrics_to_eap(&self, buckets: &[Bucket], scoping: Scoping, retention: u16) { use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value}; use std::collections::HashMap; + use crate::processing::utils::store::uuid_to_item_id; + const NAMESPACE: uuid::Uuid = uuid::Uuid::from_bytes([ 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xa1, ]); - let StoreSessionMetricsEap { - buckets, - scoping, - retention, - } = message; let now = UnixTimestamp::now(); let mut error = None; let mut row_count: u64 = 0; @@ -641,6 +638,11 @@ impl StoreService { ), ]); + let received = Some(prost_types::Timestamp { + seconds: now.as_secs() as i64, + nanos: 0, + }); + for bucket in buckets { let tags_str = bucket .tags @@ -672,10 +674,6 @@ impl StoreService { seconds: bucket.timestamp.as_secs() as i64, nanos: 0, }); - let received = Some(prost_types::Timestamp { - seconds: now.as_secs() as i64, - nanos: 0, - }); // Helper to emit a single EAP row. let mut emit_row = |key_suffix: &str, attributes: HashMap| { @@ -693,11 +691,11 @@ impl StoreService { let trace_item = TraceItem { organization_id: scoping.organization_id.value(), project_id: scoping.project_id.value(), - trace_id: uuid.to_string(), - item_id: uuid.as_bytes()[..16].into(), + trace_id: uuid.as_simple().to_string(), + item_id: uuid_to_item_id(uuid), item_type: i32::from(TraceItemType::UserSession), - timestamp: timestamp.clone(), - received: received.clone(), + timestamp, + received, retention_days: retention.into(), downsampled_retention_days: retention.into(), attributes, diff --git a/relay-server/src/statsd.rs b/relay-server/src/statsd.rs index 9b8803f9cfc..e08a6b467c2 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -826,8 +826,8 @@ pub enum RelayCounters { SpanV2Produced, /// Number of session metric buckets produced to the EAP snuba-items topic. /// - /// This metric is only emitted when the `UserSessionsEap` feature flag is enabled, - /// routing aggregated session metrics to EAP (Event Analytics Platform) instead of + /// This metric is only emitted when the organization is rolled out via `sessions_eap_rollout_rate`, + /// routing aggregated session metrics to EAP (Event Analytics Platform) in addition to /// the legacy ingest-metrics topic. /// /// This metric is tagged with: From 4155961488df11d74c2ae307d4b65b20cec2f9a3 Mon Sep 17 00:00:00 2001 From: Noah Martin Date: Thu, 5 Feb 2026 09:35:10 -0800 Subject: [PATCH 4/9] Add changelog entry for sessions EAP double-write Co-Authored-By: Claude Opus 4.5 --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 73cfce1e16b..535bcdc3ca4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ **Internal**: +- Add EAP double-write for session data. ([#5588](https://github.com/getsentry/relay/pull/5588)) - Embed AI operation type mappings into Relay. ([#5555](https://github.com/getsentry/relay/pull/5555)) - Use new processor architecture to process transactions. ([#5379](https://github.com/getsentry/relay/pull/5379)) - Add `gen_ai_response_time_to_first_token` as a `SpanData` attribute. 
([#5575](https://github.com/getsentry/relay/pull/5575)) From de4305eb925847e4e8acb9402edbfe03392a4725 Mon Sep 17 00:00:00 2001 From: Noah Martin Date: Thu, 5 Feb 2026 20:00:34 -0800 Subject: [PATCH 5/9] Add tests --- tests/integration/test_sessions_eap.py | 168 +++++++++++++++++++++++++ 1 file changed, 168 insertions(+) create mode 100644 tests/integration/test_sessions_eap.py diff --git a/tests/integration/test_sessions_eap.py b/tests/integration/test_sessions_eap.py new file mode 100644 index 00000000000..be83ae96f73 --- /dev/null +++ b/tests/integration/test_sessions_eap.py @@ -0,0 +1,168 @@ +from datetime import datetime, timedelta, timezone +from unittest import mock + +from .asserts import time_within_delta + + +TEST_CONFIG = { + "outcomes": { + "emit_outcomes": True, + "batch_size": 1, + "batch_interval": 1, + "aggregator": { + "bucket_interval": 1, + "flush_interval": 1, + }, + }, + "aggregator": { + "bucket_interval": 1, + "initial_delay": 0, + }, +} + + +def test_session_eap_double_write( + mini_sentry, + relay_with_processing, + items_consumer, +): + """ + Test that session metrics are double-written to the EAP snuba-items topic + as TRACE_ITEM_TYPE_USER_SESSION TraceItems when the rollout rate is enabled. + Asserts the full Kafka payload for both the counter (session_count) and + set (user_id) metric buckets. + """ + items_consumer = items_consumer() + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["sessionMetrics"] = {"version": 3} + + # Enable EAP double-write via global config. + mini_sentry.global_config["options"]["relay.sessions-eap.rollout-rate"] = 1.0 + + relay = relay_with_processing(options=TEST_CONFIG) + + timestamp = datetime.now(tz=timezone.utc) + started = timestamp - timedelta(hours=1) + + relay.send_session( + project_id, + { + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "did": "foobarbaz", + "seq": 42, + "init": True, + "timestamp": timestamp.isoformat(), + "started": started.isoformat(), + "duration": 1947.49, + "status": "exited", + "errors": 0, + "attrs": { + "release": "sentry-test@1.0.0", + "environment": "production", + }, + }, + ) + + items = items_consumer.get_items(n=2, timeout=10) + + # Separate counter and set items by their distinguishing attribute. + counter_items = [i for i in items if "session_count" in i.get("attributes", {})] + set_items = [i for i in items if "user_id" in i.get("attributes", {})] + + assert len(counter_items) == 1 + assert len(set_items) == 1 + + # Assert counter metric (c:sessions/session@none). + assert counter_items[0] == { + "organizationId": "1", + "projectId": "42", + "traceId": mock.ANY, + "itemId": mock.ANY, + "itemType": "TRACE_ITEM_TYPE_USER_SESSION", + "timestamp": time_within_delta(started, delta=timedelta(seconds=2)), + "received": time_within_delta(), + "retentionDays": mock.ANY, + "downsampledRetentionDays": mock.ANY, + "clientSampleRate": 1.0, + "serverSampleRate": 1.0, + "attributes": { + "status": {"stringValue": "init"}, + "release": {"stringValue": "sentry-test@1.0.0"}, + "environment": {"stringValue": "production"}, + "sdk": {"stringValue": "raven-node/2.6.3"}, + "session_count": {"doubleValue": 1.0}, + }, + } + + # Assert set metric (s:sessions/user@none). + # 1617781333 is the CRC32 hash of "foobarbaz". 
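+ # The set bucket stores hashed IDs rather than the raw "did" value, so the
+ # EAP row is asserted against the hash instead of "foobarbaz" itself.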
+ assert set_items[0] == { + "organizationId": "1", + "projectId": "42", + "traceId": mock.ANY, + "itemId": mock.ANY, + "itemType": "TRACE_ITEM_TYPE_USER_SESSION", + "timestamp": time_within_delta(started, delta=timedelta(seconds=2)), + "received": time_within_delta(), + "retentionDays": mock.ANY, + "downsampledRetentionDays": mock.ANY, + "clientSampleRate": 1.0, + "serverSampleRate": 1.0, + "attributes": { + "release": {"stringValue": "sentry-test@1.0.0"}, + "environment": {"stringValue": "production"}, + "sdk": {"stringValue": "raven-node/2.6.3"}, + "user_id": {"intValue": "1617781333"}, + }, + } + + +def test_session_eap_double_write_disabled( + mini_sentry, + relay_with_processing, + items_consumer, + metrics_consumer, +): + """ + Test that sessions are NOT written to EAP when the rollout rate is 0 (default). + """ + items_consumer = items_consumer() + metrics_consumer = metrics_consumer() + + project_id = 42 + project_config = mini_sentry.add_full_project_config(project_id) + project_config["config"]["sessionMetrics"] = {"version": 3} + + # Don't set rollout-rate — defaults to 0.0. + + relay = relay_with_processing(options=TEST_CONFIG) + + timestamp = datetime.now(tz=timezone.utc) + started = timestamp - timedelta(hours=1) + + relay.send_session( + project_id, + { + "sid": "8333339f-5675-4f89-a9a0-1c935255ab58", + "did": "foobarbaz", + "seq": 42, + "init": True, + "timestamp": timestamp.isoformat(), + "started": started.isoformat(), + "duration": 1947.49, + "status": "exited", + "errors": 0, + "attrs": { + "release": "sentry-test@1.0.0", + "environment": "production", + }, + }, + ) + + # Wait for legacy metrics to confirm the session was fully processed. + metrics_consumer.get_metrics(n=2, timeout=10) + + # No items should appear on the EAP topic. + items_consumer.assert_empty() From 16801130df12bff39a492e60b1bed91b17041083 Mon Sep 17 00:00:00 2001 From: Noah Martin Date: Wed, 4 Feb 2026 12:14:57 -0800 Subject: [PATCH 6/9] Move eap switch to store --- relay-server/src/services/store.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs index b8e8a344c27..fd4d7603aff 100644 --- a/relay-server/src/services/store.rs +++ b/relay-server/src/services/store.rs @@ -530,9 +530,6 @@ impl StoreService { let now = UnixTimestamp::now(); let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default(); - // Collect session buckets for EAP if rolled out. 
- let mut session_buckets = Vec::new();
-
 for mut bucket in buckets {
 let namespace = encoder.prepare(&mut bucket);

From 1ab932d4f9a7981e559a046c16fbd8daf613629b Mon Sep 17 00:00:00 2001
From: David Herberth <david.herberth@sentry.io>
Date: Fri, 6 Feb 2026 12:54:37 +0100
Subject: [PATCH 7/9] Simplify conversion and tests

---
 relay-base-schema/src/metrics/name.rs | 23 +++
 relay-dynamic-config/src/global.rs | 3 +-
 relay-server/src/services/store.rs | 212 +++----------------
 relay-server/src/services/store/sessions.rs | 104 ++++++++++
 relay-server/src/statsd.rs | 12 --
 tests/integration/test_sessions_eap.py | 116 +++++------
 6 files changed, 203 insertions(+), 267 deletions(-)
 create mode 100644 relay-server/src/services/store/sessions.rs

diff --git a/relay-base-schema/src/metrics/name.rs b/relay-base-schema/src/metrics/name.rs
index 17f894ed123..37620490534 100644
--- a/relay-base-schema/src/metrics/name.rs
+++ b/relay-base-schema/src/metrics/name.rs
@@ -94,6 +94,29 @@ impl MetricName {
 .into_iter()
 .find(|namespace| maybe_namespace == namespace.as_str())
 }
+
+ /// Extracts the metric name from a well formed MRI.
+ ///
+ /// If the contained metric name is not a well formed MRI this function returns `None`.
+ ///
+ /// # Examples
+ ///
+ /// ```
+ /// use relay_base_schema::metrics::{MetricName, MetricNamespace};
+ ///
+ /// let name = MetricName::from("foo");
+ /// assert!(name.try_name().is_none());
+ ///
+ /// let name = MetricName::from("c:custom/foo@none");
+ /// assert_eq!(name.try_name(), Some("foo"));
+ /// let name = MetricName::from("c:custom/foo");
+ /// assert_eq!(name.try_name(), Some("foo"));
+ /// ```
+ pub fn try_name(&self) -> Option<&str> {
+ // A well formed MRI is always in the format `<ty>:<ns>/<name>[@<unit>]`.
+ let after_slash = self.0.get(2..)?.split('/').nth(1)?;
+ after_slash.split('@').next()
+ }
 }

 impl PartialEq<str> for MetricName {
diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs
index 4a6a20a3244..8fe0f70424b 100644
--- a/relay-dynamic-config/src/global.rs
+++ b/relay-dynamic-config/src/global.rs
@@ -179,10 +179,9 @@ pub struct Options {
 /// Rollout rate for the EAP (Event Analytics Platform) double-write for user sessions.
 ///
 /// When rolled out, session data is sent both through the legacy metrics pipeline
- /// and directly to the snuba-items topic as TRACE_ITEM_TYPE_USER_SESSION.
+ /// and directly to the `snuba-items` topic as `TRACE_ITEM_TYPE_USER_SESSION`.
 ///
 /// Rate needs to be between `0.0` and `1.0`.
- /// Rollout is determined deterministically per organization ID.
 #[serde(
 rename = "relay.sessions-eap.rollout-rate",
 deserialize_with = "default_on_error",
 skip_serializing_if = "is_default"
 )]
 pub sessions_eap_rollout_rate: f32,
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index fd4d7603aff..3663ba6c16e 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -43,6 +43,8 @@ use crate::services::processor::Processed;
 use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
 use crate::utils::{self, FormDataIter};

+mod sessions;
+
 /// Fallback name used for attachment items without a `filename` header.
 const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
@@ -521,11 +523,11 @@ impl StoreService {
 let global_config = self.global_config.current();
 let mut encoder = BucketEncoder::new(&global_config);

- // Check if this organization is rolled out for sessions EAP double-write.
- let sessions_eap_rollout_rate = global_config.options.sessions_eap_rollout_rate; - let emit_sessions_to_eap = - utils::is_rolled_out(scoping.organization_id.value(), sessions_eap_rollout_rate) - .is_keep(); + let emit_sessions_to_eap = utils::is_rolled_out( + scoping.organization_id.value(), + global_config.options.sessions_eap_rollout_rate, + ) + .is_keep(); let now = UnixTimestamp::now(); let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default(); @@ -541,11 +543,6 @@ impl StoreService { *max = (*max).max(delay); } - // Collect session buckets for EAP double-write. - if emit_sessions_to_eap && namespace == MetricNamespace::Sessions { - session_buckets.push(bucket.clone()); - } - // Create a local bucket view to avoid splitting buckets unnecessarily. Since we produce // each bucket separately, we only need to split buckets that exceed the size, but not // batches. @@ -575,11 +572,13 @@ impl StoreService { self.metric_outcomes.track(scoping, &[view], outcome); } - } - // Also emit session buckets to EAP if rolled out. - if !session_buckets.is_empty() { - self.emit_session_metrics_to_eap(&session_buckets, scoping, retention); + if emit_sessions_to_eap + && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention) + { + let message = KafkaMessage::for_item(scoping, trace_item); + let _ = self.produce(KafkaTopic::Items, message); + } } if let Some(error) = error { @@ -608,178 +607,12 @@ impl StoreService { } } - /// Emits session metric buckets to EAP (snuba-items topic) as TraceItems. - /// - /// This is called when the organization is rolled out for sessions EAP double-write - /// via `sessions_eap_rollout_rate`. - fn emit_session_metrics_to_eap(&self, buckets: &[Bucket], scoping: Scoping, retention: u16) { - use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value}; - use std::collections::HashMap; - - use crate::processing::utils::store::uuid_to_item_id; - - const NAMESPACE: uuid::Uuid = uuid::Uuid::from_bytes([ - 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, - 0x90, 0xa1, - ]); - - let now = UnixTimestamp::now(); - let mut error = None; - let mut row_count: u64 = 0; - - let kafka_headers = BTreeMap::from([ - ("project_id".into(), scoping.project_id.to_string()), - ( - "item_type".into(), - i32::from(TraceItemType::UserSession).to_string(), - ), - ]); - - let received = Some(prost_types::Timestamp { - seconds: now.as_secs() as i64, - nanos: 0, - }); - - for bucket in buckets { - let tags_str = bucket - .tags - .iter() - .map(|(k, v)| format!("{k}={v}")) - .collect::>() - .join(","); - - // Build base attributes from tags. - let mut base_attributes: HashMap = HashMap::new(); - for (tag, attr) in [ - ("session.status", "status"), - ("release", "release"), - ("environment", "environment"), - ("sdk", "sdk"), - ("abnormal_mechanism", "abnormal_mechanism"), - ] { - if let Some(v) = bucket.tags.get(tag) { - base_attributes.insert( - attr.into(), - AnyValue { - value: Some(any_value::Value::StringValue(v.into())), - }, - ); - } - } - - let timestamp = Some(prost_types::Timestamp { - seconds: bucket.timestamp.as_secs() as i64, - nanos: 0, - }); - - // Helper to emit a single EAP row. 
- let mut emit_row = |key_suffix: &str, attributes: HashMap| { - let item_key = format!( - "{}_{}_{}_{}_{}_{}", - scoping.project_id, - bucket.name, - bucket.timestamp.as_secs(), - tags_str, - key_suffix, - scoping.organization_id, - ); - let uuid = uuid::Uuid::new_v5(&NAMESPACE, item_key.as_bytes()); - - let trace_item = TraceItem { - organization_id: scoping.organization_id.value(), - project_id: scoping.project_id.value(), - trace_id: uuid.as_simple().to_string(), - item_id: uuid_to_item_id(uuid), - item_type: i32::from(TraceItemType::UserSession), - timestamp, - received, - retention_days: retention.into(), - downsampled_retention_days: retention.into(), - attributes, - client_sample_rate: 1.0, - server_sample_rate: 1.0, - }; - - match self.produce( - KafkaTopic::Items, - KafkaMessage::Item { - headers: kafka_headers.clone(), - message: trace_item, - item_type: TraceItemType::UserSession, - }, - ) { - Ok(()) => row_count += 1, - Err(e) => { - error.get_or_insert(e); - } - } - }; - - // For Counter buckets: emit one row with session_count. - // For Set buckets: emit one row per hashed ID for proper unique counting. - match &bucket.value { - relay_metrics::BucketValue::Counter(c) => { - let mut attributes = base_attributes.clone(); - attributes.insert( - "session_count".into(), - AnyValue { - value: Some(any_value::Value::DoubleValue((*c).into())), - }, - ); - emit_row("", attributes); - } - relay_metrics::BucketValue::Set(s) => { - let hash_attr = if bucket.name.to_string().contains("/user@") { - "user_id" - } else { - "errored_session_id" - }; - - for hashed_id in s.iter() { - let mut attributes = base_attributes.clone(); - attributes.insert( - hash_attr.into(), - AnyValue { - value: Some(any_value::Value::IntValue(*hashed_id as i64)), - }, - ); - emit_row(&hashed_id.to_string(), attributes); - } - } - // Distribution and Gauge not used for sessions, skip them. - _ => {} - } - } - - if row_count > 0 { - metric!( - counter(RelayCounters::SessionsEapProduced) += row_count, - session_type = "metric_bucket" - ); - } - if let Some(e) = error { - relay_log::error!( - error = &e as &dyn std::error::Error, - "failed to produce session metrics to EAP: {e}" - ); - } - } - fn handle_store_trace_item(&self, message: Managed) { let scoping = message.scoping(); let received_at = message.received_at(); let quantities = message.try_accept(|item| { - let item_type = item.trace_item.item_type(); - let message = KafkaMessage::Item { - headers: BTreeMap::from([ - ("project_id".to_owned(), scoping.project_id.to_string()), - ("item_type".to_owned(), (item_type as i32).to_string()), - ]), - message: item.trace_item, - item_type, - }; - + let message = KafkaMessage::for_item(scoping, item.trace_item); self.produce(KafkaTopic::Items, message) .map(|()| item.quantities) }); @@ -1105,8 +938,8 @@ impl StoreService { } _ => KafkaTopic::MetricsGeneric, }; - let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]); + let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]); self.produce(topic, KafkaMessage::Metric { headers, message })?; Ok(()) } @@ -1735,6 +1568,21 @@ enum KafkaMessage<'a> { ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>), } +impl KafkaMessage<'_> { + /// Creates a [`KafkaMessage`] for a [`TraceItem`]. 
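+ ///
+ /// Centralizes the `project_id` and `item_type` Kafka headers that were
+ /// previously built inline in `handle_store_trace_item`.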
+ fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
+ let item_type = item.item_type();
+ KafkaMessage::Item {
+ headers: BTreeMap::from([
+ ("project_id".to_owned(), scoping.project_id.to_string()),
+ ("item_type".to_owned(), (item_type as i32).to_string()),
+ ]),
+ message: item,
+ item_type,
+ }
+ }
+}
+
 impl Message for KafkaMessage<'_> {
 fn variant(&self) -> &'static str {
 match self {
diff --git a/relay-server/src/services/store/sessions.rs b/relay-server/src/services/store/sessions.rs
new file mode 100644
index 00000000000..74012fd868c
--- /dev/null
+++ b/relay-server/src/services/store/sessions.rs
@@ -0,0 +1,104 @@
+use std::collections::{BTreeSet, HashMap};
+
+use relay_metrics::{Bucket, BucketValue, MetricNamespace};
+use relay_quotas::Scoping;
+use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItem, TraceItemType, any_value};
+
+/// UUID namespace used for deterministic item IDs.
+const NAMESPACE: uuid::Uuid = uuid::Uuid::from_bytes([
+ 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xa1,
+]);
+
+/// Converts a session [`Bucket`] into an EAP [`TraceItem`].
+pub fn to_trace_item(scoping: Scoping, bucket: Bucket, retention: u16) -> Option<TraceItem> {
+ if bucket.name.namespace() != MetricNamespace::Sessions {
+ return None;
+ }
+
+ // Currently max 5 tags + 1 metric value.
+ let mut attributes = HashMap::with_capacity(6);
+
+ match bucket.name.try_name()? {
+ "session" => {
+ let BucketValue::Counter(v) = bucket.value else {
+ return None;
+ };
+ attributes.insert(
+ "session_count".to_owned(),
+ AnyValue {
+ value: Some(any_value::Value::DoubleValue(v.into())),
+ },
+ );
+ }
+ "user" => {
+ let BucketValue::Set(set) = &bucket.value else {
+ return None;
+ };
+ attributes.insert("user_id".to_owned(), set_to_attribute_value(set));
+ }
+ "error" => {
+ let BucketValue::Set(set) = &bucket.value else {
+ return None;
+ };
+ attributes.insert("errored_session_id".to_owned(), set_to_attribute_value(set));
+ }
+ _ => return None,
+ }
+
+ for (name, value) in bucket.tags.into_iter().filter_map(tag_to_attribute) {
+ attributes.insert(name, value);
+ }
+
+ let uuid = uuid::Uuid::new_v5(&NAMESPACE, b"TODO");
+
+ Some(TraceItem {
+ organization_id: scoping.organization_id.value(),
+ project_id: scoping.project_id.value(),
+ trace_id: uuid.to_string(),
+ item_id: uuid.as_bytes()[..16].into(),
+ item_type: TraceItemType::UserSession.into(),
+ timestamp: Some(prost_types::Timestamp {
+ seconds: bucket.timestamp.as_secs() as i64,
+ nanos: 0,
+ }),
+ received: bucket
+ .metadata
+ .received_at
+ .map(|ts| prost_types::Timestamp {
+ seconds: ts.as_secs() as i64,
+ nanos: 0,
+ }),
+ retention_days: retention.into(),
+ downsampled_retention_days: retention.into(),
+ attributes,
+ client_sample_rate: 1.0,
+ server_sample_rate: 1.0,
+ })
+}
+
+fn set_to_attribute_value(set: &BTreeSet<u32>) -> AnyValue {
+ let values = set
+ .iter()
+ .map(|v| i64::from(*v))
+ .map(any_value::Value::IntValue)
+ .map(|value| AnyValue { value: Some(value) })
+ .collect();
+
+ AnyValue {
+ value: Some(any_value::Value::ArrayValue(ArrayValue { values })),
+ }
+}
+
+fn tag_to_attribute((name, value): (String, String)) -> Option<(String, AnyValue)> {
+ let name = match name.as_str() {
+ "session.status" => "status".to_owned(),
+ "release" | "environment" | "sdk" | "abnormal_mechanism" => name,
+ _ => return None,
+ };
+
+ let value = AnyValue {
+ value: Some(any_value::Value::StringValue(value)),
+ };
+
+ Some((name, value))
+}
diff --git a/relay-server/src/statsd.rs
b/relay-server/src/statsd.rs index e08a6b467c2..d004890bd9c 100644 --- a/relay-server/src/statsd.rs +++ b/relay-server/src/statsd.rs @@ -824,16 +824,6 @@ pub enum RelayCounters { /// Number of spans produced in the new format. #[cfg(feature = "processing")] SpanV2Produced, - /// Number of session metric buckets produced to the EAP snuba-items topic. - /// - /// This metric is only emitted when the organization is rolled out via `sessions_eap_rollout_rate`, - /// routing aggregated session metrics to EAP (Event Analytics Platform) in addition to - /// the legacy ingest-metrics topic. - /// - /// This metric is tagged with: - /// - `session_type`: Currently always `"metric_bucket"` for aggregated session metrics. - #[cfg(feature = "processing")] - SessionsEapProduced, /// Number of events that hit any of the store-like endpoints: Envelope, Store, Security, /// Minidump, Unreal. /// @@ -1001,8 +991,6 @@ impl CounterMetric for RelayCounters { RelayCounters::ProcessingMessageProduced => "processing.event.produced", #[cfg(feature = "processing")] RelayCounters::SpanV2Produced => "store.produced.span_v2", - #[cfg(feature = "processing")] - RelayCounters::SessionsEapProduced => "sessions.eap.produced", RelayCounters::EventProtocol => "event.protocol", RelayCounters::EventTransaction => "event.transaction", RelayCounters::TransactionNameChanges => "event.transaction_name_changes", diff --git a/tests/integration/test_sessions_eap.py b/tests/integration/test_sessions_eap.py index be83ae96f73..b75e9cf1119 100644 --- a/tests/integration/test_sessions_eap.py +++ b/tests/integration/test_sessions_eap.py @@ -4,23 +4,6 @@ from .asserts import time_within_delta -TEST_CONFIG = { - "outcomes": { - "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, - "aggregator": { - "bucket_interval": 1, - "flush_interval": 1, - }, - }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, -} - - def test_session_eap_double_write( mini_sentry, relay_with_processing, @@ -41,7 +24,7 @@ def test_session_eap_double_write( # Enable EAP double-write via global config. mini_sentry.global_config["options"]["relay.sessions-eap.rollout-rate"] = 1.0 - relay = relay_with_processing(options=TEST_CONFIG) + relay = relay_with_processing() timestamp = datetime.now(tz=timezone.utc) started = timestamp - timedelta(hours=1) @@ -65,58 +48,50 @@ def test_session_eap_double_write( }, ) - items = items_consumer.get_items(n=2, timeout=10) - - # Separate counter and set items by their distinguishing attribute. - counter_items = [i for i in items if "session_count" in i.get("attributes", {})] - set_items = [i for i in items if "user_id" in i.get("attributes", {})] - - assert len(counter_items) == 1 - assert len(set_items) == 1 - - # Assert counter metric (c:sessions/session@none). 
- assert counter_items[0] == { - "organizationId": "1", - "projectId": "42", - "traceId": mock.ANY, - "itemId": mock.ANY, - "itemType": "TRACE_ITEM_TYPE_USER_SESSION", - "timestamp": time_within_delta(started, delta=timedelta(seconds=2)), - "received": time_within_delta(), - "retentionDays": mock.ANY, - "downsampledRetentionDays": mock.ANY, - "clientSampleRate": 1.0, - "serverSampleRate": 1.0, - "attributes": { - "status": {"stringValue": "init"}, - "release": {"stringValue": "sentry-test@1.0.0"}, - "environment": {"stringValue": "production"}, - "sdk": {"stringValue": "raven-node/2.6.3"}, - "session_count": {"doubleValue": 1.0}, + assert items_consumer.get_items(n=2) == [ + # Converted from: `c:sessions/session@none` + { + "organizationId": "1", + "projectId": "42", + "traceId": mock.ANY, + "itemId": mock.ANY, + "itemType": 12, + "timestamp": time_within_delta(started, delta=timedelta(seconds=2)), + "received": time_within_delta(), + "retentionDays": 90, + "downsampledRetentionDays": 90, + "clientSampleRate": 1.0, + "serverSampleRate": 1.0, + "attributes": { + "status": {"stringValue": "init"}, + "release": {"stringValue": "sentry-test@1.0.0"}, + "environment": {"stringValue": "production"}, + "sdk": {"stringValue": "raven-node/2.6.3"}, + "session_count": {"doubleValue": 1.0}, + }, }, - } - - # Assert set metric (s:sessions/user@none). - # 1617781333 is the CRC32 hash of "foobarbaz". - assert set_items[0] == { - "organizationId": "1", - "projectId": "42", - "traceId": mock.ANY, - "itemId": mock.ANY, - "itemType": "TRACE_ITEM_TYPE_USER_SESSION", - "timestamp": time_within_delta(started, delta=timedelta(seconds=2)), - "received": time_within_delta(), - "retentionDays": mock.ANY, - "downsampledRetentionDays": mock.ANY, - "clientSampleRate": 1.0, - "serverSampleRate": 1.0, - "attributes": { - "release": {"stringValue": "sentry-test@1.0.0"}, - "environment": {"stringValue": "production"}, - "sdk": {"stringValue": "raven-node/2.6.3"}, - "user_id": {"intValue": "1617781333"}, + # Converted from `s:sessions/user@none` + { + "organizationId": "1", + "projectId": "42", + "traceId": mock.ANY, + "itemId": mock.ANY, + "itemType": 12, + "timestamp": time_within_delta(started, delta=timedelta(seconds=2)), + "received": time_within_delta(), + "retentionDays": 90, + "downsampledRetentionDays": 90, + "clientSampleRate": 1.0, + "serverSampleRate": 1.0, + "attributes": { + "release": {"stringValue": "sentry-test@1.0.0"}, + "environment": {"stringValue": "production"}, + "sdk": {"stringValue": "raven-node/2.6.3"}, + # 1617781333 is the CRC32 hash of "foobarbaz". + "user_id": {"arrayValue": {"values": [{"intValue": "1617781333"}]}}, + }, }, - } + ] def test_session_eap_double_write_disabled( @@ -136,8 +111,7 @@ def test_session_eap_double_write_disabled( project_config["config"]["sessionMetrics"] = {"version": 3} # Don't set rollout-rate — defaults to 0.0. - - relay = relay_with_processing(options=TEST_CONFIG) + relay = relay_with_processing() timestamp = datetime.now(tz=timezone.utc) started = timestamp - timedelta(hours=1) @@ -162,7 +136,7 @@ def test_session_eap_double_write_disabled( ) # Wait for legacy metrics to confirm the session was fully processed. - metrics_consumer.get_metrics(n=2, timeout=10) + assert len(metrics_consumer.get_metrics(n=2)) == 2 # No items should appear on the EAP topic. 
 items_consumer.assert_empty()

From 6eed8f24dd8c71a5f73e1f642faff6987705f748 Mon Sep 17 00:00:00 2001
From: David Herberth <david.herberth@sentry.io>
Date: Fri, 6 Feb 2026 13:51:12 +0100
Subject: [PATCH 8/9] sort the items

---
 tests/integration/test_sessions_eap.py | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/tests/integration/test_sessions_eap.py b/tests/integration/test_sessions_eap.py
index b75e9cf1119..04119380fe9 100644
--- a/tests/integration/test_sessions_eap.py
+++ b/tests/integration/test_sessions_eap.py
@@ -48,7 +48,11 @@ def test_session_eap_double_write(
 },
 )

- assert items_consumer.get_items(n=2) == [
+ items = sorted(
+ items_consumer.get_items(n=2),
+ key=lambda x: not x["attributes"].get("session_count"),
+ )
+ assert items == [
 # Converted from: `c:sessions/session@none`
 {
 "organizationId": "1",

From 8fb00fc51952107e53c06437adf7b299a783d49d Mon Sep 17 00:00:00 2001
From: David Herberth <david.herberth@sentry.io>
Date: Fri, 6 Feb 2026 16:18:40 +0100
Subject: [PATCH 9/9] use a random uuid for trace and item id

---
 relay-server/src/services/store/sessions.rs | 10 +++-------
 1 file changed, 3 insertions(+), 7 deletions(-)

diff --git a/relay-server/src/services/store/sessions.rs b/relay-server/src/services/store/sessions.rs
index 74012fd868c..afe97d62a36 100644
--- a/relay-server/src/services/store/sessions.rs
+++ b/relay-server/src/services/store/sessions.rs
@@ -4,10 +4,7 @@ use relay_metrics::{Bucket, BucketValue, MetricNamespace};
 use relay_quotas::Scoping;
 use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItem, TraceItemType, any_value};

-/// UUID namespace used for deterministic item IDs.
-const NAMESPACE: uuid::Uuid = uuid::Uuid::from_bytes([
- 0xb2, 0xc3, 0xd4, 0xe5, 0xf6, 0x78, 0x90, 0xab, 0xcd, 0xef, 0x12, 0x34, 0x56, 0x78, 0x90, 0xa1,
-]);
+use crate::processing::utils::store::uuid_to_item_id;

 /// Converts a session [`Bucket`] into an EAP [`TraceItem`].
 pub fn to_trace_item(scoping: Scoping, bucket: Bucket, retention: u16) -> Option<TraceItem> {
@@ -49,13 +46,12 @@ pub fn to_trace_item(scoping: Scoping, bucket: Bucket, retention: u16) -> Option<TraceItem>
 attributes.insert(name, value);
 }

- let uuid = uuid::Uuid::new_v5(&NAMESPACE, b"TODO");
-
+ let uuid = uuid::Uuid::new_v4();
 Some(TraceItem {
 organization_id: scoping.organization_id.value(),
 project_id: scoping.project_id.value(),
 trace_id: uuid.to_string(),
- item_id: uuid.as_bytes()[..16].into(),
+ item_id: uuid_to_item_id(uuid),
 item_type: TraceItemType::UserSession.into(),
 timestamp: Some(prost_types::Timestamp {
 seconds: bucket.timestamp.as_secs() as i64,