Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

**Internal**:

- Add EAP double-write for session data. ([#5588](https://github.com/getsentry/relay/pull/5588))
- Embed AI operation type mappings into Relay. ([#5555](https://github.com/getsentry/relay/pull/5555))
- Use new processor architecture to process transactions. ([#5379](https://github.com/getsentry/relay/pull/5379))
- Add `gen_ai_response_time_to_first_token` as a `SpanData` attribute. ([#5575](https://github.com/getsentry/relay/pull/5575))
Expand Down
23 changes: 23 additions & 0 deletions relay-base-schema/src/metrics/name.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,29 @@ impl MetricName {
.into_iter()
.find(|namespace| maybe_namespace == namespace.as_str())
}

/// Extracts the metric name from a well formed MRI.
///
/// If the contained metric name is not a well formed MRI this function returns `None`.
///
/// # Examples
///
/// ```
/// use relay_base_schema::metrics::{MetricName, MetricNamespace};
///
/// let name = MetricName::from("foo");
/// assert!(name.try_name().is_none());
///
/// let name = MetricName::from("c:custom/foo@none");
/// assert_eq!(name.try_name(), Some("foo"));
/// let name = MetricName::from("c:custom/foo");
/// assert_eq!(name.try_name(), Some("foo"));
/// ```
pub fn try_name(&self) -> Option<&str> {
// A well formed MRI is always in the format `<type>:<namespace>/<name>[@<unit>]`.
let after_slash = self.0.get(2..)?.split('/').nth(1)?;
after_slash.split('@').next()
}
}

impl PartialEq<str> for MetricName {
Expand Down
13 changes: 13 additions & 0 deletions relay-dynamic-config/src/global.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,19 @@ pub struct Options {
)]
pub objectstore_attachments_sample_rate: f32,

/// Rollout rate for the EAP (Event Analytics Platform) double-write for user sessions.
///
/// When rolled out, session data is sent both through the legacy metrics pipeline
/// and directly to the `snuba-items` topic as `TRACE_ITEM_TYPE_USER_SESSION`.
///
/// Rate needs to be between `0.0` and `1.0`.
#[serde(
rename = "relay.sessions-eap.rollout-rate",
deserialize_with = "default_on_error",
skip_serializing_if = "is_default"
)]
pub sessions_eap_rollout_rate: f32,

/// All other unknown options.
#[serde(flatten)]
other: HashMap<String, Value>,
Expand Down
2 changes: 1 addition & 1 deletion relay-server/src/processing/sessions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl processing::Processor for SessionsProcessor {

let sessions = self.limiter.enforce_quotas(sessions, ctx).await?;

let sessions = process::extract(sessions, ctx);
let sessions = process::extract_metrics(sessions, ctx);
Ok(Output::metrics(sessions))
}
}
Expand Down
5 changes: 4 additions & 1 deletion relay-server/src/processing/sessions/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,10 @@ fn normalize_attributes(attrs: &mut SessionAttributes, ctx: &NormalizeContext<'_
Ok(())
}

pub fn extract(sessions: Managed<ExpandedSessions>, ctx: Context<'_>) -> Managed<ExtractedMetrics> {
pub fn extract_metrics(
sessions: Managed<ExpandedSessions>,
ctx: Context<'_>,
) -> Managed<ExtractedMetrics> {
let should_extract_abnormal_mechanism = ctx
.project_info
.config
Expand Down
43 changes: 32 additions & 11 deletions relay-server/src/services/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ use crate::services::processor::Processed;
use crate::statsd::{RelayCounters, RelayGauges, RelayTimers};
use crate::utils::{self, FormDataIter};

mod sessions;

/// Fallback name used for attachment items without a `filename` header.
const UNNAMED_ATTACHMENT: &str = "Unnamed Attachment";

Expand Down Expand Up @@ -518,6 +520,12 @@ impl StoreService {
let global_config = self.global_config.current();
let mut encoder = BucketEncoder::new(&global_config);

let emit_sessions_to_eap = utils::is_rolled_out(
scoping.organization_id.value(),
global_config.options.sessions_eap_rollout_rate,
)
.is_keep();

let now = UnixTimestamp::now();
let mut delay_stats = ByNamespace::<(u64, u64, u64)>::default();

Expand Down Expand Up @@ -561,6 +569,13 @@ impl StoreService {

self.metric_outcomes.track(scoping, &[view], outcome);
}

if emit_sessions_to_eap
&& let Some(trace_item) = sessions::to_trace_item(scoping, bucket, retention)
{
let message = KafkaMessage::for_item(scoping, trace_item);
let _ = self.produce(KafkaTopic::Items, message);
}
}

if let Some(error) = error {
Expand Down Expand Up @@ -594,16 +609,7 @@ impl StoreService {
let received_at = message.received_at();

let quantities = message.try_accept(|item| {
let item_type = item.trace_item.item_type();
let message = KafkaMessage::Item {
headers: BTreeMap::from([
("project_id".to_owned(), scoping.project_id.to_string()),
("item_type".to_owned(), (item_type as i32).to_string()),
]),
message: item.trace_item,
item_type,
};

let message = KafkaMessage::for_item(scoping, item.trace_item);
self.produce(KafkaTopic::Items, message)
.map(|()| item.quantities)
});
Expand Down Expand Up @@ -931,8 +937,8 @@ impl StoreService {
}
_ => KafkaTopic::MetricsGeneric,
};
let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);

let headers = BTreeMap::from([("namespace".to_owned(), namespace.to_string())]);
self.produce(topic, KafkaMessage::Metric { headers, message })?;
Ok(())
}
Expand Down Expand Up @@ -1561,6 +1567,21 @@ enum KafkaMessage<'a> {
ReplayRecordingNotChunked(ReplayRecordingNotChunkedKafkaMessage<'a>),
}

impl KafkaMessage<'_> {
    /// Creates a [`KafkaMessage`] for a [`TraceItem`].
    ///
    /// The message carries `project_id` and `item_type` as Kafka headers
    /// alongside the serialized item payload.
    fn for_item(scoping: Scoping, item: TraceItem) -> KafkaMessage<'static> {
        let item_type = item.item_type();

        let mut headers = BTreeMap::new();
        headers.insert("project_id".to_owned(), scoping.project_id.to_string());
        headers.insert("item_type".to_owned(), (item_type as i32).to_string());

        KafkaMessage::Item {
            headers,
            message: item,
            item_type,
        }
    }
}

impl Message for KafkaMessage<'_> {
fn variant(&self) -> &'static str {
match self {
Expand Down
100 changes: 100 additions & 0 deletions relay-server/src/services/store/sessions.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::collections::{BTreeSet, HashMap};

use relay_metrics::{Bucket, BucketValue, MetricNamespace};
use relay_quotas::Scoping;
use sentry_protos::snuba::v1::{AnyValue, ArrayValue, TraceItem, TraceItemType, any_value};

use crate::processing::utils::store::uuid_to_item_id;

/// Converts a session [`Bucket`] into an EAP [`TraceItem`].
///
/// Returns `None` when the bucket is not in the sessions namespace, is not a
/// well formed MRI, or carries a bucket value that does not match the metric
/// (e.g. a non-counter value for `session`).
pub fn to_trace_item(scoping: Scoping, bucket: Bucket, retention: u16) -> Option<TraceItem> {
    if bucket.name.namespace() != MetricNamespace::Sessions {
        return None;
    }

    // Currently max 5 tags + 1 metric value.
    let mut attributes = HashMap::with_capacity(6);

    let metric_name = bucket.name.try_name()?;
    match metric_name {
        "session" => match bucket.value {
            BucketValue::Counter(count) => {
                let value = AnyValue {
                    value: Some(any_value::Value::DoubleValue(count.into())),
                };
                attributes.insert("session_count".to_owned(), value);
            }
            _ => return None,
        },
        "user" => match &bucket.value {
            BucketValue::Set(ids) => {
                attributes.insert("user_id".to_owned(), set_to_attribute_value(ids));
            }
            _ => return None,
        },
        "error" => match &bucket.value {
            BucketValue::Set(ids) => {
                attributes.insert("errored_session_id".to_owned(), set_to_attribute_value(ids));
            }
            _ => return None,
        },
        _ => return None,
    }

    // Only a fixed allow-list of tags is forwarded, everything else is dropped.
    attributes.extend(bucket.tags.into_iter().filter_map(tag_to_attribute));

    let to_timestamp = |seconds: i64| prost_types::Timestamp { seconds, nanos: 0 };

    // Session buckets have no trace of their own; a fresh UUID serves as both
    // the trace id and the item id.
    let id = uuid::Uuid::new_v4();
    Some(TraceItem {
        organization_id: scoping.organization_id.value(),
        project_id: scoping.project_id.value(),
        trace_id: id.to_string(),
        item_id: uuid_to_item_id(id),
        item_type: TraceItemType::UserSession.into(),
        timestamp: Some(to_timestamp(bucket.timestamp.as_secs() as i64)),
        received: bucket
            .metadata
            .received_at
            .map(|ts| to_timestamp(ts.as_secs() as i64)),
        retention_days: retention.into(),
        downsampled_retention_days: retention.into(),
        attributes,
        client_sample_rate: 1.0,
        server_sample_rate: 1.0,
    })
}

/// Converts a set-bucket's values into an EAP array attribute of integers.
fn set_to_attribute_value(set: &BTreeSet<u32>) -> AnyValue {
    let values = set
        .iter()
        .map(|&v| AnyValue {
            // Set entries are 32-bit hashes, widened losslessly to i64.
            value: Some(any_value::Value::IntValue(i64::from(v))),
        })
        .collect();

    AnyValue {
        value: Some(any_value::Value::ArrayValue(ArrayValue { values })),
    }
}

/// Maps an allow-listed bucket tag to an EAP string attribute.
///
/// `session.status` is renamed to `status`; `release`, `environment`, `sdk`
/// and `abnormal_mechanism` pass through unchanged. All other tags are
/// discarded by returning `None`.
fn tag_to_attribute((name, value): (String, String)) -> Option<(String, AnyValue)> {
    let attribute_name = if name == "session.status" {
        "status".to_owned()
    } else if matches!(
        name.as_str(),
        "release" | "environment" | "sdk" | "abnormal_mechanism"
    ) {
        name
    } else {
        return None;
    };

    let attribute_value = AnyValue {
        value: Some(any_value::Value::StringValue(value)),
    };

    Some((attribute_name, attribute_value))
}
Loading
Loading