Skip to content

Commit 9b87bd9

Browse files
committed
Add distributed tracing module for New Relic integration
Introduces agave.core.tracing with utilities to propagate and accept distributed trace headers across HTTP calls and queue messages: - get_trace_headers(): extract current trace headers - accept_trace_headers(): continue trace from incoming headers - add_custom_attribute(): add attributes to current trace - @accept_trace_from_queue: decorator for queue consumers - @inject_trace_headers: decorator for HTTP client calls - @trace_attributes: decorator to add custom trace attributes Also adds 'tracing' optional dependency (newrelic>=7.0.0,<12.0.0) and bumps version to 1.5.3.
1 parent e7aa7f5 commit 9b87bd9

6 files changed

Lines changed: 600 additions & 1 deletion

File tree

agave/core/__init__.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from .tracing import (
2+
TRACE_HEADERS_KEY,
3+
accept_trace_from_queue,
4+
accept_trace_headers,
5+
add_custom_attribute,
6+
get_trace_headers,
7+
inject_trace_headers,
8+
trace_attributes,
9+
)
10+
11+
__all__ = [
12+
'TRACE_HEADERS_KEY',
13+
'accept_trace_from_queue',
14+
'accept_trace_headers',
15+
'add_custom_attribute',
16+
'get_trace_headers',
17+
'inject_trace_headers',
18+
'trace_attributes',
19+
]

agave/core/tracing.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
import inspect
2+
from functools import wraps
3+
from typing import Any, Callable
4+
5+
import newrelic.agent
6+
7+
# trace headers key
8+
TRACE_HEADERS_KEY = "_nr_trace_headers"
9+
10+
11+
def get_trace_headers() -> dict:
12+
headers_list: list = []
13+
newrelic.agent.insert_distributed_trace_headers(headers_list)
14+
return dict(headers_list)
15+
16+
17+
def accept_trace_headers(headers: dict | None, transport_type: str = "HTTP") -> None:
18+
"""
19+
Accept incoming trace headers to continue a distributed trace.
20+
21+
Args:
22+
headers: Trace headers from incoming request/message.
23+
transport_type: "HTTP" for HTTP requests, "Queue" for queue messages.
24+
"""
25+
if not headers:
26+
return
27+
newrelic.agent.accept_distributed_trace_headers(
28+
headers, transport_type=transport_type
29+
)
30+
31+
32+
def add_custom_attribute(key: str, value: Any) -> None:
33+
if value is not None:
34+
newrelic.agent.add_custom_attribute(key, value)
35+
36+
37+
def accept_trace_from_queue(func: Callable) -> Callable:
38+
"""
39+
Decorator to accept distributed trace headers from queue messages.
40+
41+
Extracts '_nr_trace_headers' from kwargs, accepts them, and removes
42+
them before calling the function.
43+
44+
Example:
45+
@celery_sqs.task
46+
@accept_trace_from_queue
47+
def process_incoming_spei_transaction_task(
48+
transaction: dict, session=None
49+
):
50+
...
51+
"""
52+
53+
@wraps(func)
54+
def wrapper(*args, **kwargs):
55+
trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None)
56+
if trace_headers:
57+
accept_trace_headers(trace_headers, transport_type="Queue")
58+
return func(*args, **kwargs)
59+
60+
return wrapper
61+
62+
63+
def inject_trace_headers(param_name: str = "trace_headers"):
64+
"""
65+
Decorator to inject trace headers into HTTP calls.
66+
67+
Args:
68+
param_name: name of the parameter where headers will be injected.
69+
70+
Example:
71+
@inject_trace_headers()
72+
async def request(self, method, endpoint, trace_headers=None):
73+
async with session.request(..., headers=trace_headers):
74+
...
75+
"""
76+
77+
def decorator(func: Callable) -> Callable:
78+
sig = inspect.signature(func)
79+
80+
def _inject(args, kwargs):
81+
bound = sig.bind_partial(*args, **kwargs)
82+
headers = dict(bound.arguments.get(param_name) or {})
83+
headers.update(get_trace_headers())
84+
bound.arguments[param_name] = headers
85+
return bound.args, bound.kwargs
86+
87+
if inspect.iscoroutinefunction(func):
88+
89+
@wraps(func)
90+
async def async_wrapper(*args, **kwargs):
91+
new_args, new_kwargs = _inject(args, kwargs)
92+
return await func(*new_args, **new_kwargs)
93+
94+
return async_wrapper
95+
96+
@wraps(func)
97+
def sync_wrapper(*args, **kwargs):
98+
new_args, new_kwargs = _inject(args, kwargs)
99+
return func(*new_args, **new_kwargs)
100+
101+
return sync_wrapper
102+
103+
return decorator
104+
105+
106+
def trace_attributes(**extractors: Callable | str):
107+
"""
108+
Decorator to add custom attributes to New Relic traces.
109+
110+
Each kwarg is an attribute to add. The value can be:
111+
- str: name of the function parameter (e.g., 'folio_abono')
112+
- str with dot: path to an attribute (e.g., 'orden.clave_emisor')
113+
- callable: function that receives the kwargs and returns the value
114+
115+
Example:
116+
@trace_attributes(
117+
clave_rastreo=lambda kw: ','.join(kw['orden'].claves_rastreo),
118+
clave_emisor='orden.clave_emisor',
119+
folio='folio_abono',
120+
)
121+
async def handle_orden(orden, folio_abono):
122+
...
123+
"""
124+
125+
def decorator(func: Callable) -> Callable:
126+
sig = inspect.signature(func)
127+
128+
def _extract(args, kwargs):
129+
bound = sig.bind(*args, **kwargs)
130+
bound.apply_defaults()
131+
_add_attributes(bound.arguments, extractors)
132+
133+
if inspect.iscoroutinefunction(func):
134+
135+
@wraps(func)
136+
async def async_wrapper(*args, **kwargs):
137+
_extract(args, kwargs)
138+
return await func(*args, **kwargs)
139+
140+
return async_wrapper
141+
142+
@wraps(func)
143+
def sync_wrapper(*args, **kwargs):
144+
_extract(args, kwargs)
145+
return func(*args, **kwargs)
146+
147+
return sync_wrapper
148+
149+
return decorator
150+
151+
152+
def _get_nested_value(obj: Any, path: str) -> Any:
153+
parts = path.split(".")
154+
value = obj.get(parts[0]) if isinstance(obj, dict) else getattr(obj, parts[0], None)
155+
156+
for part in parts[1:]:
157+
if value is None:
158+
return None
159+
if isinstance(value, dict):
160+
value = value.get(part)
161+
else:
162+
value = getattr(value, part, None)
163+
return value
164+
165+
166+
def _add_attributes(kwargs: dict, extractors: dict) -> None:
167+
"""
168+
Internal function to extract and add attributes to the current trace.
169+
170+
Args:
171+
kwargs: Function arguments.
172+
extractors: Dict of attribute_name -> extractor (callable or string).
173+
"""
174+
for attr_name, extractor in extractors.items():
175+
try:
176+
if callable(extractor):
177+
value = extractor(kwargs)
178+
elif isinstance(extractor, str):
179+
value = _get_nested_value(kwargs, extractor)
180+
else:
181+
value = None
182+
183+
add_custom_attribute(attr_name, value)
184+
except Exception:
185+
pass # Silent exception
186+
# we don't want to fail if unable to extract an attribute

agave/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '1.5.2'
1+
__version__ = "1.5.3"

requirements-test.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,3 +12,4 @@ moto[server]==5.0.26
1212
pytest-vcr==1.0.2
1313
pytest-asyncio==0.18.*
1414
typing_extensions==4.12.2
15+
newrelic==11.2.0

setup.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
'chalice': [
3232
'chalice>=1.30.0,<2.0.0',
3333
],
34+
'tracing': [
35+
'newrelic>=7.0.0,<12.0.0',
36+
],
3437
'fastapi': [
3538
'fastapi>=0.115.0,<1.0.0',
3639
# TODO: Remove this once we upgrade to starlette:

0 commit comments

Comments
 (0)