Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions agave/core/tracing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
import inspect
from contextlib import contextmanager
from functools import wraps
from typing import Any, Callable, Optional, Union

try:
import newrelic.agent
except ImportError: # pragma: no cover
raise ImportError(
"You must install agave with [tracing] option.\n"
"You can install it with: pip install agave[tracing]"
)

# trace headers key
TRACE_HEADERS_KEY = "_nr_trace_headers"


@contextmanager
def background_task(
name: str,
group: str = "Task",
trace_headers: Optional[dict[str, str]] = None,
):
with newrelic.agent.BackgroundTask(
application=newrelic.agent.application(),
name=name,
group=group,
):
if trace_headers:
accept_trace_headers(trace_headers, transport_type="Queue")
yield


def get_trace_headers() -> dict[str, str]:
headers_list: list = []
newrelic.agent.insert_distributed_trace_headers(headers_list)
return dict(headers_list)
Comment on lines +34 to +37
Copy link
Copy Markdown
Member

@rogelioLpz rogelioLpz Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Esta funcion es un poco extraña, inserta una lista vacía y el resultado siempre es el mismo.
Creo que es mejor quitar la funcion y hacer directo el insert_distributed_trace_headers

Copy link
Copy Markdown
Author

@dpastranak dpastranak Jan 28, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

es que más bien el API de newrelic está extraña. Pide pasar una lista vacía que muta internamente donde regresa los headers y esta función encapsula esta complejidad.

sin esta función se tendría que hacer

headers_list = []
newrelic.agent.insert_distributed_trace_headers(headers_list)
# Después de llamar, headers_list ya tiene datos:
# [('newrelic', 'eyJ2IjpbMCwxXS...'), ('traceparent', '00-abc123...')]
headers = dict(headers_list) # lo paso a un diccionario para pasarlo entre servicio. 

con la función solo se hace
headers = get_trace_headers()

o por qué dices que siempre regresa lo mismo?

Creo que justo esta función hace la interfaz más clara y simple.



def accept_trace_headers(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Esta función no es necesaria.
Realmente solo hace una linea y el transport_type lo usas como Queue

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

En este caso la función también incluye la validación if not headers: return que evita errores cuando los headers son None. Además, el transport_type por default es "HTTP" y se puede usar en contexto HHTP no solo como Queue.
Sin esta función cada que se use tendría que repetirse la validación también. Creo que la función aporta valor.

headers: Optional[dict[str, str]], transport_type: str = "HTTP"
) -> None:
if not headers:
return
newrelic.agent.accept_distributed_trace_headers(
headers, transport_type=transport_type
)


def add_custom_attribute(key: str, value: Any) -> None:
if value is not None:
newrelic.agent.add_custom_attribute(key, value)


def accept_trace_from_queue(func: Callable) -> Callable:
def _accept(args, kwargs):
trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None)
if not trace_headers and args and isinstance(args[0], dict):
trace_headers = args[0].pop(TRACE_HEADERS_KEY, None)
if trace_headers:
accept_trace_headers(trace_headers, transport_type="Queue")

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
_accept(args, kwargs)
return await func(*args, **kwargs)

return async_wrapper

@wraps(func)
def sync_wrapper(*args, **kwargs):
_accept(args, kwargs)
return func(*args, **kwargs)

return sync_wrapper


def inject_trace_headers(param_name: str = "trace_headers"):
def decorator(func: Callable) -> Callable:
sig = inspect.signature(func)

def _inject(args, kwargs):
bound = sig.bind_partial(*args, **kwargs)
headers = dict(bound.arguments.get(param_name) or {})
headers.update(get_trace_headers())
bound.arguments[param_name] = headers
return bound.args, bound.kwargs

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
new_args, new_kwargs = _inject(args, kwargs)
return await func(*new_args, **new_kwargs)

return async_wrapper

@wraps(func)
def sync_wrapper(*args, **kwargs):
new_args, new_kwargs = _inject(args, kwargs)
return func(*new_args, **new_kwargs)

return sync_wrapper

return decorator


def trace_attributes(**extractors: Union[Callable, str]):
def decorator(func: Callable) -> Callable:
sig = inspect.signature(func)

def _extract(args, kwargs):
bound = sig.bind(*args, **kwargs)
bound.apply_defaults()
_add_attributes(bound.arguments, extractors)

if inspect.iscoroutinefunction(func):

@wraps(func)
async def async_wrapper(*args, **kwargs):
_extract(args, kwargs)
return await func(*args, **kwargs)

return async_wrapper

@wraps(func)
def sync_wrapper(*args, **kwargs):
_extract(args, kwargs)
return func(*args, **kwargs)

return sync_wrapper

return decorator


def _get_nested_value(obj: Any, path: str) -> Any:
parts = path.split(".")
value = (
obj.get(parts[0])
if isinstance(obj, dict)
else getattr(obj, parts[0], None)
)

for part in parts[1:]:
if value is None:
return None
if isinstance(value, dict):
value = value.get(part)
else:
value = getattr(value, part, None)
return value


def _add_attributes(kwargs: dict, extractors: dict) -> None:
for attr_name, extractor in extractors.items():
try:
if callable(extractor):
value = extractor(kwargs)
elif isinstance(extractor, str):
value = _get_nested_value(kwargs, extractor)
else:
value = None

add_custom_attribute(attr_name, value)
except Exception:
pass # Silent exception
# we don't want to fail if unable to extract an attribute
2 changes: 1 addition & 1 deletion agave/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '1.5.2'
__version__ = "1.5.3.dev01"
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR description mentions bumping the version to 1.5.3, but the code sets __version__ to "1.5.3.dev01". Please either update the version string here or adjust the PR description so they are consistent.

Copilot uses AI. Check for mistakes.
64 changes: 64 additions & 0 deletions examples/tasks/tracing_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
from pydantic import BaseModel

from agave.core.tracing import (
accept_trace_from_queue,
Comment on lines +1 to +4
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Add from __future__ import annotations for Python 3.9 compatibility.

Line 37 uses dict | None (PEP 604 union syntax) which requires Python 3.10+ at runtime. Adding the future import enables postponed annotation evaluation, making this compatible with Python 3.9.

🐛 Proposed fix
+from __future__ import annotations
+
 from pydantic import BaseModel
 
 from agave.core.tracing import (
🤖 Prompt for AI Agents
In `@examples/tasks/tracing_example.py` around lines 1 - 4, Add "from __future__
import annotations" at the top of the module to enable postponed evaluation of
annotations so PEP 604 union types like "dict | None" used later (e.g., in the
tracing_example.py annotations and any function signatures such as those
involving accept_trace_from_queue) work on Python 3.9; insert this single import
as the very first line before other imports (above BaseModel and
agave.core.tracing imports) so runtime parsing of "dict | None" is deferred.

add_custom_attribute,
background_task,
get_trace_headers,
inject_trace_headers,
trace_attributes,
)
from agave.tasks.sqs_tasks import task

# Esta URL es solo un mock de la queue.
# Debes reemplazarla con la URL de tu queue
QUEUE_URL = "http://127.0.0.1:4000/123456789012/core.fifo"


class Abono(BaseModel):
clave_rastreo: str
clave_emisor: str
monto: float


async def process_order(order: Abono) -> None:
with background_task(name="process_order", group="Orders"):
add_custom_attribute("order_id", order.clave_rastreo)
print(f"Processing order: {order.clave_rastreo}")


async def send_to_external_service(order: Abono) -> None:
headers = get_trace_headers()
print(f"Sending with headers: {headers}")


@inject_trace_headers()
async def request(
method: str, endpoint: str, trace_headers: dict | None = None
) -> None:
Comment on lines +37 to +38
Copy link

Copilot AI Jan 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type annotation trace_headers: dict | None = None uses the PEP 604 union operator, which is only valid syntax in Python 3.10+, but this project declares python_requires='>=3.9' in setup.py. To keep the examples compatible with Python 3.9, please use Optional[dict] (with the appropriate import) or dict[str, Any] | None only after dropping 3.9 support.

Copilot uses AI. Check for mistakes.
# trace_headers ya contiene los headers de New Relic
print(f"{method} {endpoint} with headers: {trace_headers}")


@task(queue_url=QUEUE_URL, region_name="us-east-1")
@accept_trace_from_queue
async def process_incoming_transaction(transaction: dict) -> None:
print(f"Processing: {transaction}")


@trace_attributes(
clave_rastreo="order.clave_rastreo",
clave_emisor="order.clave_emisor",
monto=lambda kw: f"{kw['order'].monto:.2f}",
)
async def handle_order(order: Abono, folio: str) -> None:
print(f"Handling order {folio}: {order.clave_rastreo}")


async def process_from_queue(message: dict, trace_headers: dict) -> None:
with background_task(
name="process_from_queue",
group="Queue",
trace_headers=trace_headers,
):
print(f"Processing message: {message}")
1 change: 1 addition & 0 deletions requirements-test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ moto[server]==5.0.26
pytest-vcr==1.0.2
pytest-asyncio==0.18.*
typing_extensions==4.12.2
newrelic==11.2.0
3 changes: 3 additions & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
'chalice': [
'chalice>=1.30.0,<2.0.0',
],
'tracing': [
'newrelic>=7.0.0,<12.0.0',
],
'fastapi': [
'fastapi>=0.115.0,<1.0.0',
# TODO: Remove this once we upgrade to starlette:
Expand Down
Loading
Loading