Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@
)
from .execute_tool_scope import ExecuteToolScope
from .execution_type import ExecutionType
from .exporters.agent365_exporter_options import Agent365ExporterOptions
from .exporters.enriched_span import EnrichedReadableSpan
from .exporters.enriching_span_processor import (
get_span_enricher,
register_span_enricher,
unregister_span_enricher,
)
from .exporters.spectra_exporter_options import SpectraExporterOptions
from .inference_call_details import InferenceCallDetails, ServiceEndpoint
from .inference_operation_type import InferenceOperationType
from .inference_scope import InferenceScope
Expand All @@ -38,6 +40,9 @@
"is_configured",
"get_tracer",
"get_tracer_provider",
# Exporter options
"Agent365ExporterOptions",
"SpectraExporterOptions",
# Span enrichment
"register_span_enricher",
"unregister_span_enricher",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
from typing import Any, Optional

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter as GrpcOTLPSpanExporter,
)
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import SERVICE_NAME, SERVICE_NAMESPACE, Resource
from opentelemetry.sdk.trace import TracerProvider
Expand All @@ -18,6 +21,7 @@
from .exporters.enriching_span_processor import (
_EnrichingBatchSpanProcessor,
)
from .exporters.spectra_exporter_options import SpectraExporterOptions
from .exporters.utils import is_agent365_exporter_enabled
from .trace_processor.span_processor import SpanProcessor

Expand Down Expand Up @@ -58,7 +62,7 @@ def configure(
logger_name: str = DEFAULT_LOGGER_NAME,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
exporter_options: Optional[Agent365ExporterOptions] = None,
exporter_options: Agent365ExporterOptions | SpectraExporterOptions | None = None,
suppress_invoke_agent_input: bool = False,
**kwargs: Any,
) -> bool:
Expand All @@ -72,9 +76,10 @@ def configure(
Use exporter_options instead.
:param cluster_category: (Deprecated) Environment / cluster category (e.g. "prod").
Use exporter_options instead.
:param exporter_options: Agent365ExporterOptions instance for configuring the exporter.
If provided, exporter_options takes precedence. If exporter_options is None, the token_resolver and cluster_category parameters are used as fallback/legacy support to construct a default Agent365ExporterOptions instance.
:param suppress_invoke_agent_input: If True, suppress input messages for spans that are children of InvokeAgent spans.
:param exporter_options: Exporter configuration. Pass Agent365ExporterOptions for A365 API
export, SpectraExporterOptions for Spectra Collector sidecar export, or None (default)
to construct Agent365ExporterOptions from legacy parameters.
:param suppress_invoke_agent_input: If True, suppress input messages for InvokeAgent spans.
:return: True if configuration succeeded, False otherwise.
"""
try:
Expand All @@ -100,7 +105,7 @@ def _configure_internal(
logger_name: str,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
exporter_options: Optional[Agent365ExporterOptions] = None,
exporter_options: Agent365ExporterOptions | SpectraExporterOptions | None = None,
suppress_invoke_agent_input: bool = False,
**kwargs: Any,
) -> bool:
Expand Down Expand Up @@ -156,25 +161,43 @@ def _configure_internal(
"max_export_batch_size": exporter_options.max_export_batch_size,
}

if is_agent365_exporter_enabled() and exporter_options.token_resolver is not None:
# Type-based exporter dispatch
if isinstance(exporter_options, SpectraExporterOptions):
# Spectra path — OTLP exporter to sidecar
# ENABLE_A365_OBSERVABILITY_EXPORTER is intentionally ignored.
if exporter_options.protocol == "grpc":
exporter = GrpcOTLPSpanExporter(
endpoint=exporter_options.endpoint,
insecure=exporter_options.insecure,
)
else:
exporter = OTLPSpanExporter(
endpoint=exporter_options.endpoint,
)

elif is_agent365_exporter_enabled() and exporter_options.token_resolver is not None:
exporter = _Agent365Exporter(
token_resolver=exporter_options.token_resolver,
cluster_category=exporter_options.cluster_category,
use_s2s_endpoint=exporter_options.use_s2s_endpoint,
suppress_invoke_agent_input=suppress_invoke_agent_input,
)

else:
exporter = ConsoleSpanExporter()
self._logger.warning(
"is_agent365_exporter_enabled() not enabled or token_resolver not set. Falling back to console exporter."
"is_agent365_exporter_enabled() not enabled or token_resolver not set."
" Falling back to console exporter."
)

# Add span processors

# Create _EnrichingBatchSpanProcessor with optimized settings
# This allows extensions to enrich spans before export
batch_processor = _EnrichingBatchSpanProcessor(exporter, **batch_processor_kwargs)
batch_processor = _EnrichingBatchSpanProcessor(
exporter,
suppress_invoke_agent_input=suppress_invoke_agent_input,
**batch_processor_kwargs,
)
agent_processor = SpanProcessor()

tracer_provider.add_span_processor(batch_processor)
Expand Down Expand Up @@ -248,7 +271,8 @@ def configure(
logger_name: str = DEFAULT_LOGGER_NAME,
token_resolver: Callable[[str, str], str | None] | None = None,
cluster_category: str = "prod",
exporter_options: Optional[Agent365ExporterOptions] = None,
exporter_options: Agent365ExporterOptions | SpectraExporterOptions | None = None,
suppress_invoke_agent_input: bool = False,
**kwargs: Any,
) -> bool:
"""
Expand All @@ -261,8 +285,10 @@ def configure(
Use exporter_options instead.
:param cluster_category: (Deprecated) Environment / cluster category (e.g. "prod").
Use exporter_options instead.
:param exporter_options: Agent365ExporterOptions instance for configuring the exporter.
If provided, exporter_options takes precedence. If exporter_options is None, the token_resolver and cluster_category parameters are used as fallback/legacy support to construct a default Agent365ExporterOptions instance.
:param exporter_options: Exporter configuration. Pass Agent365ExporterOptions for A365 API
export, SpectraExporterOptions for Spectra Collector sidecar export, or None (default)
to construct Agent365ExporterOptions from legacy parameters.
:param suppress_invoke_agent_input: If True, suppress input messages for InvokeAgent spans.
:return: True if configuration succeeded, False otherwise.
"""
return _telemetry_manager.configure(
Expand All @@ -272,6 +298,7 @@ def configure(
token_resolver,
cluster_category,
exporter_options,
suppress_invoke_agent_input,
**kwargs,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
# Licensed under the MIT License.

from .agent365_exporter_options import Agent365ExporterOptions
from .spectra_exporter_options import SpectraExporterOptions

# Agent365Exporter is not exported intentionally.
# It should only be used internally by the observability core module.
__all__ = ["Agent365ExporterOptions"]
__all__ = ["Agent365ExporterOptions", "SpectraExporterOptions"]
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,6 @@
from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult
from opentelemetry.trace import StatusCode

from ..constants import (
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OPERATION_NAME_KEY,
INVOKE_AGENT_OPERATION_NAME,
)
from .utils import (
build_export_url,
get_validated_domain_override,
Expand Down Expand Up @@ -59,7 +54,6 @@ def __init__(
token_resolver: Callable[[str, str], str | None],
cluster_category: str = "prod",
use_s2s_endpoint: bool = False,
suppress_invoke_agent_input: bool = False,
):
if token_resolver is None:
raise ValueError("token_resolver must be provided.")
Expand All @@ -69,7 +63,6 @@ def __init__(
self._token_resolver = token_resolver
self._cluster_category = cluster_category
self._use_s2s_endpoint = use_s2s_endpoint
self._suppress_invoke_agent_input = suppress_invoke_agent_input
# Read domain override once at initialization
self._domain_override = get_validated_domain_override()

Expand Down Expand Up @@ -270,19 +263,6 @@ def _map_span(self, sp: ReadableSpan) -> dict[str, Any]:
# attributes
attrs = dict(sp.attributes or {})

# Suppress input messages if configured and current span is an InvokeAgent span
if self._suppress_invoke_agent_input:
# Check if current span is an InvokeAgent span by:
# 1. Span name starts with "invoke_agent"
# 2. Has attribute gen_ai.operation.name set to INVOKE_AGENT_OPERATION_NAME
operation_name = attrs.get(GEN_AI_OPERATION_NAME_KEY)
if (
sp.name.startswith(INVOKE_AGENT_OPERATION_NAME)
and operation_name == INVOKE_AGENT_OPERATION_NAME
):
# Remove input messages attribute
attrs.pop(GEN_AI_INPUT_MESSAGES_KEY, None)

# events
events = []
for ev in sp.events:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,31 @@ class EnrichedReadableSpan(ReadableSpan):
the original span.
"""

def __init__(self, span: ReadableSpan, extra_attributes: dict):
def __init__(
self,
span: ReadableSpan,
extra_attributes: dict,
excluded_attribute_keys: set[str] | None = None,
):
"""
Initialize the enriched span wrapper.

Args:
span: The original ReadableSpan to wrap.
extra_attributes: Additional attributes to merge with the original.
excluded_attribute_keys: Attribute keys to remove after merging.
"""
self._span = span
self._extra_attributes = extra_attributes
self._excluded_attribute_keys = excluded_attribute_keys or set()

@property
def attributes(self) -> types.Attributes:
"""Return merged attributes from original span and extra attributes."""
original = dict(self._span.attributes or {})
original.update(self._extra_attributes)
for key in self._excluded_attribute_keys:
original.pop(key, None)
return original

@property
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
from opentelemetry.sdk.trace import ReadableSpan
from opentelemetry.sdk.trace.export import BatchSpanProcessor

from ..constants import (
GEN_AI_INPUT_MESSAGES_KEY,
GEN_AI_OPERATION_NAME_KEY,
INVOKE_AGENT_OPERATION_NAME,
)
from .enriched_span import EnrichedReadableSpan

logger = logging.getLogger(__name__)

# Single span enricher - only one platform instrumentor should be active at a time
Expand Down Expand Up @@ -65,6 +72,15 @@ def get_span_enricher() -> Callable[[ReadableSpan], ReadableSpan] | None:
class _EnrichingBatchSpanProcessor(BatchSpanProcessor):
"""BatchSpanProcessor that applies the registered enricher before batching."""

def __init__(
self,
*args: object,
suppress_invoke_agent_input: bool = False,
**kwargs: object,
):
super().__init__(*args, **kwargs)
self._suppress_invoke_agent_input = suppress_invoke_agent_input

def on_end(self, span: ReadableSpan) -> None:
"""Apply the span enricher and pass to parent for batching.

Expand All @@ -83,4 +99,18 @@ def on_end(self, span: ReadableSpan) -> None:
enricher.__name__,
)

# Apply input message suppression for InvokeAgent spans
if self._suppress_invoke_agent_input:
attrs = enriched_span.attributes or {}
operation_name = attrs.get(GEN_AI_OPERATION_NAME_KEY)
if (
enriched_span.name.startswith(INVOKE_AGENT_OPERATION_NAME)
and operation_name == INVOKE_AGENT_OPERATION_NAME
):
enriched_span = EnrichedReadableSpan(
enriched_span,
extra_attributes={},
excluded_attribute_keys={GEN_AI_INPUT_MESSAGES_KEY},
)

super().on_end(enriched_span)
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from typing import Literal


class SpectraExporterOptions:
"""
Configuration for exporting traces to a Spectra Collector sidecar via OTLP.

Spectra Collector is deployed as a Kubernetes sidecar that accepts
standard OTLP telemetry on localhost. Defaults are tuned for this
deployment topology — most consumers should not need to override them.

Note: Batch processor fields (max_queue_size, scheduled_delay_ms, etc.)
are duplicated from Agent365ExporterOptions intentionally — these two
options classes have no shared base class per design decision C4.
"""

_DEFAULT_GRPC_ENDPOINT = "http://localhost:4317"
_DEFAULT_HTTP_ENDPOINT = "http://localhost:4318"

def __init__(
self,
endpoint: str | None = None,
protocol: Literal["grpc", "http"] = "grpc",
insecure: bool = True,
max_queue_size: int = 2048,
scheduled_delay_ms: int = 5000,
exporter_timeout_ms: int = 30000,
max_export_batch_size: int = 512,
):
"""
Args:
endpoint: Spectra sidecar OTLP endpoint. Defaults to
http://localhost:4317 for gRPC or http://localhost:4318 for HTTP.
protocol: OTLP protocol — "grpc" or "http". Default: grpc.
insecure: Use insecure (no TLS) connection. Default: True (localhost sidecar).
max_queue_size: Batch processor queue size. Default: 2048.
scheduled_delay_ms: Export interval in milliseconds. Default: 5000.
exporter_timeout_ms: Export timeout in milliseconds. Default: 30000.
max_export_batch_size: Max spans per export batch. Default: 512.
"""
if protocol not in ("grpc", "http"):
raise ValueError(f"protocol must be 'grpc' or 'http', got '{protocol}'")
if endpoint is None:
endpoint = (
self._DEFAULT_GRPC_ENDPOINT if protocol == "grpc" else self._DEFAULT_HTTP_ENDPOINT
)
self.endpoint = endpoint
self.protocol = protocol
self.insecure = insecure
self.max_queue_size = max_queue_size
self.scheduled_delay_ms = scheduled_delay_ms
self.exporter_timeout_ms = exporter_timeout_ms
self.max_export_batch_size = max_export_batch_size
2 changes: 1 addition & 1 deletion tests/observability/core/test_agent365.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,11 @@ def test_batch_span_processor_and_exporter_called_with_correct_values(
self.assertTrue(result, "configure() should return True")

# Verify Agent365Exporter was called with correct parameters
# (suppress_invoke_agent_input is now handled by _EnrichingBatchSpanProcessor)
mock_exporter.assert_called_once_with(
token_resolver=self.mock_token_resolver,
cluster_category="staging",
use_s2s_endpoint=True,
suppress_invoke_agent_input=False,
)

# Verify BatchSpanProcessor was called with correct parameters from exporter_options
Expand Down
Loading
Loading