Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 34 additions & 59 deletions src/sentry/profiles/flamegraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -450,75 +450,48 @@ def get_profile_candidates_from_profiles(self) -> ProfileCandidates:
raise ValueError("`organization` is required and cannot be `None`")

max_profiles = options.get("profiling.flamegraph.profile-set.size")
initial_chunk_delta_hours = options.get(
"profiling.flamegraph.query.initial_chunk_delta.hours"
)
max_chunk_delta_hours = options.get("profiling.flamegraph.query.max_delta.hours")
multiplier = options.get("profiling.flamegraph.query.multiplier")

initial_chunk_delta = timedelta(hours=initial_chunk_delta_hours)
max_chunk_delta = timedelta(hours=max_chunk_delta_hours)

referrer = Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_PROFILE_CANDIDATES.value
transaction_profile_candidates: list[TransactionProfileCandidate] = []
profiler_metas: list[ProfilerMeta] = []

assert self.snuba_params.start is not None and self.snuba_params.end is not None
snuba_params = self.snuba_params.copy()

for chunk_start, chunk_end in split_datetime_range_exponential(
self.snuba_params.start,
self.snuba_params.end,
initial_chunk_delta,
max_chunk_delta,
multiplier,
reverse=True,
):
snuba_params.start = chunk_start
snuba_params.end = chunk_end

builder = self.get_transactions_based_candidate_query(
query=self.query, limit=max_profiles, snuba_params=snuba_params
)
results = builder.run_query(referrer)
results = builder.process_results(results)

for row in results["data"]:
if row["profile.id"] is not None:
transaction_profile_candidates.append(
{
"project_id": row["project.id"],
"profile_id": row["profile.id"],
}
)
elif row["profiler.id"] is not None and row["thread.id"]:
profiler_metas.append(
ProfilerMeta(
project_id=row["project.id"],
profiler_id=row["profiler.id"],
thread_id=row["thread.id"],
start=row["precise.start_ts"],
end=row["precise.finish_ts"],
transaction_id=row["id"],
)
)
transaction_constraint = "is_transaction:true"
query = (
f"({transaction_constraint}) and ({self.query})"
if self.query
else transaction_constraint
)
results = self.get_spans_based_candidates(query=query, limit=max_profiles)
Comment thread
mjq marked this conversation as resolved.
Comment thread
mjq marked this conversation as resolved.

if len(transaction_profile_candidates) + len(profiler_metas) >= max_profiles:
break
transaction_profile_candidates: list[TransactionProfileCandidate] = [
{
"project_id": row["project.id"],
"profile_id": row["profile.id"],
}
for row in results["data"]
if row["profile.id"]
]

max_continuous_profile_candidates = max(
max_profiles - len(transaction_profile_candidates), 0
)

profiler_metas = [
ProfilerMeta(
project_id=row["project.id"],
profiler_id=row["profiler.id"],
thread_id=row["thread.id"],
start=row["precise.start_ts"],
end=row["precise.finish_ts"],
transaction_id=row["transaction.event_id"] or None,
)
for row in results["data"]
if row["profiler.id"] and row["thread.id"]
]
Comment thread
mjq marked this conversation as resolved.

continuous_profile_candidates: list[ContinuousProfileCandidate] = []
continuous_duration = 0.0

# If there are continuous profiles attached to transactions, we prefer those as
# If there are continuous profiles attached to spans, we prefer those as
# the active thread id gives us more user friendly flamegraphs than without.
if profiler_metas and max_continuous_profile_candidates > 0:
snuba_params.end = self.snuba_params.end
continuous_profile_candidates, continuous_duration = self.get_chunks_for_profilers(
profiler_metas, max_continuous_profile_candidates, snuba_params
continuous_profile_candidates, _ = self.get_chunks_for_profilers(
profiler_metas, max_continuous_profile_candidates
)

seen_chunks = {
Expand Down Expand Up @@ -559,6 +532,7 @@ def get_profile_candidates_from_profiles(self) -> ProfileCandidates:
limit=Limit(max_profiles),
)

referrer = Referrer.API_PROFILING_PROFILE_FLAMEGRAPH_PROFILE_CANDIDATES.value
all_results = bulk_snuba_queries(
[
Request(
Expand Down Expand Up @@ -646,7 +620,7 @@ def get_spans_based_candidates(self, query: str | None, limit: int) -> EAPRespon
# add constraints in order to fetch only spans with profiles
profiling_constraint = "(has:profile.id) or (has:profiler.id has:thread.id)"
if query is not None and len(query) > 0:
query = f"{query} and {profiling_constraint}"
query = f"({query}) and ({profiling_constraint})"
else:
query = profiling_constraint
return Spans.run_table_query(
Expand All @@ -661,6 +635,7 @@ def get_spans_based_candidates(self, query: str | None, limit: int) -> EAPRespon
"profiler.id",
"thread.id",
"timestamp",
"transaction.event_id", # holds the transaction ID, if this span was originally a transaction
],
orderby=["-timestamp"],
offset=0,
Expand Down
137 changes: 29 additions & 108 deletions tests/sentry/api/endpoints/test_organization_profiling_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -530,122 +530,38 @@ def test_queries_profile_candidates_from_transactions_with_data(
},
)

def test_queries_profile_candidates_from_profiles_with_continuous_profiles_without_transactions(
def test_queries_profile_candidates_from_profiles_with_continuous_profiles(
self,
):
# this transaction has transaction profile
profile_id = uuid4().hex
self.store_transaction(
transaction="foo",
profile_id=profile_id,
project=self.project,
span_with_transaction_profile = self.create_span(
project=self.project, start_ts=self.ten_mins_ago, duration=1000
)
span_with_transaction_profile.update({"profile_id": profile_id, "is_segment": True})
self.store_span(span_with_transaction_profile)

# this transaction has continuous profile with a matching chunk (to be mocked below)
profiler_id = uuid4().hex
thread_id = "12345"
profiler_transaction_id = uuid4().hex
profiler_transaction = self.store_transaction(
transaction="foo",
profiler_id=profiler_id,
thread_id=thread_id,
transaction_id=profiler_transaction_id,
span_with_continuous_profile = self.create_span(
{"sentry_tags": {"profiler_id": profiler_id, "thread.id": thread_id}},
project=self.project,
start_ts=self.ten_mins_ago,
duration=1000,
)
start_timestamp = datetime.fromtimestamp(profiler_transaction["start_timestamp"])
finish_timestamp = datetime.fromtimestamp(profiler_transaction["timestamp"])
buffer = timedelta(seconds=3)
# not able to write profile chunks to the table yet so mock it's response here
# so that the profiler transaction looks like it has a profile chunk within
# the specified time range
chunk = {
"project_id": self.project.id,
"profiler_id": profiler_id,
"chunk_id": uuid4().hex,
"start_timestamp": (start_timestamp - buffer).isoformat(),
"end_timestamp": (finish_timestamp + buffer).isoformat(),
}
del span_with_continuous_profile["profile_id"]
span_with_continuous_profile["is_segment"] = True
self.store_span(span_with_continuous_profile)

with (
patch(
"sentry.profiles.flamegraph.FlamegraphExecutor._query_chunks_for_profilers"
) as mock_query_chunks_for_profilers,
patch(
"sentry.api.endpoints.organization_profiling_profiles.proxy_profiling_service"
) as mock_proxy_profiling_service,
):
# Mock the chunks query for the profiler_meta
mock_query_chunks_for_profilers.return_value = [{"data": [chunk]}]
mock_proxy_profiling_service.return_value = HttpResponse(status=200)

response = self.do_request(
{
"project": [self.project.id],
"dataSource": "profiles",
},
)

assert response.status_code == 200, response.content

# Verify that chunks were queried for the profiler
mock_query_chunks_for_profilers.assert_called_once()

mock_proxy_profiling_service.assert_called_once_with(
method="POST",
path=f"/organizations/{self.project.organization.id}/flamegraph",
json_data={
"transaction": [
{
"project_id": self.project.id,
"profile_id": profile_id,
},
],
"continuous": [
{
"project_id": self.project.id,
"profiler_id": profiler_id,
"chunk_id": chunk["chunk_id"],
"thread_id": thread_id,
"start": str(int(profiler_transaction["start_timestamp"] * 1e9)),
"end": str(int(profiler_transaction["timestamp"] * 1e9)),
"transaction_id": profiler_transaction_id,
},
],
},
)

def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_transactions(
self,
):
# this transaction has transaction profile
profile_id = uuid4().hex
profile_transaction_id = uuid4().hex
self.store_transaction(
transaction="foo",
profile_id=profile_id,
transaction_id=profile_transaction_id,
project=self.project,
start_timestamp = datetime.fromtimestamp(
span_with_continuous_profile["start_timestamp_precise"]
)

# this transaction has continuous profile with a matching chunk (to be mocked below)
profiler_id = uuid4().hex
thread_id = "12345"
profiler_transaction_id = uuid4().hex
profiler_transaction = self.store_transaction(
transaction="foo",
profiler_id=profiler_id,
thread_id=thread_id,
transaction_id=profiler_transaction_id,
project=self.project,
finish_timestamp = datetime.fromtimestamp(
span_with_continuous_profile["end_timestamp_precise"]
)

start_timestamp = datetime.fromtimestamp(profiler_transaction["start_timestamp"])
finish_timestamp = datetime.fromtimestamp(profiler_transaction["timestamp"])
buffer = timedelta(seconds=3)
# not able to write profile chunks to the table yet so mock it's response here
# so that the profiler transaction looks like it has a profile chunk within
# the specified time range
chunk_1 = {
# not able to write profile chunks to the table yet so mock its response here
# so that the span looks like it has a profile chunk within the specified time range
chunk = {
"project_id": self.project.id,
"profiler_id": profiler_id,
"chunk_id": uuid4().hex,
Expand All @@ -662,13 +578,14 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_
) as mock_proxy_profiling_service,
):
# Mock the chunks query for the profiler_meta
mock_query_chunks_for_profilers.return_value = [{"data": [chunk_1]}]
mock_query_chunks_for_profilers.return_value = [{"data": [chunk]}]
mock_proxy_profiling_service.return_value = HttpResponse(status=200)

response = self.do_request(
{
"project": [self.project.id],
"dataSource": "profiles",
"statsPeriod": "1h",
},
)

Expand All @@ -691,11 +608,15 @@ def test_queries_profile_candidates_from_profiles_with_continuous_profiles_with_
{
"project_id": self.project.id,
"profiler_id": profiler_id,
"chunk_id": chunk_1["chunk_id"],
"chunk_id": chunk["chunk_id"],
"thread_id": thread_id,
"start": str(int(profiler_transaction["start_timestamp"] * 1e9)),
"end": str(int(profiler_transaction["timestamp"] * 1e9)),
"transaction_id": profiler_transaction_id,
"start": str(
int(span_with_continuous_profile["start_timestamp_precise"] * 1e9)
),
"end": str(
int(span_with_continuous_profile["end_timestamp_precise"] * 1e9)
),
"transaction_id": span_with_continuous_profile["event_id"],
},
],
},
Expand Down
Loading