Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
hex_span_id,
hex_trace_id,
kind_name,
parse_retry_after,
partition_by_identity,
status_name,
truncate_span,
Expand Down Expand Up @@ -79,9 +80,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
logger.info("No spans with tenant/agent identity found; nothing exported.")
return SpanExportResult.SUCCESS

# Debug: Log number of groups and total span count
# Log number of groups and total span count
total_spans = sum(len(activities) for activities in groups.values())
logger.info(
logger.debug(
f"Found {len(groups)} identity groups with {total_spans} total spans to export"
)

Expand All @@ -98,8 +99,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:

url = build_export_url(endpoint, agent_id, tenant_id, self._use_s2s_endpoint)

# Debug: Log endpoint being used
logger.info(
# Log endpoint details at DEBUG to avoid leaking IDs in production logs
logger.debug(
f"Exporting {len(activities)} spans to endpoint: {url} "
f"(tenant: {tenant_id}, agent: {agent_id})"
)
Expand All @@ -108,10 +109,16 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult:
try:
token = self._token_resolver(agent_id, tenant_id)
if token:
# Warn if sending bearer token over non-HTTPS connection
if not url.lower().startswith("https://"):
logger.warning(
"Bearer token is being sent over a non-HTTPS connection. "
"This may expose credentials in transit."
)
headers["authorization"] = f"Bearer {token}"
logger.info(f"Token resolved successfully for agent {agent_id}")
logger.debug(f"Token resolved successfully for agent {agent_id}")
else:
logger.info(f"No token returned for agent {agent_id}")
logger.debug(f"No token returned for agent {agent_id}")
except Exception as e:
# If token resolution fails, treat as failure for this group
logger.error(
Expand Down Expand Up @@ -174,7 +181,7 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo

# 2xx => success
if 200 <= resp.status_code < 300:
logger.info(
logger.debug(
f"HTTP {resp.status_code} success on attempt {attempt + 1}. "
f"Correlation ID: {correlation_id}. "
f"Response: {self._truncate_text(resp.text, 200)}"
Expand All @@ -186,12 +193,19 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo

# Retry transient
if resp.status_code in (408, 429) or 500 <= resp.status_code < 600:
# Respect Retry-After header for 429 responses
retry_after = parse_retry_after(resp.headers)
if attempt < DEFAULT_MAX_RETRIES:
time.sleep(0.2 * (attempt + 1))
if retry_after is not None:
time.sleep(min(retry_after, 60.0))
else:
# Exponential backoff with base 0.5s
time.sleep(0.5 * (2**attempt))
continue
# Final attempt failed
logger.error(
f"HTTP {resp.status_code} final failure after {DEFAULT_MAX_RETRIES + 1} attempts. "
f"HTTP {resp.status_code} final failure after "
f"{DEFAULT_MAX_RETRIES + 1} attempts. "
f"Correlation ID: {correlation_id}. "
f"Response: {response_text}"
)
Expand All @@ -206,12 +220,11 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo

except requests.RequestException as e:
if attempt < DEFAULT_MAX_RETRIES:
time.sleep(0.2 * (attempt + 1))
# Exponential backoff with base 0.5s
time.sleep(0.5 * (2**attempt))
continue
# Final attempt failed
logger.error(
f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts with exception: {e}"
)
logger.error(f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: {e}")
return False
return False

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

from __future__ import annotations

import json
import logging
import os
Expand Down Expand Up @@ -194,6 +196,13 @@ def get_validated_domain_override() -> str | None:
logger.warning(f"Invalid domain override '{domain_override}': {e}")
return None

# Warn when using insecure HTTP — telemetry data and bearer tokens may be exposed
if domain_override.lower().startswith("http://"):
logger.warning(
"Domain override uses insecure HTTP. Telemetry data (including "
"bearer tokens) will be transmitted in cleartext."
)

return domain_override


Expand Down Expand Up @@ -223,6 +232,31 @@ def build_export_url(
return f"https://{endpoint}{endpoint_path}?api-version=1"


def parse_retry_after(headers: dict[str, str]) -> float | None:
"""Parse the ``Retry-After`` header value.

Only numeric (seconds) values are supported. HTTP-date values
(e.g. ``Wed, 21 Oct 2025 07:28:00 GMT``) are intentionally ignored
and treated as absent, falling back to exponential backoff.

Args:
headers: Response headers mapping.

Returns:
The number of seconds to wait, or ``None`` if the header is
absent, non-numeric, or otherwise invalid.
"""
retry_after = headers.get("Retry-After")
if retry_after is None:
return None
try:
return float(retry_after)
except (ValueError, TypeError):
# Intentionally ignore HTTP-date formatted Retry-After values;
# callers should fall back to exponential backoff.
return None


def is_agent365_exporter_enabled() -> bool:
"""Check if Agent 365 exporter is enabled."""
# Check environment variable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from threading import Lock
from typing import TYPE_CHECKING, Any

from opentelemetry import baggage, context, trace
from opentelemetry import context, trace
from opentelemetry.trace import (
Span,
SpanKind,
Expand Down Expand Up @@ -241,21 +241,6 @@ def set_tag_maybe(self, name: str, value: Any) -> None:
if value is not None and self._span and self._is_telemetry_enabled():
self._span.set_attribute(name, value)

def add_baggage(self, key: str, value: str) -> None:
    """Attach a baggage entry to the active OpenTelemetry context.

    Child spans created after this call inherit the entry, because the
    updated context is made current via ``context.attach``.

    Args:
        key: Baggage key to set.
        value: Baggage value to set.
    """
    if not self._is_telemetry_enabled():
        return
    # Make the enriched context current so descendants inherit the baggage.
    context.attach(baggage.set_baggage(key, value))

def record_attributes(self, attributes: dict[str, Any] | list[tuple[str, Any]]) -> None:
"""Record multiple attribute key/value pairs for telemetry tracking.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
class OutputScope(OpenTelemetryScope):
"""Provides OpenTelemetry tracing scope for output messages."""

_MAX_OUTPUT_MESSAGES = 5000

@staticmethod
def start(
agent_details: AgentDetails,
def record_output_messages(self, messages: list[str]) -> None:
    """Record output messages for telemetry tracking.

    Extends the accumulated output-message list, trims it to the newest
    ``_MAX_OUTPUT_MESSAGES`` entries to prevent unbounded memory growth,
    then writes the serialized list to the span attribute.

    Args:
        messages: List of output messages to append.
    """
    self._output_messages.extend(messages)
    cap = self._MAX_OUTPUT_MESSAGES
    if len(self._output_messages) > cap:
        # Keep only the most recent entries; older ones are dropped.
        self._output_messages = self._output_messages[-cap:]
    self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages))
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import logging
import re
from collections import OrderedDict
from collections.abc import Iterator
from itertools import chain
from threading import RLock
Expand Down Expand Up @@ -69,6 +70,8 @@


class CustomLangChainTracer(BaseTracer):
_MAX_TRACKED_RUNS = 10000

__slots__ = (
"_tracer",
"_separate_trace_from_runtime_context",
Expand Down Expand Up @@ -98,11 +101,18 @@ def __init__(
self.run_map = DictWithLock[str, Run](self.run_map)
self._tracer = tracer
self._separate_trace_from_runtime_context = separate_trace_from_runtime_context
self._spans_by_run: dict[UUID, Span] = DictWithLock[UUID, Span]()
self._spans_by_run: OrderedDict[UUID, Span] = OrderedDict()
self._lock = RLock() # handlers may be run in a thread by langchain

def get_span(self, run_id: UUID) -> Span | None:
    """Return the span tracked for *run_id*, or ``None`` if none exists."""
    # Snapshot under the lock; handlers may run on multiple threads.
    with self._lock:
        tracked = self._spans_by_run.get(run_id)
    return tracked

@staticmethod
def _cap_ordered_dict(d: OrderedDict, max_size: int) -> None:
    """Evict the oldest entries from *d* until it holds at most *max_size* items."""
    # popitem(last=False) removes in FIFO (insertion) order.
    excess = len(d) - max_size
    for _ in range(excess):
        d.popitem(last=False)

def _start_trace(self, run: Run) -> None:
self.run_map[str(run.id)] = run
Expand Down Expand Up @@ -142,12 +152,14 @@ def _start_trace(self, run: Run) -> None:
# token = context_api.attach(context)
with self._lock:
self._spans_by_run[run.id] = span
self._cap_ordered_dict(self._spans_by_run, self._MAX_TRACKED_RUNS)

def _end_trace(self, run: Run) -> None:
self.run_map.pop(str(run.id), None)
if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY):
return
span = self._spans_by_run.pop(run.id, None)
with self._lock:
span = self._spans_by_run.pop(run.id, None)
if span:
try:
_update_span(span, run)
Expand All @@ -162,24 +174,32 @@ def _persist_run(self, run: Run) -> None:
pass

def on_llm_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run:
    """Record *error* on the run's tracked span, then defer to the base handler."""
    # Look up the span under the lock; record outside to keep the hold short.
    with self._lock:
        tracked = self._spans_by_run.get(run_id)
    if tracked:
        record_exception(tracked, error)
    return super().on_llm_error(error, *args, run_id=run_id, **kwargs)

def on_chain_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run:
    """Record *error* on the run's tracked span, then defer to the base handler."""
    # Look up the span under the lock; record outside to keep the hold short.
    with self._lock:
        tracked = self._spans_by_run.get(run_id)
    if tracked:
        record_exception(tracked, error)
    return super().on_chain_error(error, *args, run_id=run_id, **kwargs)

def on_retriever_error(
    self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any
) -> Run:
    """Record *error* on the run's tracked span, then defer to the base handler."""
    # Look up the span under the lock; record outside to keep the hold short.
    with self._lock:
        tracked = self._spans_by_run.get(run_id)
    if tracked:
        record_exception(tracked, error)
    return super().on_retriever_error(error, *args, run_id=run_id, **kwargs)

def on_tool_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run:
    """Record *error* on the run's tracked span, then defer to the base handler."""
    # Look up the span under the lock; record outside to keep the hold short.
    with self._lock:
        tracked = self._spans_by_run.get(run_id)
    if tracked:
        record_exception(tracked, error)
    return super().on_tool_error(error, *args, run_id=run_id, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from __future__ import annotations

import logging
from collections.abc import Callable, Collection
from typing import Any
from uuid import UUID
Expand All @@ -21,6 +22,8 @@

from microsoft_agents_a365.observability.extensions.langchain.tracer import CustomLangChainTracer

logger = logging.getLogger(__name__)

_INSTRUMENTS: str = "langchain_core >= 1.2.0"


Expand Down Expand Up @@ -86,7 +89,7 @@ def _uninstrument(self, **kwargs: Any) -> None:
def get_span(self, run_id: UUID) -> Span | None:
"""Return the span for a specific LangChain run_id, if available."""
if not self._tracer:
print("Missing tracer; call InstrumentorForLangChain().instrument() first.")
logger.warning("Missing tracer; call InstrumentorForLangChain().instrument() first.")
return None
# TraceForLangChain is expected to expose get_span(run_id).
get_span_fn = getattr(self._tracer, "get_span", None)
Expand All @@ -95,7 +98,7 @@ def get_span(self, run_id: UUID) -> Span | None:
def get_ancestors(self, run_id: UUID) -> list[Span]:
"""Return ancestor spans from the run’s parent up to the root (nearest first)."""
if not self._tracer:
print("Missing tracer; call InstrumentorForLangChain().instrument() first.")
logger.warning("Missing tracer; call InstrumentorForLangChain().instrument() first.")
return []

# Expect the processor to keep a run_map with parent linkage (string keys).
Expand Down
Loading
Loading