diff --git a/agave/core/tracing.py b/agave/core/tracing.py new file mode 100644 index 00000000..f56b0608 --- /dev/null +++ b/agave/core/tracing.py @@ -0,0 +1,169 @@ +import inspect +from contextlib import contextmanager +from functools import wraps +from typing import Any, Callable, Optional, Union + +try: + import newrelic.agent +except ImportError: # pragma: no cover + raise ImportError( + "You must install agave with [tracing] option.\n" + "You can install it with: pip install agave[tracing]" + ) + +# trace headers key +TRACE_HEADERS_KEY = "_nr_trace_headers" + + +@contextmanager +def background_task( + name: str, + group: str = "Task", + trace_headers: Optional[dict[str, str]] = None, +): + with newrelic.agent.BackgroundTask( + application=newrelic.agent.application(), + name=name, + group=group, + ): + if trace_headers: + accept_trace_headers(trace_headers, transport_type="Queue") + yield + + +def get_trace_headers() -> dict[str, str]: + headers_list: list = [] + newrelic.agent.insert_distributed_trace_headers(headers_list) + return dict(headers_list) + + +def accept_trace_headers( + headers: Optional[dict[str, str]], transport_type: str = "HTTP" +) -> None: + if not headers: + return + newrelic.agent.accept_distributed_trace_headers( + headers, transport_type=transport_type + ) + + +def add_custom_attribute(key: str, value: Any) -> None: + if value is not None: + newrelic.agent.add_custom_attribute(key, value) + + +def accept_trace_from_queue(func: Callable) -> Callable: + def _accept(args, kwargs): + trace_headers = kwargs.pop(TRACE_HEADERS_KEY, None) + if not trace_headers and args and isinstance(args[0], dict): + trace_headers = args[0].pop(TRACE_HEADERS_KEY, None) + if trace_headers: + accept_trace_headers(trace_headers, transport_type="Queue") + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + _accept(args, kwargs) + return await func(*args, **kwargs) + + return async_wrapper + + @wraps(func) + def sync_wrapper(*args, **kwargs): + _accept(args, kwargs) + return func(*args, **kwargs) + + return sync_wrapper + + +def inject_trace_headers(param_name: str = "trace_headers"): + def decorator(func: Callable) -> Callable: + sig = inspect.signature(func) + + def _inject(args, kwargs): + bound = sig.bind_partial(*args, **kwargs) + headers = dict(bound.arguments.get(param_name) or {}) + headers.update(get_trace_headers()) + bound.arguments[param_name] = headers + return bound.args, bound.kwargs + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + new_args, new_kwargs = _inject(args, kwargs) + return await func(*new_args, **new_kwargs) + + return async_wrapper + + @wraps(func) + def sync_wrapper(*args, **kwargs): + new_args, new_kwargs = _inject(args, kwargs) + return func(*new_args, **new_kwargs) + + return sync_wrapper + + return decorator + + +def trace_attributes(**extractors: Union[Callable, str]): + def decorator(func: Callable) -> Callable: + sig = inspect.signature(func) + + def _extract(args, kwargs): + bound = sig.bind(*args, **kwargs) + bound.apply_defaults() + _add_attributes(bound.arguments, extractors) + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def async_wrapper(*args, **kwargs): + _extract(args, kwargs) + return await func(*args, **kwargs) + + return async_wrapper + + @wraps(func) + def sync_wrapper(*args, **kwargs): + _extract(args, kwargs) + return func(*args, **kwargs) + + return sync_wrapper + + return decorator + + +def _get_nested_value(obj: Any, path: str) -> Any: + parts = path.split(".") + value = ( + obj.get(parts[0]) + if isinstance(obj, dict) + else getattr(obj, parts[0], None) + ) + + for part in parts[1:]: + if value is None: + return None + if isinstance(value, dict): + value = value.get(part) + else: + value = getattr(value, part, None) + return value + + +def _add_attributes(kwargs: dict, extractors: dict) -> None: + for attr_name, extractor in extractors.items(): + try: + if callable(extractor): + value = extractor(kwargs) + elif isinstance(extractor, str): + value = _get_nested_value(kwargs, extractor) + else: + value = None + + add_custom_attribute(attr_name, value) + except Exception: + pass # Silent exception + # we don't want to fail if unable to extract an attribute diff --git a/agave/version.py b/agave/version.py index c3b38415..58611c67 100644 --- a/agave/version.py +++ b/agave/version.py @@ -1 +1 @@ -__version__ = '1.5.2' +__version__ = "1.5.3.dev01" diff --git a/examples/tasks/tracing_example.py b/examples/tasks/tracing_example.py new file mode 100644 index 00000000..daecf13c --- /dev/null +++ b/examples/tasks/tracing_example.py @@ -0,0 +1,64 @@ +from pydantic import BaseModel + +from agave.core.tracing import ( + accept_trace_from_queue, + add_custom_attribute, + background_task, + get_trace_headers, + inject_trace_headers, + trace_attributes, +) +from agave.tasks.sqs_tasks import task + +# Esta URL es solo un mock de la queue. +# Debes reemplazarla con la URL de tu queue +QUEUE_URL = "http://127.0.0.1:4000/123456789012/core.fifo" + + +class Abono(BaseModel): + clave_rastreo: str + clave_emisor: str + monto: float + + +async def process_order(order: Abono) -> None: + with background_task(name="process_order", group="Orders"): + add_custom_attribute("order_id", order.clave_rastreo) + print(f"Processing order: {order.clave_rastreo}") + + +async def send_to_external_service(order: Abono) -> None: + headers = get_trace_headers() + print(f"Sending with headers: {headers}") + + +@inject_trace_headers() +async def request( + method: str, endpoint: str, trace_headers: dict | None = None +) -> None: + # trace_headers ya contiene los headers de New Relic + print(f"{method} {endpoint} with headers: {trace_headers}") + + +@task(queue_url=QUEUE_URL, region_name="us-east-1") +@accept_trace_from_queue +async def process_incoming_transaction(transaction: dict) -> None: + print(f"Processing: {transaction}") + + +@trace_attributes( + clave_rastreo="order.clave_rastreo", + clave_emisor="order.clave_emisor", + monto=lambda kw: f"{kw['order'].monto:.2f}", +) +async def handle_order(order: Abono, folio: str) -> None: + print(f"Handling order {folio}: {order.clave_rastreo}") + + +async def process_from_queue(message: dict, trace_headers: dict) -> None: + with background_task( + name="process_from_queue", + group="Queue", + trace_headers=trace_headers, + ): + print(f"Processing message: {message}") diff --git a/requirements-test.txt b/requirements-test.txt index 7038b79c..b85c1583 100644 --- a/requirements-test.txt +++ b/requirements-test.txt @@ -12,3 +12,4 @@ moto[server]==5.0.26 pytest-vcr==1.0.2 pytest-asyncio==0.18.* typing_extensions==4.12.2 +newrelic==11.2.0 diff --git a/setup.py b/setup.py index 590a6c40..3b7f5677 100644 --- a/setup.py +++ b/setup.py @@ -31,6 +31,9 @@ 'chalice': [ 'chalice>=1.30.0,<2.0.0', ], + 'tracing': [ + 'newrelic>=7.0.0,<12.0.0', + ], 'fastapi': [ 'fastapi>=0.115.0,<1.0.0', # TODO: Remove this once we upgrade to starlette: diff --git a/tests/core/test_tracing.py b/tests/core/test_tracing.py new file mode 100644 index 00000000..b8df039e --- /dev/null +++ b/tests/core/test_tracing.py @@ -0,0 +1,531 @@ +import asyncio +import json +from typing import Optional +from unittest.mock import patch + +from agave.core.tracing import ( + _add_attributes, + _get_nested_value, + accept_trace_from_queue, + accept_trace_headers, + add_custom_attribute, + background_task, + get_trace_headers, + inject_trace_headers, + trace_attributes, +) +from agave.tasks.sqs_tasks import task + +from ..utils import CORE_QUEUE_REGION + + +def test_get_trace_headers_returns_dict_with_headers(): + with patch("newrelic.agent") as mock_agent: + mock_agent.insert_distributed_trace_headers.side_effect = ( + lambda h: h.extend([("traceparent", "abc"), ("tracestate", "xyz")]) + ) + result = get_trace_headers() + assert result == {"traceparent": "abc", "tracestate": "xyz"} + + +def test_accept_trace_headers_accepts_headers(): + with patch("newrelic.agent") as mock_agent: + headers = {"traceparent": "abc"} + accept_trace_headers(headers, transport_type="Queue") + mock_fn = mock_agent.accept_distributed_trace_headers + mock_fn.assert_called_once_with(headers, transport_type="Queue") + + +def test_accept_trace_headers_does_nothing_when_no_headers(): + with patch("newrelic.agent") as mock_agent: + accept_trace_headers({}) + mock_agent.accept_distributed_trace_headers.assert_not_called() + + +def test_accept_trace_headers_does_nothing_when_headers_none(): + with patch("newrelic.agent") as mock_agent: + accept_trace_headers(None) + mock_agent.accept_distributed_trace_headers.assert_not_called() + + +def test_add_custom_attribute_adds_attribute(): + with patch("newrelic.agent") as mock_agent: + add_custom_attribute("key", "value") + mock_agent.add_custom_attribute.assert_called_once_with("key", "value") + + +def test_add_custom_attribute_does_nothing_when_value_none(): + with patch("newrelic.agent") as mock_agent: + add_custom_attribute("key", None) + mock_agent.add_custom_attribute.assert_not_called() + + +def test_add_custom_attribute_preserves_native_types(): + with patch("newrelic.agent") as mock_agent: + add_custom_attribute("key", 12345) + mock_agent.add_custom_attribute.assert_called_once_with("key", 12345) + + +def test_accept_trace_from_queue_accepts_headers(): + @accept_trace_from_queue + def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = my_task( + {"value": 1}, _nr_trace_headers={"traceparent": "abc"} + ) + assert result == {"value": 1} + mock_accept.assert_called_once_with( + {"traceparent": "abc"}, transport_type="Queue" + ) + + +def test_accept_trace_from_queue_removes_headers_from_kwargs(): + @accept_trace_from_queue + def my_task(**kwargs): + return kwargs + + with patch("agave.core.tracing.accept_trace_headers"): + result = my_task( + data={"value": 1}, _nr_trace_headers={"traceparent": "abc"} + ) + # _nr_trace_headers should be removed + assert "_nr_trace_headers" not in result + assert result == {"data": {"value": 1}} + + +def test_accept_trace_from_queue_works_without_headers(): + @accept_trace_from_queue + def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = my_task({"value": 1}) + assert result == {"value": 1} + mock_accept.assert_not_called() + + +def test_accept_trace_from_queue_async_accepts_headers(): + @accept_trace_from_queue + async def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = asyncio.run( + my_task({"value": 1}, _nr_trace_headers={"traceparent": "abc"}) + ) + assert result == {"value": 1} + mock_accept.assert_called_once_with( + {"traceparent": "abc"}, transport_type="Queue" + ) + + +def test_accept_trace_from_queue_async_works_without_headers(): + @accept_trace_from_queue + async def my_task(data: dict): + return data + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + result = asyncio.run(my_task({"value": 1})) + assert result == {"value": 1} + mock_accept.assert_not_called() + + +def test_inject_trace_headers_async_keyword_arg(): + @inject_trace_headers() + async def my_func(_url: str, trace_headers: Optional[dict] = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = asyncio.run(my_func("http://example.com")) + assert result == {"traceparent": "abc"} + + +def test_inject_trace_headers_merges_existing_headers_async(): + @inject_trace_headers() + async def my_func(_url: str, trace_headers: Optional[dict] = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = asyncio.run( + my_func("http://example.com", trace_headers={"custom": "header"}) + ) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_handles_positional_headers_async(): + """Test that positional args don't cause 'multiple values' error.""" + + @inject_trace_headers("headers") + async def my_func(_url: str, headers: Optional[dict] = None): + return headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + # Pass headers as positional argument + result = asyncio.run( + my_func("http://example.com", {"custom": "header"}) + ) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_sync_keyword_arg(): + @inject_trace_headers() + def my_func(_url: str, trace_headers: Optional[dict] = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = my_func("http://example.com") + assert result == {"traceparent": "abc"} + + +def test_inject_trace_headers_merges_existing_headers_sync(): + @inject_trace_headers() + def my_func(_url: str, trace_headers: Optional[dict] = None): + return trace_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = my_func( + "http://example.com", trace_headers={"custom": "header"} + ) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_handles_positional_headers_sync(): + """Test that positional args don't cause 'multiple values' error.""" + + @inject_trace_headers("headers") + def my_func(_url: str, headers: Optional[dict] = None): + return headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + # Pass headers as positional argument + result = my_func("http://example.com", {"custom": "header"}) + assert result == {"custom": "header", "traceparent": "abc"} + + +def test_inject_trace_headers_custom_param_name(): + @inject_trace_headers("custom_headers") + def my_func(_url: str, custom_headers: Optional[dict] = None): + return custom_headers + + with patch( + "agave.core.tracing.get_trace_headers", + return_value={"traceparent": "abc"}, + ): + result = my_func("http://example.com") + assert result == {"traceparent": "abc"} + + +def test_trace_attributes_extracts_simple_kwarg_async(): + @trace_attributes(my_attr="value") + async def my_func(value: str): + return value + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = asyncio.run(my_func("test_value")) + assert result == "test_value" + mock_add.assert_called_once_with("my_attr", "test_value") + + +def test_trace_attributes_extracts_nested_attr_async(): + class Order: + def __init__(self): + self.clave_emisor = "12345" + + @trace_attributes(emisor="order.clave_emisor") + async def my_func(order: Order): + return order.clave_emisor + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = asyncio.run(my_func(Order())) + assert result == "12345" + mock_add.assert_called_once_with("emisor", "12345") + + +def test_trace_attributes_extracts_with_callable_async(): + @trace_attributes(total=lambda kw: kw["a"] + kw["b"]) + async def my_func(a: int, b: int): + return a + b + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = asyncio.run(my_func(1, 2)) + assert result == 3 + mock_add.assert_called_once_with("total", 3) + + +def test_trace_attributes_extracts_simple_kwarg_sync(): + @trace_attributes(my_attr="value") + def my_func(value: str): + return value + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func("test_value") + assert result == "test_value" + mock_add.assert_called_once_with("my_attr", "test_value") + + +def test_trace_attributes_extracts_nested_attr_sync(): + class Order: + def __init__(self): + self.clave_emisor = "12345" + + @trace_attributes(emisor="order.clave_emisor") + def my_func(order: Order): + return order.clave_emisor + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func(Order()) + assert result == "12345" + mock_add.assert_called_once_with("emisor", "12345") + + +def test_trace_attributes_extracts_with_callable_sync(): + @trace_attributes(total=lambda kw: kw["a"] + kw["b"]) + def my_func(a: int, b: int): + return a + b + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func(1, 2) + assert result == 3 + mock_add.assert_called_once_with("total", 3) + + +def test_trace_attributes_handles_positional_args(): + """Test that positional args are properly bound to param names.""" + + @trace_attributes(x_val="x", y_val="y") + def my_func(x: int, y: int): + return x + y + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + result = my_func(10, 20) # Both as positional + assert result == 30 + assert mock_add.call_count == 2 + + +def test_trace_attributes_handles_extraction_error_gracefully(): + @trace_attributes(missing="nonexistent.path.deep") + def my_func(value: str): + return value + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + # Should not raise, just silently fail the attribute + result = my_func("test") + assert result == "test" + mock_add.assert_called_once_with("missing", None) + + +def test_get_nested_value_from_dict(): + obj = {"transaction": {"clave_rastreo": "ABC123"}} + result = _get_nested_value(obj, "transaction.clave_rastreo") + assert result == "ABC123" + + +def test_get_nested_value_from_object(): + class Transaction: + clave_rastreo = "ABC123" + + class Data: + transaction = Transaction() + + result = _get_nested_value(Data(), "transaction.clave_rastreo") + assert result == "ABC123" + + +def test_get_nested_value_returns_none_for_missing_path(): + obj = {"transaction": {}} + result = _get_nested_value(obj, "transaction.missing.deep") + assert result is None + + +def test_get_nested_value_simple_key(): + obj = {"name": "test"} + result = _get_nested_value(obj, "name") + assert result == "test" + + +def test_get_nested_value_mixed_dict_and_object(): + class Inner: + value = "nested" + + obj = {"outer": Inner()} + result = _get_nested_value(obj, "outer.value") + assert result == "nested" + + +def test_add_attributes_with_callable_extractor(): + kwargs = {"a": 1, "b": 2} + extractors = {"sum": lambda kw: kw["a"] + kw["b"]} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("sum", 3) + + +def test_add_attributes_with_string_extractor(): + kwargs = {"name": "test"} + extractors = {"result": "name"} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("result", "test") + + +def test_add_attributes_with_dotted_path_extractor(): + class Obj: + value = "nested" + + kwargs = {"obj": Obj()} + extractors = {"result": "obj.value"} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("result", "nested") + + +def test_add_attributes_handles_exception_gracefully(): + kwargs = {} + extractors = {"broken": lambda kw: kw["missing"]} + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + # Should not raise + _add_attributes(kwargs, extractors) + mock_add.assert_not_called() + + +def test_add_attributes_with_non_callable_non_string_extractor(): + """Test that non-callable non-string extractors result in None.""" + kwargs = {"name": "test"} + extractors = {"result": 12345} # Not a callable, not a string + + with patch("agave.core.tracing.add_custom_attribute") as mock_add: + _add_attributes(kwargs, extractors) + mock_add.assert_called_once_with("result", None) + + +def test_background_task_creates_transaction(): + """Test that background_task creates a New Relic BackgroundTask.""" + with patch("agave.core.tracing.newrelic.agent") as mock_agent: + mock_agent.application.return_value = "test_app" + mock_agent.BackgroundTask.return_value.__enter__ = lambda s: None + mock_agent.BackgroundTask.return_value.__exit__ = lambda s, *a: None + + with background_task("my_task", "Redis/Queue"): + pass + + mock_agent.BackgroundTask.assert_called_once_with( + application="test_app", + name="my_task", + group="Redis/Queue", + ) + + +def test_background_task_accepts_trace_headers(): + """Test that background_task accepts trace headers inside transaction.""" + with patch("agave.core.tracing.newrelic.agent") as mock_agent: + mock_agent.application.return_value = "test_app" + mock_agent.BackgroundTask.return_value.__enter__ = lambda s: None + mock_agent.BackgroundTask.return_value.__exit__ = lambda s, *a: None + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + headers = {"traceparent": "abc"} + with background_task("my_task", "SQS/Queue", headers): + pass + + mock_accept.assert_called_once_with( + headers, transport_type="Queue" + ) + + +def test_background_task_does_not_accept_when_no_headers(): + """Test that background_task skips accept when no trace headers.""" + with patch("agave.core.tracing.newrelic.agent") as mock_agent: + mock_agent.application.return_value = "test_app" + mock_agent.BackgroundTask.return_value.__enter__ = lambda s: None + mock_agent.BackgroundTask.return_value.__exit__ = lambda s, *a: None + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + with background_task("my_task", "Task"): + pass + + mock_accept.assert_not_called() + + +async def test_accept_trace_from_queue_with_sqs_task(sqs_client): + @task( + queue_url=sqs_client.queue_url, + region_name=CORE_QUEUE_REGION, + wait_time_seconds=1, + visibility_timeout=1, + ) + @accept_trace_from_queue + async def my_task(data: dict): + return data + + message = {"value": 1, "_nr_trace_headers": {"traceparent": "abc123"}} + await sqs_client.send_message( + MessageBody=json.dumps(message), + MessageGroupId="1234", + ) + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + await my_task() + mock_accept.assert_called_once_with( + {"traceparent": "abc123"}, transport_type="Queue" + ) + + +async def test_trace_headers_propagation_through_queue(sqs_client): + received_data = {} + + @task( + queue_url=sqs_client.queue_url, + region_name=CORE_QUEUE_REGION, + wait_time_seconds=1, + visibility_timeout=1, + ) + @accept_trace_from_queue + async def consumer_task(data: dict): + received_data.update(data) + + trace_headers = { + "traceparent": "00-abc123-def456-01", + "tracestate": "nr=xyz", + } + message = { + "order_id": "ORD001", + "_nr_trace_headers": trace_headers, + } + + await sqs_client.send_message( + MessageBody=json.dumps(message), + MessageGroupId="propagation-test", + ) + + with patch("agave.core.tracing.accept_trace_headers") as mock_accept: + await consumer_task() + mock_accept.assert_called_once_with( + trace_headers, transport_type="Queue" + ) + + assert received_data == {"order_id": "ORD001"} + assert "_nr_trace_headers" not in received_data