diff --git a/CHANGELOG.md b/CHANGELOG.md
index 2c405174060..7fad070fc66 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -15,6 +15,7 @@
 
 **Internal**:
 
+- Add EAP double-write for session data. ([#5588](https://github.com/getsentry/relay/pull/5588))
 - Embed AI operation type mappings into Relay. ([#5555](https://github.com/getsentry/relay/pull/5555))
 - Use new processor architecture to process transactions. ([#5379](https://github.com/getsentry/relay/pull/5379))
 - Add `gen_ai_response_time_to_first_token` as a `SpanData` attribute. ([#5575](https://github.com/getsentry/relay/pull/5575))
diff --git a/relay-base-schema/src/metrics/name.rs b/relay-base-schema/src/metrics/name.rs
index 17f894ed123..37620490534 100644
--- a/relay-base-schema/src/metrics/name.rs
+++ b/relay-base-schema/src/metrics/name.rs
@@ -94,6 +94,29 @@ impl MetricName {
             .into_iter()
             .find(|namespace| maybe_namespace == namespace.as_str())
     }
+
+    /// Extracts the metric name from a well-formed MRI.
+    ///
+    /// If the contained metric name is not a well-formed MRI, this function returns `None`.
+    ///
+    /// # Examples
+    ///
+    /// ```
+    /// use relay_base_schema::metrics::{MetricName, MetricNamespace};
+    ///
+    /// let name = MetricName::from("foo");
+    /// assert!(name.try_name().is_none());
+    ///
+    /// let name = MetricName::from("c:custom/foo@none");
+    /// assert_eq!(name.try_name(), Some("foo"));
+    /// let name = MetricName::from("c:custom/foo");
+    /// assert_eq!(name.try_name(), Some("foo"));
+    /// ```
+    pub fn try_name(&self) -> Option<&str> {
+        // A well-formed MRI is always in the format `<type>:<namespace>/<name>[@<unit>]`.
+        let after_slash = self.0.get(2..)?.split('/').nth(1)?;
+        after_slash.split('@').next()
+    }
 }
 
 impl PartialEq for MetricName {
diff --git a/relay-dynamic-config/src/global.rs b/relay-dynamic-config/src/global.rs
index f8ff4eb9ab1..8fe0f70424b 100644
--- a/relay-dynamic-config/src/global.rs
+++ b/relay-dynamic-config/src/global.rs
@@ -176,6 +176,19 @@ pub struct Options {
     )]
     pub objectstore_attachments_sample_rate: f32,
 
+    /// Rollout rate for the EAP (Event Analytics Platform) double-write for user sessions.
+    ///
+    /// When rolled out, session data is sent both through the legacy metrics pipeline
+    /// and directly to the `snuba-items` topic as `TRACE_ITEM_TYPE_USER_SESSION`.
+    ///
+    /// The rate must be between `0.0` and `1.0`.
+    #[serde(
+        rename = "relay.sessions-eap.rollout-rate",
+        deserialize_with = "default_on_error",
+        skip_serializing_if = "is_default"
+    )]
+    pub sessions_eap_rollout_rate: f32,
+
     /// All other unknown options.
     #[serde(flatten)]
     other: HashMap<String, Value>,
diff --git a/relay-server/src/processing/sessions/mod.rs b/relay-server/src/processing/sessions/mod.rs
index 50f2c3204a5..9e3614b6573 100644
--- a/relay-server/src/processing/sessions/mod.rs
+++ b/relay-server/src/processing/sessions/mod.rs
@@ -103,7 +103,7 @@ impl processing::Processor for SessionsProcessor {
         let sessions = self.limiter.enforce_quotas(sessions, ctx).await?;
 
-        let sessions = process::extract(sessions, ctx);
+        let sessions = process::extract_metrics(sessions, ctx);
 
         Ok(Output::metrics(sessions))
     }
 }
diff --git a/relay-server/src/processing/sessions/process.rs b/relay-server/src/processing/sessions/process.rs
index 8036969a930..3015c7fa020 100644
--- a/relay-server/src/processing/sessions/process.rs
+++ b/relay-server/src/processing/sessions/process.rs
@@ -212,7 +212,10 @@ fn normalize_attributes(attrs: &mut SessionAttributes, ctx: &NormalizeContext<'_
     Ok(())
 }
 
-pub fn extract(sessions: Managed, ctx: Context<'_>) -> Managed {
+pub fn extract_metrics(
+    sessions: Managed,
+    ctx: Context<'_>,
+) -> Managed {
     let should_extract_abnormal_mechanism = ctx
         .project_info
         .config
diff --git a/relay-server/src/services/store.rs b/relay-server/src/services/store.rs
index 3b1786733c8..33e8d28c86d 100644
--- a/relay-server/src/services/store.rs
+++ b/relay-server/src/services/store.rs
@@ -43,6 +43,8 @@
 use crate::services::processor::Processed;
 use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
 use crate::utils::{self, FormDataIter};
 
+mod sessions;
+
 /// Fallback name used for attachment items without a `filename` header.
 const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";
@@ -518,6 +520,12 @@ impl StoreService {
         let global_config = self.global_config.current();
         let mut encoder = BucketEncoder::new(&global_config);
 
+        let emit_sessions_to_eap = utils::is_rolled_out(
+            scoping.organization_id.value(),
+            global_config.options.sessions_eap_rollout_rate,
+        )
+        .is_keep();
+
         let now = UnixTimestamp::now();
         let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();
@@ -561,6 +569,13 @@ impl StoreService {
                     self.metric_outcomes.track(scoping, &[view], outcome);
                 }
+
+                if emit_sessions_to_eap
+                    && let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
+                {
+                    let message = KafkaMessage::for_item(scoping, trace_item);
+                    let _ = self.produce(KafkaTopic::Items, message);
+                }
             }
 
             if let Some(error) = error {
@@ -594,16 +609,7 @@ impl StoreService {
         let received_at = message.received_at();
 
         let quantities = message.try_accept(|item| {
-            let item_type = item.trace_item.item_type();
-            let message = KafkaMessage::Item {
-                headers: BTreeMap::from([
-                    ("project_id".to_owned(), scoping.project_id.to_string()),
-                    ("item_type".to_owned(), (item_type as i32).to_string()),
-                ]),
-                message: item.trace_item,
-                item_type,
-            };
-
+            let message = KafkaMessage::for_item(scoping, item.trace_item);
             self.produce(KafkaTopic::Items, message)
                 .map(|()| item.quantities)
         });
@@ -931,8 +937,8 @@ impl StoreService {
             }
             _ => KafkaTopic::MetricsGeneric,
         };
-        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
 
+        let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
         self.produce(topic, KafkaMessage::Metric { headers, message })?;
         Ok(())
     }
@@ -1561,6 +1567,21 @@ enum KafkaMessage<'a> {
     ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
 }
 
+impl KafkaMessage<'_> {
+    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
+    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
+        let item_type = item.item_type();
+        KafkaMessage::Item {
+            headers: BTreeMap::from([
+                ("project_id".to_owned(), scoping.project_id.to_string()),
+                ("item_type".to_owned(), (item_type as i32).to_string()),
+            ]),
+            message: item,
+            item_type,
+        }
+    }
+}
+
 impl Message for KafkaMessage<'_> {
     fn variant(&self) -> &'static str {
         match self {
diff --git a/relay-server/src/services/store/sessions.rs b/relay-server/src/services/store/sessions.rs
new file mode 100644
index 00000000000..afe97d62a36
--- /dev/null
+++ b/relay-server/src/services/store/sessions.rs
@@ -0,0 +1,100 @@
+use std::collections::{BTreeSet, HashMap};
+
+use relay_metrics::{Bucket, BucketValue, MetricNamespace};
+use relay_quotas::Scoping;
+use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItem, TraceItemType, any_value};
+
+use crate::processing::utils::store::uuid_to_item_id;
+
+/// Converts a session [`Bucket`] into an EAP [`TraceItem`].
+pub fn to_trace_item(scoping: Scoping, bucket: Bucket, retention: u16) -> Option<TraceItem> {
+    if bucket.name.namespace() != MetricNamespace::Sessions {
+        return None;
+    }
+
+    // Currently max 5 tags + 1 metric value.
+    let mut attributes = HashMap::with_capacity(6);
+
+    match bucket.name.try_name()? {
+        "session" => {
+            let BucketValue::Counter(v) = bucket.value else {
+                return None;
+            };
+            attributes.insert(
+                "session_count".to_owned(),
+                AnyValue {
+                    value: Some(any_value::Value::DoubleValue(v.into())),
+                },
+            );
+        }
+        "user" => {
+            let BucketValue::Set(set) = &bucket.value else {
+                return None;
+            };
+            attributes.insert("user_id".to_owned(), set_to_attribute_value(set));
+        }
+        "error" => {
+            let BucketValue::Set(set) = &bucket.value else {
+                return None;
+            };
+            attributes.insert("errored_session_id".to_owned(), set_to_attribute_value(set));
+        }
+        _ => return None,
+    }
+
+    for (name, value) in bucket.tags.into_iter().filter_map(tag_to_attribute) {
+        attributes.insert(name, value);
+    }
+
+    let uuid = uuid::Uuid::new_v4();
+    Some(TraceItem {
+        organization_id: scoping.organization_id.value(),
+        project_id: scoping.project_id.value(),
+        trace_id: uuid.to_string(),
+        item_id: uuid_to_item_id(uuid),
+        item_type: TraceItemType::UserSession.into(),
+        timestamp: Some(prost_types::Timestamp {
+            seconds: bucket.timestamp.as_secs() as i64,
+            nanos: 0,
+        }),
+        received: bucket
+            .metadata
+            .received_at
+            .map(|ts| prost_types::Timestamp {
+                seconds: ts.as_secs() as i64,
+                nanos: 0,
+            }),
+        retention_days: retention.into(),
+        downsampled_retention_days: retention.into(),
+        attributes,
+        client_sample_rate: 1.0,
+        server_sample_rate: 1.0,
+    })
+}
+
+fn set_to_attribute_value(set: &BTreeSet<u32>) -> AnyValue {
+    let values = set
+        .iter()
+        .map(|v| i64::from(*v))
+        .map(any_value::Value::IntValue)
+        .map(|value| AnyValue { value: Some(value) })
+        .collect();
+
+    AnyValue {
+        value: Some(any_value::Value::ArrayValue(ArrayValue { values })),
+    }
+}
+
+fn tag_to_attribute((name, value): (String, String)) -> Option<(String, AnyValue)> {
+    let name = match name.as_str() {
+        "session.status" => "status".to_owned(),
+        "release" | "environment" | "sdk" | "abnormal_mechanism" => name,
+        _ => return None,
+    };
+
+    let value = AnyValue {
+        value: Some(any_value::Value::StringValue(value)),
+    };
+
+    Some((name, value))
+}
diff --git a/tests/integration/test_sessions_eap.py b/tests/integration/test_sessions_eap.py
new file mode 100644
index 00000000000..04119380fe9
--- /dev/null
+++ b/tests/integration/test_sessions_eap.py
@@ -0,0 +1,146 @@
+from datetime import datetime, timedelta, timezone
+from unittest import mock
+
+from .asserts import time_within_delta
+
+
+def test_session_eap_double_write(
+    mini_sentry,
+    relay_with_processing,
+    items_consumer,
+):
+    """
+    Test that session metrics are double-written to the EAP snuba-items topic
+    as TRACE_ITEM_TYPE_USER_SESSION TraceItems when the rollout rate is enabled.
+    Asserts the full Kafka payload for both the counter (session_count) and
+    set (user_id) metric buckets.
+    """
+    items_consumer = items_consumer()
+
+    project_id = 42
+    project_config = mini_sentry.add_full_project_config(project_id)
+    project_config["config"]["sessionMetrics"] = {"version": 3}
+
+    # Enable EAP double-write via global config.
+    mini_sentry.global_config["options"]["relay.sessions-eap.rollout-rate"] = 1.0
+
+    relay = relay_with_processing()
+
+    timestamp = datetime.now(tz=timezone.utc)
+    started = timestamp - timedelta(hours=1)
+
+    relay.send_session(
+        project_id,
+        {
+            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
+            "did": "foobarbaz",
+            "seq": 42,
+            "init": True,
+            "timestamp": timestamp.isoformat(),
+            "started": started.isoformat(),
+            "duration": 1947.49,
+            "status": "exited",
+            "errors": 0,
+            "attrs": {
+                "release": "sentry-test@1.0.0",
+                "environment": "production",
+            },
+        },
+    )
+
+    items = sorted(
+        items_consumer.get_items(n=2),
+        key=lambda x: not x["attributes"].get("session_count"),
+    )
+    assert items == [
+        # Converted from `c:sessions/session@none`.
+        {
+            "organizationId": "1",
+            "projectId": "42",
+            "traceId": mock.ANY,
+            "itemId": mock.ANY,
+            "itemType": 12,
+            "timestamp": time_within_delta(started, delta=timedelta(seconds=2)),
+            "received": time_within_delta(),
+            "retentionDays": 90,
+            "downsampledRetentionDays": 90,
+            "clientSampleRate": 1.0,
+            "serverSampleRate": 1.0,
+            "attributes": {
+                "status": {"stringValue": "init"},
+                "release": {"stringValue": "sentry-test@1.0.0"},
+                "environment": {"stringValue": "production"},
+                "sdk": {"stringValue": "raven-node/2.6.3"},
+                "session_count": {"doubleValue": 1.0},
+            },
+        },
+        # Converted from `s:sessions/user@none`.
+        {
+            "organizationId": "1",
+            "projectId": "42",
+            "traceId": mock.ANY,
+            "itemId": mock.ANY,
+            "itemType": 12,
+            "timestamp": time_within_delta(started, delta=timedelta(seconds=2)),
+            "received": time_within_delta(),
+            "retentionDays": 90,
+            "downsampledRetentionDays": 90,
+            "clientSampleRate": 1.0,
+            "serverSampleRate": 1.0,
+            "attributes": {
+                "release": {"stringValue": "sentry-test@1.0.0"},
+                "environment": {"stringValue": "production"},
+                "sdk": {"stringValue": "raven-node/2.6.3"},
+                # 1617781333 is the CRC32 hash of "foobarbaz".
+                "user_id": {"arrayValue": {"values": [{"intValue": "1617781333"}]}},
+            },
+        },
+    ]
+
+
+def test_session_eap_double_write_disabled(
+    mini_sentry,
+    relay_with_processing,
+    items_consumer,
+    metrics_consumer,
+):
+    """
+    Test that sessions are NOT written to EAP when the rollout rate is 0 (default).
+    """
+    items_consumer = items_consumer()
+    metrics_consumer = metrics_consumer()
+
+    project_id = 42
+    project_config = mini_sentry.add_full_project_config(project_id)
+    project_config["config"]["sessionMetrics"] = {"version": 3}
+
+    # Don't set the rollout rate; it defaults to 0.0.
+    relay = relay_with_processing()
+
+    timestamp = datetime.now(tz=timezone.utc)
+    started = timestamp - timedelta(hours=1)
+
+    relay.send_session(
+        project_id,
+        {
+            "sid": "8333339f-5675-4f89-a9a0-1c935255ab58",
+            "did": "foobarbaz",
+            "seq": 42,
+            "init": True,
+            "timestamp": timestamp.isoformat(),
+            "started": started.isoformat(),
+            "duration": 1947.49,
+            "status": "exited",
+            "errors": 0,
+            "attrs": {
+                "release": "sentry-test@1.0.0",
+                "environment": "production",
+            },
+        },
+    )
+
+    # Wait for legacy metrics to confirm the session was fully processed.
+    assert len(metrics_consumer.get_metrics(n=2)) == 2
+
+    # No items should appear on the EAP topic.
+    items_consumer.assert_empty()
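
For reference, the session-to-EAP conversion above hinges on two small pure helpers: MRI name extraction (`MetricName::try_name`) and the session tag-to-attribute mapping (`tag_to_attribute`). The sketch below reproduces that logic with only the Rust standard library; the function names mirror the diff, but the types are simplified and the snippet is illustrative rather than part of the patch.

// Standalone sketch of the MRI parsing and tag mapping used by the session EAP
// conversion. Simplified types; not part of the patch itself.

/// Extracts the metric name from a well-formed MRI of the shape
/// `<type>:<namespace>/<name>[@<unit>]`, e.g. `c:sessions/session@none` -> `session`.
fn try_name(mri: &str) -> Option<&str> {
    // Skip the single-character metric type and the `:` separator.
    let after_type = mri.get(2..)?;
    // Everything after the first `/` is `<name>[@<unit>]`.
    let after_slash = after_type.split('/').nth(1)?;
    // Drop the optional `@<unit>` suffix.
    after_slash.split('@').next()
}

/// Maps a session bucket tag to its EAP attribute name; unknown tags are dropped.
fn tag_to_attribute_name(tag: &str) -> Option<&str> {
    match tag {
        "session.status" => Some("status"),
        "release" | "environment" | "sdk" | "abnormal_mechanism" => Some(tag),
        _ => None,
    }
}

fn main() {
    // `c:sessions/session@none` is the counter bucket that becomes `session_count`.
    assert_eq!(try_name("c:sessions/session@none"), Some("session"));
    // `s:sessions/user@none` is the set bucket that becomes `user_id`.
    assert_eq!(try_name("s:sessions/user@none"), Some("user"));
    // Anything that is not a well-formed MRI yields `None`.
    assert_eq!(try_name("foo"), None);

    assert_eq!(tag_to_attribute_name("session.status"), Some("status"));
    assert_eq!(tag_to_attribute_name("some.other.tag"), None);
}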