From d8a5cd309ed7042fd714031c40c012200c69e895 Mon Sep 17 00:00:00 2001 From: Daniel Pastrana Date: Mon, 26 Jan 2026 15:09:52 -0600 Subject: [PATCH 1/6] Add distributed tracing module for New Relic integration Introduces agave.core.tracing with utilities to propagate and accept distributed trace headers across HTTP calls and queue messages: - get_trace_headers(): extract current trace headers - accept_trace_headers(): continue trace from incoming headers - add_custom_attribute(): add attributes to current trace - @accept_trace_from_queue: decorator for queue consumers - @inject_trace_headers: decorator for HTTP client calls - @trace_attributes: decorator to add custom trace attributes Also adds tracing optional dependency (newrelic>=7.0.0,<12.0.0) and bumps version to 1.5.3. --- agave/core/__init__.py | 19 ++ agave/core/tracing.py | 194 ++++++++++++++++++ agave/version.py | 2 +- requirements-test.txt | 1 + setup.py | 3 + tests/core/test_tracing.py | 392 +++++++++++++++++++++++++++++++++++++ 6 files changed, 610 insertions(+), 1 deletion(-) create mode 100644 agave/core/tracing.py create mode 100644 tests/core/test_tracing.py diff --git a/agave/core/__init__.py b/agave/core/__init__.py index e69de29b..221186fc 100644 --- a/agave/core/__init__.py +++ b/agave/core/__init__.py @@ -0,0 +1,19 @@ +from .tracing import ( + TRACE_HEADERS_KEY, + accept_trace_from_queue, + accept_trace_headers, + add_custom_attribute, + get_trace_headers, + inject_trace_headers, + trace_attributes, +) + +__all__ = [ + 'TRACE_HEADERS_KEY', + 'accept_trace_from_queue', + 'accept_trace_headers', + 'add_custom_attribute', + 'get_trace_headers', + 'inject_trace_headers', + 'trace_attributes', +] diff --git a/agave/core/tracing.py b/agave/core/tracing.py new file mode 100644 index 00000000..99a19a75 --- /dev/null +++ b/agave/core/tracing.py @@ -0,0 +1,194 @@ +from __future__ import annotations + +import inspect +from functools import wraps +from typing import Any, Callable + +import newrelic.agent + +# trace headers key +TRACE_HEADERS_KEY = "_nr_trace_headers" + + +def get_trace_headers() -> dict: + headers_list: list = [] + newrelic.agent.insert_distributed_trace_headers(headers_list) + return dict(headers_list) + + +def accept_trace_headers( + headers: dict | None, transport_type: str = "HTTP" +) -> None: + """ + Accept incoming trace headers to continue a distributed trace. + + Args: + headers: Trace headers from incoming request/message. + transport_type: "HTTP" for HTTP requests, "Queue" for queue messages. + """ + if not headers: + return + newrelic.agent.accept_distributed_trace_headers( + headers, transport_type=transport_type + ) + + +def add_custom_attribute(key: str, value: Any) -> None: + if value is not None: + newrelic.agent.add_custom_attribute(key, value) + + +def accept_trace_from_queue(func: Callable) -> Callable: + """ + Decorator to accept distributed trace headers from queue messages. + + Extracts '_nr_trace_headers' from kwargs, accepts them, and removes + them before calling the function. + + Example: + @celery_sqs.task + @accept_trace_from_queue + def process_incoming_spei_transaction_task( + transaction: dict, session=None + ): + ... + """ + + @wraps(func) + def wrapper(*args, **kwargs): + trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None) + if trace_headers: + accept_trace_headers(trace_headers, transport_type="Queue") + return func(*args, **kwargs) + + return wrapper + + +def inject_trace_headers(param_name: str = "trace_headers"): + """ + Decorator to inject trace headers into HTTP calls. + + Args: + param_name: name of the parameter where headers will be injected. + + Example: + @inject_trace_headers() + async def request(self, method, endpoint, trace_headers=None): + async with session.request(..., headers=trace_headers): + ... + """ + + def decorator(func: Callable) -> Callable: + sig = inspect.signature(func) + + def _inject(args, kwargs): + bound = sig.bind_partial(*args, **kwargs) + headers = dict(bound.arguments.get(param_name) or {}) + headers.update(get_trace_headers()) + bound.arguments[param_name] = headers + return bound.args, bound.kwargs + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + new_args, new_kwargs = _inject(args, kwargs) + return await func(*new_args, **new_kwargs) + + return async_wrapper + + @wraps(func) + def sync_wrapper(*args, **kwargs): + new_args, new_kwargs = _inject(args, kwargs) + return func(*new_args, **new_kwargs) + + return sync_wrapper + + return decorator + + +def trace_attributes(**extractors: Callable | str): + """ + Decorator to add custom attributes to New Relic traces. + + Each kwarg is an attribute to add. The value can be: + - str: name of the function parameter (e.g., 'folio_abono') + - str with dot: path to an attribute (e.g., 'orden.clave_emisor') + - callable: function that receives the kwargs and returns the value + + Example: + @trace_attributes( + clave_rastreo=lambda kw: ','.join(kw['orden'].claves_rastreo), + clave_emisor='orden.clave_emisor', + folio='folio_abono', + ) + async def handle_orden(orden, folio_abono): + ... + """ + + def decorator(func: Callable) -> Callable: + sig = inspect.signature(func) + + def _extract(args, kwargs): + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + _add_attributes(bound.arguments, extractors) + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + _extract(args, kwargs) + return await func(*args, **kwargs) + + return async_wrapper + + @wraps(func) + def sync_wrapper(*args, **kwargs): + _extract(args, kwargs) + return func(*args, **kwargs) + + return sync_wrapper + + return decorator + + +def _get_nested_value(obj: Any, path: str) -> Any: + parts = path.split(".") + value = ( + obj.get(parts[0]) + if isinstance(obj, dict) + else getattr(obj, parts[0], None) + ) + + for part in parts[1:]: + if value is None: + return None + if isinstance(value, dict): + value = value.get(part) + else: + value = getattr(value, part, None) + return value + + +def _add_attributes(kwargs: dict, extractors: dict) -> None: + """ + Internal function to extract and add attributes to the current trace. + + Args: + kwargs: Function arguments. + extractors: Dict of attribute_name -> extractor (callable or string). + """ + for attr_name, extractor in extractors.items(): + try: + if callable(extractor): + value = extractor(kwargs) + elif isinstance(extractor, str): + value = _get_nested_value(kwargs, extractor) + else: + value = None + + add_custom_attribute(attr_name, value) + except Exception: + pass # Silent exception + # we don't want to fail if unable to extract an attribute diff --git a/agave/version.py b/agave/version.py index c3b38415..a06ff4e0 100644 --- a/agave/version.py +++ b/agave/version.py @@ -1 +1 @@ -__version__ = '1.5.2' +__version__ = "1.5.3" diff --git a/requirements-test.txt b/requirements-test.txt index 7038b79c..e970f9c4 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -12,3 +12,4 @@ moto[server]==5.0.26 pytest-vcr==1.0.2 pytest-asyncio==0.18.* typing_extensions==4.12.2 +newrelic==11.2.0 \ No newline at end of file diff --git a/setup.py b/setup.py index 590a6c40..3b7f5677 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,9 @@ 'chalice': [ 'chalice>=1.30.0,<2.0.0', ], + 'tracing': [ + 'newrelic>=7.0.0,<12.0.0', + ], 'fastapi': [ 'fastapi>=0.115.0,<1.0.0', # TODO: Remove this once we upgrade to starlette: diff --git a/tests/core/test_tracing.py b/tests/core/test_tracing.py new file mode 100644 index 00000000..2ae8a038 --- /dev/null +++ b/tests/core/test_tracing.py @@ -0,0 +1,392 @@ +from __future__ import annotations + +import asyncio +from unittest.mock import patch + +from agave.core.tracing import ( + _add_attributes, + _get_nested_value, + accept_trace_from_queue, + accept_trace_headers, + add_custom_attribute, + get_trace_headers, + inject_trace_headers, + trace_attributes, +) + + +def test_get_trace_headers_returns_dict_with_headers(): + with patch("newrelic.agent") as mock_agent: + mock_agent.insert_distributed_trace_headers.side_effect = ( + lambda h: h.extend([("traceparent", "abc"), ("tracestate", "xyz")]) + ) + result = get_trace_headers() + assert result == {"traceparent": "abc", "tracestate": "xyz"} + + +def test_accept_trace_headers_accepts_headers(): + with patch("newrelic.agent") as mock_agent: + headers = {"traceparent": "abc"} + accept_trace_headers(headers, transport_type="Queue") + mock_fn = mock_agent.accept_distributed_trace_headers + mock_fn.assert_called_once_with(headers, transport_type="Queue") + + +def test_accept_trace_headers_does_nothing_when_no_headers(): + with patch("newrelic.agent") as mock_agent: + accept_trace_headers({}) + mock_agent.accept_distributed_trace_headers.assert_not_called() + + +def test_accept_trace_headers_does_nothing_when_headers_none(): + with patch("newrelic.agent") as mock_agent: + accept_trace_headers(None) + mock_agent.accept_distributed_trace_headers.assert_not_called() + + +def test_add_custom_attribute_adds_attribute(): + with patch("newrelic.agent") as mock_agent: + add_custom_attribute("key", "value") + mock_agent.add_custom_attribute.assert_called_once_with("key", "value") + + +def test_add_custom_attribute_does_nothing_when_value_none(): + with patch("newrelic.agent") as mock_agent: + add_custom_attribute("key", None) + mock_agent.add_custom_attribute.assert_not_called() + + +def test_add_custom_attribute_preserves_native_types(): + with patch("newrelic.agent") as mock_agent: + add_custom_attribute("key", 12345) + mock_agent.add_custom_attribute.assert_called_once_with("key", 12345) + + +def test_accept_trace_from_queue_accepts_headers(): + @accept_trace_from_queue + def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = my_task( + {"value": 1}, _nr_trace_headers={"traceparent": "abc"} + ) + assert result == {"value": 1} + mock_accept.assert_called_once_with( + {"traceparent": "abc"}, transport_type="Queue" + ) + + +def test_accept_trace_from_queue_removes_headers_from_kwargs(): + @accept_trace_from_queue + def my_task(**kwargs): + return kwargs + + with patch("agave.core.tracing.accept_trace_headers"): + result = my_task( + data={"value": 1}, _nr_trace_headers={"traceparent": "abc"} + ) + # _nr_trace_headers should be removed + assert "_nr_trace_headers" not in result + assert result == {"data": {"value": 1}} + + +def test_accept_trace_from_queue_works_without_headers(): + @accept_trace_from_queue + def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = my_task({"value": 1}) + assert result == {"value": 1} + mock_accept.assert_not_called() + + +def test_inject_trace_headers_async_keyword_arg(): + @inject_trace_headers() + async def my_func(_url: str, trace_headers: dict | None = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = asyncio.run(my_func("http://example.com")) + assert result == {"traceparent": "abc"} + + +def test_inject_trace_headers_merges_existing_headers_async(): + @inject_trace_headers() + async def my_func(_url: str, trace_headers: dict | None = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = asyncio.run( + my_func("http://example.com", trace_headers={"custom": "header"}) + ) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_handles_positional_headers_async(): + """Test that positional args don't cause 'multiple values' error.""" + + @inject_trace_headers("headers") + async def my_func(_url: str, headers: dict | None = None): + return headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + # Pass headers as positional argument + result = asyncio.run( + my_func("http://example.com", {"custom": "header"}) + ) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_sync_keyword_arg(): + @inject_trace_headers() + def my_func(_url: str, trace_headers: dict | None = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = my_func("http://example.com") + assert result == {"traceparent": "abc"} + + +def test_inject_trace_headers_merges_existing_headers_sync(): + @inject_trace_headers() + def my_func(_url: str, trace_headers: dict | None = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = my_func( + "http://example.com", trace_headers={"custom": "header"} + ) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_handles_positional_headers_sync(): + """Test that positional args don't cause 'multiple values' error.""" + + @inject_trace_headers("headers") + def my_func(_url: str, headers: dict | None = None): + return headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + # Pass headers as positional argument + result = my_func("http://example.com", {"custom": "header"}) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_custom_param_name(): + @inject_trace_headers("custom_headers") + def my_func(_url: str, custom_headers: dict | None = None): + return custom_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = my_func("http://example.com") + assert result == {"traceparent": "abc"} + + +def test_trace_attributes_extracts_simple_kwarg_async(): + @trace_attributes(my_attr="value") + async def my_func(value: str): + return value + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = asyncio.run(my_func("test_value")) + assert result == "test_value" + mock_add.assert_called_once_with("my_attr", "test_value") + + +def test_trace_attributes_extracts_nested_attr_async(): + class Order: + def __init__(self): + self.clave_emisor = "12345" + + @trace_attributes(emisor="order.clave_emisor") + async def my_func(order: Order): + return order.clave_emisor + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = asyncio.run(my_func(Order())) + assert result == "12345" + mock_add.assert_called_once_with("emisor", "12345") + + +def test_trace_attributes_extracts_with_callable_async(): + @trace_attributes(total=lambda kw: kw["a"] + kw["b"]) + async def my_func(a: int, b: int): + return a + b + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = asyncio.run(my_func(1, 2)) + assert result == 3 + mock_add.assert_called_once_with("total", 3) + + +def test_trace_attributes_extracts_simple_kwarg_sync(): + @trace_attributes(my_attr="value") + def my_func(value: str): + return value + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func("test_value") + assert result == "test_value" + mock_add.assert_called_once_with("my_attr", "test_value") + + +def test_trace_attributes_extracts_nested_attr_sync(): + class Order: + def __init__(self): + self.clave_emisor = "12345" + + @trace_attributes(emisor="order.clave_emisor") + def my_func(order: Order): + return order.clave_emisor + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func(Order()) + assert result == "12345" + mock_add.assert_called_once_with("emisor", "12345") + + +def test_trace_attributes_extracts_with_callable_sync(): + @trace_attributes(total=lambda kw: kw["a"] + kw["b"]) + def my_func(a: int, b: int): + return a + b + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func(1, 2) + assert result == 3 + mock_add.assert_called_once_with("total", 3) + + +def test_trace_attributes_handles_positional_args(): + """Test that positional args are properly bound to param names.""" + + @trace_attributes(x_val="x", y_val="y") + def my_func(x: int, y: int): + return x + y + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func(10, 20) # Both as positional + assert result == 30 + assert mock_add.call_count == 2 + + +def test_trace_attributes_handles_extraction_error_gracefully(): + @trace_attributes(missing="nonexistent.path.deep") + def my_func(value: str): + return value + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + # Should not raise, just silently fail the attribute + result = my_func("test") + assert result == "test" + mock_add.assert_called_once_with("missing", None) + + +def test_get_nested_value_from_dict(): + obj = {"transaction": {"clave_rastreo": "ABC123"}} + result = _get_nested_value(obj, "transaction.clave_rastreo") + assert result == "ABC123" + + +def test_get_nested_value_from_object(): + class Transaction: + clave_rastreo = "ABC123" + + class Data: + transaction = Transaction() + + result = _get_nested_value(Data(), "transaction.clave_rastreo") + assert result == "ABC123" + + +def test_get_nested_value_returns_none_for_missing_path(): + obj = {"transaction": {}} + result = _get_nested_value(obj, "transaction.missing.deep") + assert result is None + + +def test_get_nested_value_simple_key(): + obj = {"name": "test"} + result = _get_nested_value(obj, "name") + assert result == "test" + + +def test_get_nested_value_mixed_dict_and_object(): + class Inner: + value = "nested" + + obj = {"outer": Inner()} + result = _get_nested_value(obj, "outer.value") + assert result == "nested" + + +def test_add_attributes_with_callable_extractor(): + kwargs = {"a": 1, "b": 2} + extractors = {"sum": lambda kw: kw["a"] + kw["b"]} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("sum", 3) + + +def test_add_attributes_with_string_extractor(): + kwargs = {"name": "test"} + extractors = {"result": "name"} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("result", "test") + + +def test_add_attributes_with_dotted_path_extractor(): + class Obj: + value = "nested" + + kwargs = {"obj": Obj()} + extractors = {"result": "obj.value"} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("result", "nested") + + +def test_add_attributes_handles_exception_gracefully(): + kwargs = {} + extractors = {"broken": lambda kw: kw["missing"]} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + # Should not raise + _add_attributes(kwargs, extractors) + mock_add.assert_not_called() + + +def test_add_attributes_with_non_callable_non_string_extractor(): + """Test that non-callable non-string extractors result in None.""" + kwargs = {"name": "test"} + extractors = {"result": 12345} # Not a callable, not a string + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("result", None) From 163482a1ef21d5bbc85afe77dbd3c2498fd41d72 Mon Sep 17 00:00:00 2001 From: Daniel Pastrana Date: Mon, 26 Jan 2026 15:20:54 -0600 Subject: [PATCH 2/6] Fix async support to accept_trace_from_queue - Update accept_trace_from_queue decorator to handle both sync and async functions - Add tests for async queue handler support --- agave/core/tracing.py | 18 +++++++++++++++--- tests/core/test_tracing.py | 26 ++++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 3 deletions(-) diff --git a/agave/core/tracing.py b/agave/core/tracing.py index 99a19a75..53292457 100644 --- a/agave/core/tracing.py +++ b/agave/core/tracing.py @@ -54,14 +54,26 @@ def process_incoming_spei_transaction_task( ... """ - @wraps(func) - def wrapper(*args, **kwargs): + def _accept(kwargs): trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None) if trace_headers: accept_trace_headers(trace_headers, transport_type="Queue") + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + _accept(kwargs) + return await func(*args, **kwargs) + + return async_wrapper + + @wraps(func) + def sync_wrapper(*args, **kwargs): + _accept(kwargs) return func(*args, **kwargs) - return wrapper + return sync_wrapper def inject_trace_headers(param_name: str = "trace_headers"): diff --git a/tests/core/test_tracing.py b/tests/core/test_tracing.py index 2ae8a038..36e3483d 100644 --- a/tests/core/test_tracing.py +++ b/tests/core/test_tracing.py @@ -102,6 +102,32 @@ def my_task(data: dict): mock_accept.assert_not_called() +def test_accept_trace_from_queue_async_accepts_headers(): + @accept_trace_from_queue + async def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = asyncio.run( + my_task({"value": 1}, _nr_trace_headers={"traceparent": "abc"}) + ) + assert result == {"value": 1} + mock_accept.assert_called_once_with( + {"traceparent": "abc"}, transport_type="Queue" + ) + + +def test_accept_trace_from_queue_async_works_without_headers(): + @accept_trace_from_queue + async def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = asyncio.run(my_task({"value": 1})) + assert result == {"value": 1} + mock_accept.assert_not_called() + + def test_inject_trace_headers_async_keyword_arg(): @inject_trace_headers() async def my_func(_url: str, trace_headers: dict | None = None): From 68264646f4d108105305d804284d93b3c01975f9 Mon Sep 17 00:00:00 2001 From: Daniel Pastrana Date: Mon, 26 Jan 2026 15:48:49 -0600 Subject: [PATCH 3/6] change debug version --- agave/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agave/version.py b/agave/version.py index a06ff4e0..0638b869 100644 --- a/agave/version.py +++ b/agave/version.py @@ -1 +1 @@ -__version__ = "1.5.3" +__version__ = "1.5.3.dev00" From ceff470cb33205d9072aacb0cb32088201faebf0 Mon Sep 17 00:00:00 2001 From: Daniel Pastrana Date: Mon, 26 Jan 2026 20:40:44 -0600 Subject: [PATCH 4/6] add nerelic background rasks contextmanager --- agave/core/__init__.py | 2 ++ agave/core/tracing.py | 19 ++++++++++++++- agave/version.py | 2 +- tests/core/test_tracing.py | 49 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/agave/core/__init__.py b/agave/core/__init__.py index 221186fc..326e02ab 100644 --- a/agave/core/__init__.py +++ b/agave/core/__init__.py @@ -3,6 +3,7 @@ accept_trace_from_queue, accept_trace_headers, add_custom_attribute, + background_task, get_trace_headers, inject_trace_headers, trace_attributes, @@ -13,6 +14,7 @@ 'accept_trace_from_queue', 'accept_trace_headers', 'add_custom_attribute', + 'background_task', 'get_trace_headers', 'inject_trace_headers', 'trace_attributes', diff --git a/agave/core/tracing.py b/agave/core/tracing.py index 53292457..d1fb4d04 100644 --- a/agave/core/tracing.py +++ b/agave/core/tracing.py @@ -1,8 +1,9 @@ from __future__ import annotations import inspect +from contextlib import contextmanager from functools import wraps -from typing import Any, Callable +from typing import Any, Callable, Optional import newrelic.agent @@ -10,6 +11,22 @@ TRACE_HEADERS_KEY = "_nr_trace_headers" +@contextmanager +def background_task( + name: str, + group: str = "Task", + trace_headers: Optional[dict] = None, +): + with newrelic.agent.BackgroundTask( + application=newrelic.agent.application(), + name=name, + group=group, + ): + if trace_headers: + accept_trace_headers(trace_headers, transport_type="Queue") + yield + + def get_trace_headers() -> dict: headers_list: list = [] newrelic.agent.insert_distributed_trace_headers(headers_list) diff --git a/agave/version.py b/agave/version.py index 0638b869..58611c67 100644 --- a/agave/version.py +++ b/agave/version.py @@ -1 +1 @@ -__version__ = "1.5.3.dev00" +__version__ = "1.5.3.dev01" diff --git a/tests/core/test_tracing.py b/tests/core/test_tracing.py index 36e3483d..4ff03b9b 100644 --- a/tests/core/test_tracing.py +++ b/tests/core/test_tracing.py @@ -9,6 +9,7 @@ accept_trace_from_queue, accept_trace_headers, add_custom_attribute, + background_task, get_trace_headers, inject_trace_headers, trace_attributes, @@ -416,3 +417,51 @@ def test_add_attributes_with_non_callable_non_string_extractor(): with patch("agave.core.tracing.add_custom_attribute") as mock_add: _add_attributes(kwargs, extractors) mock_add.assert_called_once_with("result", None) + + +def test_background_task_creates_transaction(): + """Test that background_task creates a New Relic BackgroundTask.""" + with patch("agave.core.tracing.newrelic.agent") as mock_agent: + mock_agent.application.return_value = "test_app" + mock_agent.BackgroundTask.return_value.__enter__ = lambda s: None + mock_agent.BackgroundTask.return_value.__exit__ = lambda s, *a: None + + with background_task("my_task", "Redis/Queue"): + pass + + mock_agent.BackgroundTask.assert_called_once_with( + application="test_app", + name="my_task", + group="Redis/Queue", + ) + + +def test_background_task_accepts_trace_headers(): + """Test that background_task accepts trace headers inside transaction.""" + with patch("agave.core.tracing.newrelic.agent") as mock_agent: + mock_agent.application.return_value = "test_app" + mock_agent.BackgroundTask.return_value.__enter__ = lambda s: None + mock_agent.BackgroundTask.return_value.__exit__ = lambda s, *a: None + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + headers = {"traceparent": "abc"} + with background_task("my_task", "SQS/Queue", headers): + pass + + mock_accept.assert_called_once_with( + headers, transport_type="Queue" + ) + + +def test_background_task_does_not_accept_when_no_headers(): + """Test that background_task skips accept when no trace headers.""" + with patch("agave.core.tracing.newrelic.agent") as mock_agent: + mock_agent.application.return_value = "test_app" + mock_agent.BackgroundTask.return_value.__enter__ = lambda s: None + mock_agent.BackgroundTask.return_value.__exit__ = lambda s, *a: None + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + with background_task("my_task", "Task"): + pass + + mock_accept.assert_not_called() From 2002fbdb19b1fce1be9d86916b0d127f8830584e Mon Sep 17 00:00:00 2001 From: Daniel Pastrana Date: Wed, 28 Jan 2026 11:27:20 -0600 Subject: [PATCH 5/6] pr comments: Address comments for tracing module - Make newrelic import explicit with clear error message for missing dependency - Remove tracing re-exports from agave.core to avoid forcing newrelic installation - Use Optional and Union for Python 3.9 compatibility instead of PEP 604 syntax and future module - Add specific type hints dict[str, str] for trace headers - Add trailing newline to requirements-test.txt --- agave/core/__init__.py | 21 --------------------- agave/core/tracing.py | 22 +++++++++++++--------- requirements-test.txt | 2 +- tests/core/test_tracing.py | 17 ++++++++--------- 4 files changed, 22 insertions(+), 40 deletions(-) diff --git a/agave/core/__init__.py b/agave/core/__init__.py index 326e02ab..e69de29b 100644 --- a/agave/core/__init__.py +++ b/agave/core/__init__.py @@ -1,21 +0,0 @@ -from .tracing import ( - TRACE_HEADERS_KEY, - accept_trace_from_queue, - accept_trace_headers, - add_custom_attribute, - background_task, - get_trace_headers, - inject_trace_headers, - trace_attributes, -) - -__all__ = [ - 'TRACE_HEADERS_KEY', - 'accept_trace_from_queue', - 'accept_trace_headers', - 'add_custom_attribute', - 'background_task', - 'get_trace_headers', - 'inject_trace_headers', - 'trace_attributes', -] diff --git a/agave/core/tracing.py b/agave/core/tracing.py index d1fb4d04..bdcad0fb 100644 --- a/agave/core/tracing.py +++ b/agave/core/tracing.py @@ -1,11 +1,15 @@ -from __future__ import annotations - import inspect from contextlib import contextmanager from functools import wraps -from typing import Any, Callable, Optional - -import newrelic.agent +from typing import Any, Callable, Optional, Union + +try: + import newrelic.agent +except ImportError: # pragma: no cover + raise ImportError( + "You must install agave with [tracing] option.\n" + "You can install it with: pip install agave[tracing]" + ) # trace headers key TRACE_HEADERS_KEY = "_nr_trace_headers" @@ -15,7 +19,7 @@ def background_task( name: str, group: str = "Task", - trace_headers: Optional[dict] = None, + trace_headers: Optional[dict[str, str]] = None, ): with newrelic.agent.BackgroundTask( application=newrelic.agent.application(), @@ -27,14 +31,14 @@ def background_task( yield -def get_trace_headers() -> dict: +def get_trace_headers() -> dict[str, str]: headers_list: list = [] newrelic.agent.insert_distributed_trace_headers(headers_list) return dict(headers_list) def accept_trace_headers( - headers: dict | None, transport_type: str = "HTTP" + headers: Optional[dict[str, str]], transport_type: str = "HTTP" ) -> None: """ Accept incoming trace headers to continue a distributed trace. @@ -136,7 +140,7 @@ def sync_wrapper(*args, **kwargs): return decorator -def trace_attributes(**extractors: Callable | str): +def trace_attributes(**extractors: Union[Callable, str]): """ Decorator to add custom attributes to New Relic traces. diff --git a/requirements-test.txt b/requirements-test.txt index e970f9c4..b85c1583 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -12,4 +12,4 @@ moto[server]==5.0.26 pytest-vcr==1.0.2 pytest-asyncio==0.18.* typing_extensions==4.12.2 -newrelic==11.2.0 \ No newline at end of file +newrelic==11.2.0 diff --git a/tests/core/test_tracing.py b/tests/core/test_tracing.py index 4ff03b9b..b110e9a0 100644 --- a/tests/core/test_tracing.py +++ b/tests/core/test_tracing.py @@ -1,6 +1,5 @@ -from __future__ import annotations - import asyncio +from typing import Optional from unittest.mock import patch from agave.core.tracing import ( @@ -131,7 +130,7 @@ async def my_task(data: dict): def test_inject_trace_headers_async_keyword_arg(): @inject_trace_headers() - async def my_func(_url: str, trace_headers: dict | None = None): + async def my_func(_url: str, trace_headers: Optional[dict] = None): return trace_headers with patch( @@ -144,7 +143,7 @@ async def my_func(_url: str, trace_headers: dict | None = None): def test_inject_trace_headers_merges_existing_headers_async(): @inject_trace_headers() - async def my_func(_url: str, trace_headers: dict | None = None): + async def my_func(_url: str, trace_headers: Optional[dict] = None): return trace_headers with patch( @@ -161,7 +160,7 @@ def test_inject_trace_headers_handles_positional_headers_async(): """Test that positional args don't cause 'multiple values' error.""" @inject_trace_headers("headers") - async def my_func(_url: str, headers: dict | None = None): + async def my_func(_url: str, headers: Optional[dict] = None): return headers with patch( @@ -177,7 +176,7 @@ async def my_func(_url: str, headers: dict | None = None): def test_inject_trace_headers_sync_keyword_arg(): @inject_trace_headers() - def my_func(_url: str, trace_headers: dict | None = None): + def my_func(_url: str, trace_headers: Optional[dict] = None): return trace_headers with patch( @@ -190,7 +189,7 @@ def my_func(_url: str, trace_headers: dict | None = None): def test_inject_trace_headers_merges_existing_headers_sync(): @inject_trace_headers() - def my_func(_url: str, trace_headers: dict | None = None): + def my_func(_url: str, trace_headers: Optional[dict] = None): return trace_headers with patch( @@ -207,7 +206,7 @@ def test_inject_trace_headers_handles_positional_headers_sync(): """Test that positional args don't cause 'multiple values' error.""" @inject_trace_headers("headers") - def my_func(_url: str, headers: dict | None = None): + def my_func(_url: str, headers: Optional[dict] = None): return headers with patch( @@ -221,7 +220,7 @@ def my_func(_url: str, headers: dict | None = None): def test_inject_trace_headers_custom_param_name(): @inject_trace_headers("custom_headers") - def my_func(_url: str, custom_headers: dict | None = None): + def my_func(_url: str, custom_headers: Optional[dict] = None): return custom_headers with patch( From 2ebb1ef2f54e6a63dc7654a503eaf2c1471f317a Mon Sep 17 00:00:00 2001 From: Daniel Pastrana Date: Wed, 28 Jan 2026 16:04:08 -0600 Subject: [PATCH 6/6] Add tracing examples and SQS integration tests - Remove docstrings from tracing module now examples serves as docs - Add examples/tasks/tracing_example.py with usage examples - Update accept_trace_from_queue to extract headers from body dict - Add integration tests for tracing with SQS tasks --- agave/core/tracing.py | 68 +++---------------------------- examples/tasks/tracing_example.py | 64 +++++++++++++++++++++++++++++ tests/core/test_tracing.py | 65 +++++++++++++++++++++++++++++ 3 files changed, 134 insertions(+), 63 deletions(-) create mode 100644 examples/tasks/tracing_example.py diff --git a/agave/core/tracing.py b/agave/core/tracing.py index bdcad0fb..f56b0608 100644 --- a/agave/core/tracing.py +++ b/agave/core/tracing.py @@ -40,13 +40,6 @@ def get_trace_headers() -> dict[str, str]: def accept_trace_headers( headers: Optional[dict[str, str]], transport_type: str = "HTTP" ) -> None: - """ - Accept incoming trace headers to continue a distributed trace. - - Args: - headers: Trace headers from incoming request/message. - transport_type: "HTTP" for HTTP requests, "Queue" for queue messages. - """ if not headers: return newrelic.agent.accept_distributed_trace_headers( @@ -60,23 +53,10 @@ def add_custom_attribute(key: str, value: Any) -> None: def accept_trace_from_queue(func: Callable) -> Callable: - """ - Decorator to accept distributed trace headers from queue messages. - - Extracts '_nr_trace_headers' from kwargs, accepts them, and removes - them before calling the function. - - Example: - @celery_sqs.task - @accept_trace_from_queue - def process_incoming_spei_transaction_task( - transaction: dict, session=None - ): - ... - """ - - def _accept(kwargs): + def _accept(args, kwargs): trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None) + if not trace_headers and args and isinstance(args[0], dict): + trace_headers = args[0].pop(TRACE_HEADERS_KEY, None) if trace_headers: accept_trace_headers(trace_headers, transport_type="Queue") @@ -84,33 +64,20 @@ def _accept(kwargs): @wraps(func) async def async_wrapper(*args, **kwargs): - _accept(kwargs) + _accept(args, kwargs) return await func(*args, **kwargs) return async_wrapper @wraps(func) def sync_wrapper(*args, **kwargs): - _accept(kwargs) + _accept(args, kwargs) return func(*args, **kwargs) return sync_wrapper def inject_trace_headers(param_name: str = "trace_headers"): - """ - Decorator to inject trace headers into HTTP calls. - - Args: - param_name: name of the parameter where headers will be injected. - - Example: - @inject_trace_headers() - async def request(self, method, endpoint, trace_headers=None): - async with session.request(..., headers=trace_headers): - ... - """ - def decorator(func: Callable) -> Callable: sig = inspect.signature(func) @@ -141,24 +108,6 @@ def sync_wrapper(*args, **kwargs): def trace_attributes(**extractors: Union[Callable, str]): - """ - Decorator to add custom attributes to New Relic traces. - - Each kwarg is an attribute to add. The value can be: - - str: name of the function parameter (e.g., 'folio_abono') - - str with dot: path to an attribute (e.g., 'orden.clave_emisor') - - callable: function that receives the kwargs and returns the value - - Example: - @trace_attributes( - clave_rastreo=lambda kw: ','.join(kw['orden'].claves_rastreo), - clave_emisor='orden.clave_emisor', - folio='folio_abono', - ) - async def handle_orden(orden, folio_abono): - ... - """ - def decorator(func: Callable) -> Callable: sig = inspect.signature(func) @@ -205,13 +154,6 @@ def _get_nested_value(obj: Any, path: str) -> Any: def _add_attributes(kwargs: dict, extractors: dict) -> None: - """ - Internal function to extract and add attributes to the current trace. - - Args: - kwargs: Function arguments. - extractors: Dict of attribute_name -> extractor (callable or string). - """ for attr_name, extractor in extractors.items(): try: if callable(extractor): diff --git a/examples/tasks/tracing_example.py b/examples/tasks/tracing_example.py new file mode 100644 index 00000000..daecf13c --- /dev/null +++ b/examples/tasks/tracing_example.py @@ -0,0 +1,64 @@ +from pydantic import BaseModel + +from agave.core.tracing import ( + accept_trace_from_queue, + add_custom_attribute, + background_task, + get_trace_headers, + inject_trace_headers, + trace_attributes, +) +from agave.tasks.sqs_tasks import task + +# Esta URL es solo un mock de la queue. +# Debes reemplazarla con la URL de tu queue +QUEUE_URL = "http://127.0.0.1:4000/123456789012/core.fifo" + + +class Abono(BaseModel): + clave_rastreo: str + clave_emisor: str + monto: float + + +async def process_order(order: Abono) -> None: + with background_task(name="process_order", group="Orders"): + add_custom_attribute("order_id", order.clave_rastreo) + print(f"Processing order: {order.clave_rastreo}") + + +async def send_to_external_service(order: Abono) -> None: + headers = get_trace_headers() + print(f"Sending with headers: {headers}") + + +@inject_trace_headers() +async def request( + method: str, endpoint: str, trace_headers: dict | None = None +) -> None: + # trace_headers ya contiene los headers de New Relic + print(f"{method} {endpoint} with headers: {trace_headers}") + + +@task(queue_url=QUEUE_URL, region_name="us-east-1") +@accept_trace_from_queue +async def process_incoming_transaction(transaction: dict) -> None: + print(f"Processing: {transaction}") + + +@trace_attributes( + clave_rastreo="order.clave_rastreo", + clave_emisor="order.clave_emisor", + monto=lambda kw: f"{kw['order'].monto:.2f}", +) +async def handle_order(order: Abono, folio: str) -> None: + print(f"Handling order {folio}: {order.clave_rastreo}") + + +async def process_from_queue(message: dict, trace_headers: dict) -> None: + with background_task( + name="process_from_queue", + group="Queue", + trace_headers=trace_headers, + ): + print(f"Processing message: {message}") diff --git a/tests/core/test_tracing.py b/tests/core/test_tracing.py index b110e9a0..b8df039e 100644 --- a/tests/core/test_tracing.py +++ b/tests/core/test_tracing.py @@ -1,4 +1,5 @@ import asyncio +import json from typing import Optional from unittest.mock import patch @@ -13,6 +14,9 @@ inject_trace_headers, trace_attributes, ) +from agave.tasks.sqs_tasks import task + +from ..utils import CORE_QUEUE_REGION def test_get_trace_headers_returns_dict_with_headers(): @@ -464,3 +468,64 @@ def test_background_task_does_not_accept_when_no_headers(): pass mock_accept.assert_not_called() + + +async def test_accept_trace_from_queue_with_sqs_task(sqs_client): + @task( + queue_url=sqs_client.queue_url, + region_name=CORE_QUEUE_REGION, + wait_time_seconds=1, + visibility_timeout=1, + ) + @accept_trace_from_queue + async def my_task(data: dict): + return data + + message = {"value": 1, "_nr_trace_headers": {"traceparent": "abc123"}} + await sqs_client.send_message( + MessageBody=json.dumps(message), + MessageGroupId="1234", + ) + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + await my_task() + mock_accept.assert_called_once_with( + {"traceparent": "abc123"}, transport_type="Queue" + ) + + +async def test_trace_headers_propagation_through_queue(sqs_client): + received_data = {} + + @task( + queue_url=sqs_client.queue_url, + region_name=CORE_QUEUE_REGION, + wait_time_seconds=1, + visibility_timeout=1, + ) + @accept_trace_from_queue + async def consumer_task(data: dict): + received_data.update(data) + + trace_headers = { + "traceparent": "00-abc123-def456-01", + "tracestate": "nr=xyz", + } + message = { + "order_id": "ORD001", + "_nr_trace_headers": trace_headers, + } + + await sqs_client.send_message( + MessageBody=json.dumps(message), + MessageGroupId="propagation-test", + ) + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + await consumer_task() + mock_accept.assert_called_once_with( + trace_headers, transport_type="Queue" + ) + + assert received_data == {"order_id": "ORD001"} + assert "_nr_trace_headers" not in received_data