Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/sentry/data_export/endpoints/data_export_details.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from sentry.api.base import cell_silo_endpoint
from sentry.api.bases.organization import OrganizationDataExportPermission, OrganizationEndpoint
from sentry.api.serializers import serialize
from sentry.data_export.models import ExportedData
from sentry.models.organization import Organization
from sentry.models.project import Project
from sentry.utils import metrics

from ..models import ExportedData


@cell_silo_endpoint
class DataExportDetailsEndpoint(OrganizationEndpoint):
Expand Down
3 changes: 1 addition & 2 deletions src/sentry/data_export/processors/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from sentry_relay.consts import SPAN_STATUS_CODE_TO_NAME

from sentry.api.utils import get_date_range_from_params
from sentry.data_export.base import ExportError
from sentry.models.environment import Environment
from sentry.models.group import Group
from sentry.models.organization import Organization
Expand All @@ -13,8 +14,6 @@
from sentry.snuba import discover
from sentry.snuba.utils import get_dataset

from ..base import ExportError
Comment thread
manessaraj marked this conversation as resolved.

logger = logging.getLogger(__name__)


Expand Down
12 changes: 9 additions & 3 deletions src/sentry/data_export/processors/explore.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
EAPResponse,
FieldsACL,
SearchResolverConfig,
SupportedTraceItemType,
)
from sentry.search.events.types import SAMPLING_MODES, SnubaParams
from sentry.snuba.ourlogs import OurLogs
Expand Down Expand Up @@ -76,6 +77,11 @@ def __init__(
if dataset == "spans"
else TraceItemType.TRACE_ITEM_TYPE_LOG
)
self._supported_trace_item_type = (
SupportedTraceItemType.LOGS
if self.trace_item_type == TraceItemType.TRACE_ITEM_TYPE_LOG
else SupportedTraceItemType.SPANS
)

use_aggregate_conditions = explore_query.get("allowAggregateConditions", "1") == "1"
disable_extrapolation = explore_query.get("disableAggregateExtrapolation", "0") == "1"
Expand Down Expand Up @@ -215,18 +221,18 @@ def run_query(self, _offset: int, limit: int) -> list[dict[str, Any]]:
token.ParseFromString(self.page_token)
request.page_token.CopyFrom(token)
http_resp = export_logs_rpc(request)
rows = list(iter_export_trace_items_rows(http_resp))
rows = list(iter_export_trace_items_rows(http_resp, self._supported_trace_item_type))

if self._last_emitted_item_id_hex is not None:
while rows and rows[0].get("item_id") == self._last_emitted_item_id_hex:
while rows and rows[0].get("id") == self._last_emitted_item_id_hex:
rows = rows[1:]

self._sync_page_token_from_snuba_response(http_resp)

if not rows:
return []

last_id = rows[-1].get("item_id")
last_id = rows[-1].get("id")
if isinstance(last_id, str):
self._last_emitted_item_id_hex = last_id
return rows
79 changes: 67 additions & 12 deletions src/sentry/data_export/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,35 @@
import logging
from collections.abc import Callable
from functools import wraps
from typing import Any, Iterator
from typing import Any, Iterator, Literal

from google.protobuf.timestamp_pb2 import Timestamp
from sentry_protos.snuba.v1.endpoint_trace_items_pb2 import ExportTraceItemsResponse
from sentry_protos.snuba.v1.request_common_pb2 import TraceItemType
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue, TraceItem

from sentry.data_export.base import ExportError
from sentry.search.eap.types import SupportedTraceItemType
from sentry.search.eap.utils import can_expose_attribute, translate_internal_to_public_alias
from sentry.search.events.constants import TIMEOUT_ERROR_MESSAGE
from sentry.snuba import discover
from sentry.utils import metrics, snuba
from sentry.utils.sdk import capture_exception
from sentry.utils.snuba_rpc import SnubaRPCRateLimitExceeded

# The three scalar storage types the EAP search layer distinguishes; used as
# fallback candidates when resolving an attribute's public export alias.
_SCALAR_SEARCH_TYPES: list[Literal["string", "number", "boolean"]] = [
    "string",
    "number",
    "boolean",
]

# Maps the protobuf AnyValue oneof field name (as returned by
# ``AnyValue.WhichOneof("value")``) to the EAP search scalar type it is
# stored as. Note both string and bytes payloads map to "string", and both
# int and double payloads map to "number".
PROTOBUF_TYPE_TO_SEARCH_TYPE: dict[str, Literal["string", "number", "boolean"]] = {
    "string_value": "string",
    "bytes_value": "string",
    "bool_value": "boolean",
    "int_value": "number",
    "double_value": "number",
}


# Adapted into decorator from 'src/sentry/api/endpoints/organization_events.py'
def handle_snuba_errors(
Expand Down Expand Up @@ -106,15 +121,55 @@ def _ts_to_epoch(ts: Timestamp) -> float:
return ts.seconds + ts.nanos / 1e9


def trace_item_to_row(item: TraceItem) -> dict[str, Any]:
def _merge_trace_export_cell(out: dict[str, Any], new_key: str, value: Any) -> None:
if new_key not in out:
out[new_key] = value
elif out[new_key] is None and value is not None:
out[new_key] = value


def _export_column_name_for_scalar_trace_attribute(
    internal_key: str,
    eap_storage_type: Literal["string", "number", "boolean"],
    item_type: SupportedTraceItemType,
) -> str:
    """Map a scalar trace item attribute to its public export column name.

    The attribute's own storage type is consulted first; the remaining scalar
    types are tried as fallbacks. If no translation yields both a public alias
    and a public name, the internal key is returned unchanged.
    """
    candidate_types = [eap_storage_type]
    candidate_types.extend(t for t in _SCALAR_SEARCH_TYPES if t != eap_storage_type)
    for candidate in candidate_types:
        alias, name, _ = translate_internal_to_public_alias(internal_key, candidate, item_type)
        if alias is not None and name is not None:
            return name
    return internal_key
Comment thread
cursor[bot] marked this conversation as resolved.


def trace_item_to_row(
item: TraceItem,
*,
item_type: SupportedTraceItemType,
) -> dict[str, Any]:
row: dict[str, Any] = {}
for key, av in item.attributes.items():
row[key] = None if av.WhichOneof("value") is None else anyvalue_to_python(av)
row["organization_id"] = item.organization_id
row["project_id"] = item.project_id
row["trace_id"] = item.trace_id
row["item_id"] = item.item_id.hex() if item.item_id else None
row["item_type"] = TraceItemType.Name(item.item_type)
_merge_trace_export_cell(row, "organization.id", item.organization_id)
_merge_trace_export_cell(row, "project.id", item.project_id)
_merge_trace_export_cell(row, "trace", item.trace_id)
_merge_trace_export_cell(row, "id", item.item_id.hex() if item.item_id else None)

for internal_key, av in item.attributes.items():
if not can_expose_attribute(internal_key, item_type, include_internal=False):
continue
which = av.WhichOneof("value")
value = None if which is None else anyvalue_to_python(av)
eap_storage_type = PROTOBUF_TYPE_TO_SEARCH_TYPE.get(which) if which is not None else None
if eap_storage_type is None:
new_key = internal_key
else:
new_key = _export_column_name_for_scalar_trace_attribute(
internal_key, eap_storage_type, item_type
)
Comment thread
cursor[bot] marked this conversation as resolved.
_merge_trace_export_cell(row, new_key, value)

if item.HasField("timestamp"):
row["timestamp"] = _ts_to_epoch(item.timestamp)
if item.HasField("received"):
Expand All @@ -123,12 +178,12 @@ def trace_item_to_row(item: TraceItem) -> dict[str, Any]:
row["server_sample_rate"] = item.server_sample_rate
row["retention_days"] = item.retention_days
row["downsampled_retention_days"] = item.downsampled_retention_days

return row


def iter_export_trace_items_rows(
    resp: ExportTraceItemsResponse,
    item_type: SupportedTraceItemType,
) -> Iterator[dict[str, Any]]:
    """Lazily yield one export row per trace item in *resp*."""
    yield from (
        trace_item_to_row(item, item_type=item_type) for item in resp.trace_items
    )
23 changes: 9 additions & 14 deletions tests/sentry/data_export/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,11 +784,11 @@ def _store_explore_logs_jsonl_rich_field_fixture(
"tags[code.line.number,number]": 148.0,
"logger.name": "sentry.access.api",
"origin": "auto.log.stdlib",
"sentry.body": "api.access.alpha",
"message": "api.access.alpha",
"rate_limited": "False",
"sentry.severity_text": "info",
"severity": "info",
"environment": "prod",
"sentry.severity_number": 9.0,
"severity_number": 9.0,
"payload_size": 981.0,
"tags[process.pid,number]": 6639.0,
"response": "200",
Expand All @@ -802,13 +802,13 @@ def _store_explore_logs_jsonl_rich_field_fixture(
"tags[code.line.number,number]": 148.0,
"logger.name": "sentry.access.api",
"origin": "auto.log.stdlib",
"sentry.body": "api.access.beta",
"message": "api.access.beta",
"rate_limited": "False",
"sentry.severity_text": "info",
"severity": "info",
"environment": "prod",
"payload_size": 2048.0,
"tags[process.pid,number]": 6639.0,
"sentry.severity_number": 9.0,
"severity_number": 9.0,
"response": "201",
},
]
Expand All @@ -823,15 +823,10 @@ def _store_explore_logs_jsonl_rich_field_fixture(
"observed_timestamp",
"trace",
"id",
"item_id",
"sentry.timestamp_precise",
"sentry.observed_timestamp_nanos",
"organization_id",
"project_id",
"trace_id",
"organization.id",
"project.id",
"item_type",
"timestamp",
"sentry._internal.ingested_at",
"client_sample_rate",
"server_sample_rate",
"retention_days",
Expand Down Expand Up @@ -1089,7 +1084,7 @@ def test_explore_logs_jsonl_format(self, emailer: MagicMock) -> None:

lines = [ln for ln in content.split(b"\n") if ln]
assert len(lines) == rows_exported
message_key = "log.body" if len(fields) else "sentry.body"
message_key = "log.body" if len(fields) else "message"
bodies = {json.loads(ln.decode("utf-8"))[message_key] for ln in lines}
assert bodies == {
"jsonl log message",
Expand Down
Loading