diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c40517406..38236c5592 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,7 @@ **Internal**: - Embed AI operation type mappings into Relay. ([#5555](https://github.com/getsentry/relay/pull/5555)) +- Apply continuous profiling rate limits to transaction profiles. ([#5614](https://github.com/getsentry/relay/pull/5614)) - Use new processor architecture to process transactions. ([#5379](https://github.com/getsentry/relay/pull/5379)) - Add `gen_ai_response_time_to_first_token` as a `SpanData` attribute. ([#5575](https://github.com/getsentry/relay/pull/5575)) - Add sampling to expensive envelope buffer statsd metrics. ([#5576](https://github.com/getsentry/relay/pull/5576)) diff --git a/relay-server/src/processing/transactions/types/expanded.rs b/relay-server/src/processing/transactions/types/expanded.rs index 66edf3ff5f..74dc91ded7 100644 --- a/relay-server/src/processing/transactions/types/expanded.rs +++ b/relay-server/src/processing/transactions/types/expanded.rs @@ -1,6 +1,6 @@ use either::Either; use relay_event_schema::protocol::Event; -use relay_profiling::ProfileMetadata; +use relay_profiling::{ProfileMetadata, ProfileType}; use relay_protocol::Annotated; use relay_quotas::DataCategory; use relay_sampling::evaluation::SamplingDecision; @@ -175,7 +175,11 @@ impl RateLimited for Managed>> { let attachment_quantities = self.attachments.quantities(); // Check profile limits: - for (category, quantity) in self.profile.quantities() { + let profile_quantities = self + .profile + .as_ref() + .map_or(Default::default(), |p| p.rate_limiting_quantities()); + for (category, quantity) in profile_quantities { let limits = rate_limiter .try_consume(scoping.item(category), quantity) .await; @@ -286,6 +290,30 @@ impl ExpandedProfile { self.item.set_profile_type(self.meta.kind); self.item } + + /// Returns the data categories and quantities in which this profile should be rate limited. 
+ /// + /// This is a remnant of the introduction of continuous profiling. Transaction profiles are + /// essentially deprecated, but for backwards compatibility reasons they are still being sent + /// and we want to keep supporting them. + /// + /// Continuous profiling is split into two separate categories, backend and ui, based on the + /// platform being profiled. Customers can now assign different quota for each category, + /// and we want these limits to also apply to transaction profiles. + /// + /// But these transaction profiles are still visible as separate items in the UI and stats + /// pages, which is why we only rate limit against the respective category instead of also + /// counting them/adding outcomes for them in the continuous profiling categories. + /// + /// It's basically a big pile of legacy reasons. + pub fn rate_limiting_quantities(&self) -> Quantities { + let mut quantities = self.quantities(); + quantities.push(match self.meta.kind { + ProfileType::Backend => (DataCategory::ProfileChunk, 1), + ProfileType::Ui => (DataCategory::ProfileChunkUi, 1), + }); + quantities + } } impl Counted for ExpandedProfile { diff --git a/relay-server/src/processing/transactions/types/profile.rs b/relay-server/src/processing/transactions/types/profile.rs index 951cafe504..fdb645c209 100644 --- a/relay-server/src/processing/transactions/types/profile.rs +++ b/relay-server/src/processing/transactions/types/profile.rs @@ -1,9 +1,9 @@ use crate::Envelope; use crate::envelope::EnvelopeHeaders; -use crate::managed::{Counted, Managed, Quantities}; -use crate::processing::CountRateLimited; +use crate::managed::{Counted, Managed, Quantities, Rejected}; use crate::processing::transactions::Error; use crate::processing::transactions::types::ExpandedProfile; +use crate::processing::{Context, RateLimited, RateLimiter}; /// A standalone profile, which is no longer attached to a transaction as the transaction was /// dropped by dynamic sampling. 
@@ -29,6 +29,30 @@ impl Counted for StandaloneProfile { } } -impl CountRateLimited for Managed> { +impl RateLimited for Managed> { + type Output = Self; type Error = Error; + + async fn enforce<R>( + self, + mut rate_limiter: R, + _ctx: Context<'_>, + ) -> Result<Self::Output, Rejected<Self::Error>> + where + R: RateLimiter, + { + let scoping = self.scoping(); + + for (category, quantity) in self.profile.rate_limiting_quantities() { + let limits = rate_limiter + .try_consume(scoping.item(category), quantity) + .await; + + if !limits.is_empty() { + return Err(self.reject_err(Error::from(limits))); + } + } + + Ok(self) + } } diff --git a/tests/integration/test_outcome.py b/tests/integration/test_outcome.py index cb3c15a18f..206c42b148 100644 --- a/tests/integration/test_outcome.py +++ b/tests/integration/test_outcome.py @@ -1578,7 +1578,9 @@ def make_envelope(): assert profiles_consumer.get_profile() -@pytest.mark.parametrize("quota_category", ["transaction", "profile"]) +@pytest.mark.parametrize( + "quota_category", ["transaction", "profile", "profile_chunk_ui"] +) def test_profile_outcomes_rate_limited( mini_sentry, relay_with_processing, @@ -1607,15 +1609,12 @@ def test_profile_outcomes_rate_limited( config = { "outcomes": { "emit_outcomes": True, - "batch_size": 1, - "batch_interval": 1, "aggregator": { "bucket_interval": 1, "flush_interval": 1, }, - }, + } } - upstream = relay_with_processing(config) with open( @@ -1667,7 +1666,7 @@ def test_profile_outcomes_rate_limited( for (category, quantity) in expected_categories ] - if quota_category == "profile": + if quota_category != "transaction": expected_outcomes.append( { "category": DataCategory.SPAN_INDEXED, @@ -1712,10 +1711,6 @@ def test_profile_outcomes_rate_limited_when_dynamic_sampling_drops( "flush_interval": 0, }, }, - "aggregator": { - "bucket_interval": 1, - "initial_delay": 0, - }, } relay = relay(mini_sentry, options=config)