Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
209 changes: 150 additions & 59 deletions src/sentry/billing/platform/services/usage/_outcomes_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.billing.v1.date_pb2 import Date
from sentry_protos.billing.v1.services.usage.v1.endpoint_orgs_with_usage_pb2 import (
GetOrgsWithUsageRequest,
GetOrgsWithUsageResponse,
PageToken,
)
from sentry_protos.billing.v1.services.usage.v1.endpoint_usage_pb2 import (
CategoryUsage,
DailyUsage,
Expand All @@ -21,6 +26,7 @@
Function,
Granularity,
Limit,
Offset,
Op,
OrderBy,
Query,
Expand Down Expand Up @@ -48,6 +54,36 @@
_BILLABLE_OUTCOMES = [Outcome.ACCEPTED, Outcome.FILTERED, Outcome.RATE_LIMITED]


def query_orgs_with_usage(request: GetOrgsWithUsageRequest) -> GetOrgsWithUsageResponse:
start = _timestamp_to_datetime(request.start)
end = _timestamp_to_datetime(request.end) + timedelta(days=1)
categories = [proto_to_sentry_category(c) for c in request.categories]

offset = request.page_token.offset if request.HasField("page_token") else 0

snuba_request = _build_distinct_orgs_query(
start=start,
end=end,
categories=categories,
limit=request.limit + 1,
offset=offset,
)
result = raw_snql_query(snuba_request, referrer=_REFERRER)
rows = result["data"]

has_more = len(rows) > request.limit
if has_more:
rows.pop()

response = GetOrgsWithUsageResponse(
organization_ids=[int(row["org_id"]) for row in rows],
Comment thread
sentry-warden[bot] marked this conversation as resolved.
)
if has_more:
response.page_token.CopyFrom(PageToken(offset=offset + request.limit))

return response


def query_outcomes_usage(request: GetUsageRequest) -> GetUsageResponse:
org_id = request.organization_id
start = _timestamp_to_datetime(request.start)
Expand All @@ -60,7 +96,9 @@ def query_outcomes_usage(request: GetUsageRequest) -> GetUsageResponse:
# (e.g., proto ATTACHMENT=3 vs Relay ATTACHMENT=4). Convert before querying.
categories = [proto_to_sentry_category(c) for c in request.categories]

snuba_request = _build_query(org_id, start, end, categories, total_outcomes=_BILLABLE_OUTCOMES)
snuba_request = _build_usage_query(
org_id, start, end, categories, total_outcomes=_BILLABLE_OUTCOMES
)
result = raw_snql_query(snuba_request, referrer=_REFERRER)
rows = result["data"]

Expand All @@ -78,90 +116,143 @@ def query_outcomes_usage(request: GetUsageRequest) -> GetUsageResponse:
return _build_response(rows)


def _build_query(
org_id: int,
def _build_distinct_orgs_query(
start: datetime,
end: datetime,
categories: Sequence[int],
*,
limit: int,
offset: int = 0,
) -> Request:
"""Build a query that returns distinct org_ids with billable usage."""
where = [
Condition(Column("timestamp"), Op.GTE, start),
Condition(Column("timestamp"), Op.LT, end),
Condition(Column("outcome"), Op.IN, list(_BILLABLE_OUTCOMES)),
]
if categories:
where.append(Condition(Column("category"), Op.IN, categories))

query = Query(
match=Entity("outcomes"),
select=[Column("org_id")],
groupby=[Column("org_id")],
where=where,
orderby=[OrderBy(Column("org_id"), Direction.ASC)],
granularity=Granularity(_DAILY_GRANULARITY),
limit=Limit(limit),
offset=Offset(offset),
)

return Request(
dataset=_DATASET,
app_id=_APP_ID,
query=query,
tenant_ids={},
)


def _build_usage_query(
org_id: int | None,
start: datetime,
end: datetime,
categories: Sequence[int],
*,
total_outcomes: Sequence[int] | None = None,
limit: int = _QUERY_LIMIT,
offset: int = 0,
) -> Request:
"""Build a per-category, per-day usage breakdown query."""
# Half-open interval [start, end) — standard sentry.snuba.outcomes convention.
# `end` has already been shifted +1 day in query_outcomes_usage() to convert
# the proto's inclusive end into the exclusive boundary Snuba expects.
where = [
Condition(Column("org_id"), Op.EQ, org_id),
Condition(Column("timestamp"), Op.GTE, start),
Condition(Column("timestamp"), Op.LT, end),
]
if org_id is not None:
where.append(Condition(Column("org_id"), Op.EQ, org_id))
if categories:
where.append(Condition(Column("category"), Op.IN, categories))

select = [
Column("category"),
Column("time"),
_total_function(total_outcomes),
Function(
"sumIf",
[Column("quantity"), Function("equals", [Column("outcome"), Outcome.ACCEPTED])],
"accepted",
),
Function(
"sumIf",
[
Column("quantity"),
Function("equals", [Column("outcome"), Outcome.RATE_LIMITED]),
],
"dropped",
),
Function(
"sumIf",
[Column("quantity"), Function("equals", [Column("outcome"), Outcome.FILTERED])],
"filtered",
),
Function("sumIf", [Column("quantity"), _over_quota_condition()], "over_quota"),
Function(
"sumIf",
[
Column("quantity"),
Function(
"and",
[
Function("equals", [Column("outcome"), Outcome.RATE_LIMITED]),
Function("equals", [Column("reason"), "smart_rate_limit"]),
],
),
],
"spike_protection",
),
Function(
"sumIf",
[
Column("quantity"),
Function(
"and",
[
Function("equals", [Column("outcome"), Outcome.FILTERED]),
Function("startsWith", [Column("reason"), "Sampled:"]),
],
),
],
"dynamic_sampling",
),
]

groupby = [Column("category"), Column("time")]
if org_id is None:
select.insert(0, Column("org_id"))
groupby.insert(0, Column("org_id"))

query = Query(
match=Entity("outcomes"),
select=[
Column("category"),
Column("time"),
_total_function(total_outcomes),
Function(
"sumIf",
[Column("quantity"), Function("equals", [Column("outcome"), Outcome.ACCEPTED])],
"accepted",
),
Function(
"sumIf",
[
Column("quantity"),
Function("equals", [Column("outcome"), Outcome.RATE_LIMITED]),
],
"dropped",
),
Function(
"sumIf",
[Column("quantity"), Function("equals", [Column("outcome"), Outcome.FILTERED])],
"filtered",
),
Function("sumIf", [Column("quantity"), _over_quota_condition()], "over_quota"),
Function(
"sumIf",
[
Column("quantity"),
Function(
"and",
[
Function("equals", [Column("outcome"), Outcome.RATE_LIMITED]),
Function("equals", [Column("reason"), "smart_rate_limit"]),
],
),
],
"spike_protection",
),
Function(
"sumIf",
[
Column("quantity"),
Function(
"and",
[
Function("equals", [Column("outcome"), Outcome.FILTERED]),
Function("startsWith", [Column("reason"), "Sampled:"]),
],
),
],
"dynamic_sampling",
),
],
groupby=[Column("category"), Column("time")],
select=select,
groupby=groupby,
where=where,
orderby=[OrderBy(Column("time"), Direction.ASC)],
granularity=Granularity(_DAILY_GRANULARITY),
limit=Limit(_QUERY_LIMIT),
limit=Limit(limit),
offset=Offset(offset),
)

tenant_ids: dict[str, int] = {}
if org_id is not None:
tenant_ids["organization_id"] = org_id

return Request(
dataset=_DATASET,
app_id=_APP_ID,
query=query,
tenant_ids={"organization_id": org_id},
tenant_ids=tenant_ids,
)


Expand Down
13 changes: 12 additions & 1 deletion src/sentry/billing/platform/services/usage/service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
from __future__ import annotations

from sentry_protos.billing.v1.services.usage.v1.endpoint_orgs_with_usage_pb2 import (
GetOrgsWithUsageRequest,
GetOrgsWithUsageResponse,
)
from sentry_protos.billing.v1.services.usage.v1.endpoint_usage_pb2 import (
GetUsageRequest,
GetUsageResponse,
)

from sentry.billing.platform.core import BillingService, service_method
from sentry.billing.platform.services.usage._outcomes_query import query_outcomes_usage
from sentry.billing.platform.services.usage._outcomes_query import (
query_orgs_with_usage,
query_outcomes_usage,
)


class UsageService(BillingService):
Expand All @@ -20,3 +27,7 @@ def get_usage(self, request: GetUsageRequest) -> GetUsageResponse:
dynamic_sampling.
"""
return query_outcomes_usage(request)

@service_method
def get_orgs_with_usage(self, request: GetOrgsWithUsageRequest) -> GetOrgsWithUsageResponse:
Comment thread
sentry-warden[bot] marked this conversation as resolved.
return query_orgs_with_usage(request)
Loading
Loading