From 877a684f4b55ac2fd4ddca9aea49cc50a6ce55e9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 15:01:08 +0000 Subject: [PATCH 01/10] Initial plan From 113e17c097b53126963b16166fb6336b7d1094d2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 15:20:34 +0000 Subject: [PATCH 02/10] Replace assert statements with explicit TypeError raises in langchain utils Replace all 30 assert statements in utils.py with equivalent if-not-raise TypeError checks. This ensures type validation is not silently stripped when Python runs with -O (optimized mode). Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../core/exporters/agent365_exporter.py | 68 ++++++++---- .../observability/core/opentelemetry_scope.py | 16 ++- .../extensions/langchain/utils.py | 104 ++++++++++-------- .../extensions/openai/trace_processor.py | 31 ++++-- .../observability/extensions/openai/utils.py | 4 +- 5 files changed, 145 insertions(+), 78 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py index 27b54e23..5cce2dd7 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py @@ -86,9 +86,9 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: logger.info("No spans with tenant/agent identity found; nothing exported.") return SpanExportResult.SUCCESS - # Debug: Log number of groups and total span count + # Log number of groups and total span count total_spans = sum(len(activities) for activities in groups.values()) - logger.info( + logger.debug( f"Found {len(groups)} identity groups with {total_spans} total spans to export" ) @@ -105,8 +105,8 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: url = build_export_url(endpoint, agent_id, tenant_id, self._use_s2s_endpoint) - # Debug: Log endpoint being used - logger.info( + # Log endpoint details at DEBUG to avoid leaking IDs in production logs + logger.debug( f"Exporting {len(activities)} spans to endpoint: {url} " f"(tenant: {tenant_id}, agent: {agent_id})" ) @@ -115,15 +115,19 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: try: token = self._token_resolver(agent_id, tenant_id) if token: + # Warn if sending bearer token over non-HTTPS connection + if not url.lower().startswith("https://"): + logger.warning( + "Bearer token is being sent over a non-HTTPS connection. " + "This may expose credentials in transit." + ) headers["authorization"] = f"Bearer {token}" - logger.info(f"Token resolved successfully for agent {agent_id}") + logger.debug("Token resolved successfully.") else: - logger.info(f"No token returned for agent {agent_id}") + logger.debug("No token returned by resolver.") except Exception as e: # If token resolution fails, treat as failure for this group - logger.error( - f"Token resolution failed for agent {agent_id}, tenant {tenant_id}: {e}" - ) + logger.error(f"Token resolution failed: {type(e).__name__}") any_failure = True continue @@ -162,6 +166,21 @@ def _truncate_text(text: str, max_length: int) -> str: return text[:max_length] + "..." return text + @staticmethod + def _parse_retry_after(resp: requests.Response) -> float | None: + """Parse the Retry-After header from a response. + + Returns: + The number of seconds to wait, or None if the header is absent or invalid. + """ + retry_after = resp.headers.get("Retry-After") + if retry_after is None: + return None + try: + return float(retry_after) + except (ValueError, TypeError): + return None + def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bool: for attempt in range(DEFAULT_MAX_RETRIES + 1): try: @@ -181,43 +200,46 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo # 2xx => success if 200 <= resp.status_code < 300: - logger.info( + logger.debug( f"HTTP {resp.status_code} success on attempt {attempt + 1}. " - f"Correlation ID: {correlation_id}. " - f"Response: {self._truncate_text(resp.text, 200)}" + f"Correlation ID: {correlation_id}." ) return True - # Log non-success responses - response_text = self._truncate_text(resp.text, 500) - # Retry transient if resp.status_code in (408, 429) or 500 <= resp.status_code < 600: + # Respect Retry-After header for 429 responses + retry_after = self._parse_retry_after(resp) if attempt < DEFAULT_MAX_RETRIES: - time.sleep(0.2 * (attempt + 1)) + if retry_after is not None: + time.sleep(min(retry_after, 60.0)) + else: + # Exponential backoff with base 0.5s + time.sleep(0.5 * (2**attempt)) continue # Final attempt failed logger.error( - f"HTTP {resp.status_code} final failure after {DEFAULT_MAX_RETRIES + 1} attempts. " - f"Correlation ID: {correlation_id}. " - f"Response: {response_text}" + f"HTTP {resp.status_code} final failure after " + f"{DEFAULT_MAX_RETRIES + 1} attempts. " + f"Correlation ID: {correlation_id}." ) else: # Non-retryable error logger.error( f"HTTP {resp.status_code} non-retryable error. " - f"Correlation ID: {correlation_id}. " - f"Response: {response_text}" + f"Correlation ID: {correlation_id}." ) return False except requests.RequestException as e: if attempt < DEFAULT_MAX_RETRIES: - time.sleep(0.2 * (attempt + 1)) + # Exponential backoff with base 0.5s + time.sleep(0.5 * (2**attempt)) continue # Final attempt failed logger.error( - f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts with exception: {e}" + f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: " + f"{type(e).__name__}" ) return False return False diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py index 93255439..cf4e2814 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py @@ -126,6 +126,7 @@ def __init__( self._error_type: str | None = None self._exception: Exception | None = None self._context_token = None + self._baggage_tokens: list[object] = [] if self._is_telemetry_enabled(): tracer = self._get_tracer() @@ -244,6 +245,12 @@ def set_tag_maybe(self, name: str, value: Any) -> None: def add_baggage(self, key: str, value: str) -> None: """Add baggage to the current context. + .. warning:: + This method attaches a new context that cannot be detached. Prefer using + :class:`~microsoft_agents_a365.observability.core.middleware.baggage_builder.BaggageBuilder` + with its context-manager API (``with builder.build(): ...``) which properly + restores the previous context on exit. + Args: key: The baggage key value: The baggage value @@ -254,7 +261,9 @@ def add_baggage(self, key: str, value: str) -> None: # This will be inherited by child spans created within this context baggage_context = baggage.set_baggage(key, value) # The context needs to be made current for child spans to inherit the baggage - context.attach(baggage_context) + token = context.attach(baggage_context) + # Store the token so it can be detached when the scope ends + self._baggage_tokens.append(token) def record_attributes(self, attributes: dict[str, Any] | list[tuple[str, Any]]) -> None: """Record multiple attribute key/value pairs for telemetry tracking. @@ -294,6 +303,11 @@ def _end(self) -> None: span_id = f"{self._span.context.span_id:016x}" if self._span.context else "unknown" logger.info(f"Span ended: '{self._span.name}' ({span_id})") + # Detach any baggage tokens in reverse order + for token in reversed(self._baggage_tokens): + context.detach(token) + self._baggage_tokens.clear() + # Convert custom end time to OTel-compatible format (nanoseconds since epoch) otel_end_time = self._datetime_to_ns(self._custom_end_time) if otel_end_time is not None: diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py index b081e44e..871b1fb1 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py @@ -53,7 +53,8 @@ def prompts(inputs: Mapping[str, Any] | None) -> Iterator[tuple[str, list[str]]] """Yields prompts if present.""" if not inputs: return - assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}" + if not hasattr(inputs, "get"): + raise TypeError(f"expected Mapping, found {type(inputs)}") if prompts := inputs.get("prompts"): yield GEN_AI_SYSTEM_INSTRUCTIONS_KEY, prompts @@ -62,17 +63,21 @@ def prompts(inputs: Mapping[str, Any] | None) -> Iterator[tuple[str, list[str]]] def _extract_message_kwargs(message_data: Mapping[str, Any] | None) -> Iterator[[str, Any]]: if not message_data: return - assert hasattr(message_data, "get"), f"expected Mapping, found {type(message_data)}" + if not hasattr(message_data, "get"): + raise TypeError(f"expected Mapping, found {type(message_data)}") if kwargs := message_data.get("kwargs"): - assert hasattr(kwargs, "get"), f"expected Mapping, found {type(kwargs)}" + if not hasattr(kwargs, "get"): + raise TypeError(f"expected Mapping, found {type(kwargs)}") if content := kwargs.get("content"): # Just yield as-is (string or list) yield "message.content", content if tool_call_id := kwargs.get("tool_call_id"): - assert isinstance(tool_call_id, str), f"expected str, found {type(tool_call_id)}" + if not isinstance(tool_call_id, str): + raise TypeError(f"expected str, found {type(tool_call_id)}") yield GEN_AI_TOOL_CALL_ID_KEY, tool_call_id if name := kwargs.get("name"): - assert isinstance(name, str), f"expected str, found {type(name)}" + if not isinstance(name, str): + raise TypeError(f"expected str, found {type(name)}") yield "message.name", name @@ -82,19 +87,20 @@ def _extract_message_additional_kwargs( ) -> Iterator[tuple[str, Any]]: if not message_data: return - assert hasattr(message_data, "get"), f"expected Mapping, found {type(message_data)}" + if not hasattr(message_data, "get"): + raise TypeError(f"expected Mapping, found {type(message_data)}") if kwargs := message_data.get("kwargs"): - assert hasattr(kwargs, "get"), f"expected Mapping, found {type(kwargs)}" + if not hasattr(kwargs, "get"): + raise TypeError(f"expected Mapping, found {type(kwargs)}") if additional_kwargs := kwargs.get("additional_kwargs"): - assert hasattr(additional_kwargs, "get"), ( - f"expected Mapping, found {type(additional_kwargs)}" - ) + if not hasattr(additional_kwargs, "get"): + raise TypeError(f"expected Mapping, found {type(additional_kwargs)}") if function_call := additional_kwargs.get("function_call"): - assert hasattr(function_call, "get"), ( - f"expected Mapping, found {type(function_call)}" - ) + if not hasattr(function_call, "get"): + raise TypeError(f"expected Mapping, found {type(function_call)}") if name := function_call.get("name"): - assert isinstance(name, str), f"expected str, found {type(name)}" + if not isinstance(name, str): + raise TypeError(f"expected str, found {type(name)}") yield GEN_AI_TOOL_NAME_KEY, name if arguments := function_call.get("arguments"): if isinstance(arguments, str): @@ -107,7 +113,8 @@ def _extract_message_additional_kwargs( def _get_tool_call(tool_call: Mapping[str, Any] | None) -> Iterator[tuple[str, Any]]: if not tool_call: return - assert hasattr(tool_call, "get"), f"expected Mapping, found {type(tool_call)}" + if not hasattr(tool_call, "get"): + raise TypeError(f"expected Mapping, found {type(tool_call)}") # id id_ = tool_call.get("id") @@ -127,7 +134,8 @@ def _get_tool_call(tool_call: Mapping[str, Any] | None) -> Iterator[tuple[str, A # name if name is not None: - assert isinstance(name, str), f"expected str, found {type(name)}" + if not isinstance(name, str): + raise TypeError(f"expected str, found {type(name)}") yield GEN_AI_TOOL_NAME_KEY, name # arguments -> always emit a JSON string @@ -143,7 +151,8 @@ def _process_tool_calls(tool_calls: Any) -> str: """Return all tool calls as a single compact string (JSON-joined), or '' if none.""" if not tool_calls: return "" - assert isinstance(tool_calls, Iterable), f"expected Iterable, found {type(tool_calls)}" + if not isinstance(tool_calls, Iterable): + raise TypeError(f"expected Iterable, found {type(tool_calls)}") parts: list[str] = [] for tool_call in tool_calls: @@ -161,7 +170,8 @@ def _extract_message_tool_calls( ) -> Iterator[tuple[str, str]]: if not message_data: return - assert hasattr(message_data, "get"), f"expected Mapping, found {type(message_data)}" + if not hasattr(message_data, "get"): + raise TypeError(f"expected Mapping, found {type(message_data)}") # Collect tool_calls from multiple possible locations all_tool_calls: list[str] = [] @@ -178,13 +188,13 @@ def collect(calls: Any) -> None: collect(message_data.get("tool_calls")) if kwargs := message_data.get("kwargs"): - assert hasattr(kwargs, "get"), f"expected Mapping, found {type(kwargs)}" + if not hasattr(kwargs, "get"): + raise TypeError(f"expected Mapping, found {type(kwargs)}") collect(kwargs.get("tool_calls")) if additional_kwargs := kwargs.get("additional_kwargs"): - assert hasattr(additional_kwargs, "get"), ( - f"expected Mapping, found {type(additional_kwargs)}" - ) + if not hasattr(additional_kwargs, "get"): + raise TypeError(f"expected Mapping, found {type(additional_kwargs)}") collect(additional_kwargs.get("tool_calls")) if all_tool_calls: @@ -207,13 +217,13 @@ def input_messages( """Yields chat messages as a JSON array of content strings.""" if not inputs: return - assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}" + if not hasattr(inputs, "get"): + raise TypeError(f"expected Mapping, found {type(inputs)}") # There may be more than one set of messages. We'll use just the first set. if not (multiple_messages := inputs.get("messages")): return - assert isinstance(multiple_messages, Iterable), ( - f"expected Iterable, found {type(multiple_messages)}" - ) + if not isinstance(multiple_messages, Iterable): + raise TypeError(f"expected Iterable, found {type(multiple_messages)}") # This will only get the first set of messages. if not (first_messages := next(iter(multiple_messages), None)): return @@ -249,7 +259,8 @@ def metadata(run: Run) -> Iterator[tuple[str, str]]: """ if not run.extra or not (metadata := run.extra.get("metadata")): return - assert isinstance(metadata, Mapping), f"expected Mapping, found {type(metadata)}" + if not isinstance(metadata, Mapping): + raise TypeError(f"expected Mapping, found {type(metadata)}") if session_id := ( metadata.get(LANGCHAIN_SESSION_ID) or metadata.get(LANGCHAIN_CONVERSATION_ID) @@ -265,7 +276,8 @@ def output_messages( """Yields chat messages as a JSON array of content strings.""" if not outputs: return - assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}" + if not hasattr(outputs, "get"): + raise TypeError(f"expected Mapping, found {type(outputs)}") output_type = outputs.get("type") if output_type and output_type.lower() == "llmresult": llm_output = outputs.get("llm_output") @@ -276,18 +288,17 @@ def output_messages( # There may be more than one set of generations. We'll use just the first set. if not (multiple_generations := outputs.get("generations")): return - assert isinstance(multiple_generations, Iterable), ( - f"expected Iterable, found {type(multiple_generations)}" - ) + if not isinstance(multiple_generations, Iterable): + raise TypeError(f"expected Iterable, found {type(multiple_generations)}") # This will only get the first set of generations. if not (first_generations := next(iter(multiple_generations), None)): return - assert isinstance(first_generations, Iterable), ( - f"expected Iterable, found {type(first_generations)}" - ) + if not isinstance(first_generations, Iterable): + raise TypeError(f"expected Iterable, found {type(first_generations)}") contents: list[str] = [] for generation in first_generations: - assert hasattr(generation, "get"), f"expected Mapping, found {type(generation)}" + if not hasattr(generation, "get"): + raise TypeError(f"expected Mapping, found {type(generation)}") if message_data := generation.get("message"): if isinstance(message_data, BaseMessage): if hasattr(message_data, "content") and message_data.content: @@ -309,11 +320,11 @@ def invocation_parameters(run: Run) -> Iterator[tuple[str, str]]: return if not (extra := run.extra): return - assert hasattr(extra, "get"), f"expected Mapping, found {type(extra)}" + if not hasattr(extra, "get"): + raise TypeError(f"expected Mapping, found {type(extra)}") if invocation_parameters := extra.get("invocation_params"): - assert isinstance(invocation_parameters, Mapping), ( - f"expected Mapping, found {type(invocation_parameters)}" - ) + if not isinstance(invocation_parameters, Mapping): + raise TypeError(f"expected Mapping, found {type(invocation_parameters)}") tools = invocation_parameters.get("tools", []) for idx, tool in enumerate(tools): yield f"{GEN_AI_TOOL_ARGS_KEY}.{idx}", safe_json_dumps(tool) @@ -346,7 +357,8 @@ def model_name( return if not extra: return - assert hasattr(extra, "get"), f"expected Mapping, found {type(extra)}" + if not hasattr(extra, "get"): + raise TypeError(f"expected Mapping, found {type(extra)}") if ( (metadata := extra.get("metadata")) and hasattr(metadata, "get") @@ -451,7 +463,8 @@ def function_calls(outputs: Mapping[str, Any] | None) -> Iterator[tuple[str, str """ if not outputs: return - assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}" + if not hasattr(outputs, "get"): + raise TypeError(f"expected Mapping, found {type(outputs)}") try: # Typical OpenAI LangChain shape: @@ -506,7 +519,8 @@ def tools(run: Run) -> Iterator[tuple[str, str]]: return if not (serialized := run.serialized): return - assert hasattr(serialized, "get"), f"expected Mapping, found {type(serialized)}" + if not hasattr(serialized, "get"): + raise TypeError(f"expected Mapping, found {type(serialized)}") yield GEN_AI_TOOL_TYPE_KEY, "extension" if name := serialized.get("name"): yield GEN_AI_TOOL_NAME_KEY, name @@ -618,7 +632,8 @@ def invoke_agent_input_message( if not inputs: return - assert hasattr(inputs, "get"), f"expected Mapping, found {type(inputs)}" + if not hasattr(inputs, "get"): + raise TypeError(f"expected Mapping, found {type(inputs)}") messages = inputs.get("messages") if not messages: @@ -659,7 +674,8 @@ def invoke_agent_output_message( if not outputs: return - assert hasattr(outputs, "get"), f"expected Mapping, found {type(outputs)}" + if not hasattr(outputs, "get"): + raise TypeError(f"expected Mapping, found {type(outputs)}") messages = outputs.get("messages") if not messages: diff --git a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py index 777d3f0d..2df88190 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/trace_processor.py @@ -78,24 +78,25 @@ class OpenAIAgentsTraceProcessor(TracingProcessor): _MAX_HANDOFFS_IN_FLIGHT = 1000 _MAX_PENDING_TOOL_CALLS = 1000 + _MAX_TRACKED_SPANS = 10000 def __init__(self, tracer: Tracer) -> None: self._tracer = tracer - self._root_spans: dict[str, OtelSpan] = {} - self._otel_spans: dict[str, OtelSpan] = {} - self._tokens: dict[str, object] = {} + self._root_spans: OrderedDict[str, OtelSpan] = OrderedDict() + self._otel_spans: OrderedDict[str, OtelSpan] = OrderedDict() + self._tokens: OrderedDict[str, object] = OrderedDict() # This captures in flight handoff. Once the handoff is complete, the entry is deleted # If the handoff does not complete, the entry stays in the dict. # Use an OrderedDict and _MAX_HANDOFFS_IN_FLIGHT to cap the size of the dict # in case there are large numbers of orphaned handoffs self._reverse_handoffs_dict: OrderedDict[str, str] = OrderedDict() # Track input/output messages for agent spans (keyed by agent span_id) - self._agent_inputs: dict[str, str] = {} - self._agent_outputs: dict[str, str] = {} + self._agent_inputs: OrderedDict[str, str] = OrderedDict() + self._agent_outputs: OrderedDict[str, str] = OrderedDict() # Track agent span IDs to find nearest ancestor - self._agent_span_ids: set[str] = set() + self._agent_span_ids: OrderedDict[str, None] = OrderedDict() # Track parent-child relationships: child_span_id -> parent_span_id - self._span_parents: dict[str, str] = {} + self._span_parents: OrderedDict[str, str] = OrderedDict() # Track tool_call_ids from GenerationSpan: (function_name, trace_id) -> call_id # Use an OrderedDict and _MAX_PENDING_TOOL_CALLS to cap the size of the dict # in case tool calls are captured but never consumed @@ -110,6 +111,12 @@ def _stamp_custom_parent(self, otel_span: OtelSpan, trace_id: str) -> None: pid_hex = "0x" + ot_trace.format_span_id(sc.span_id) otel_span.set_attribute(CUSTOM_PARENT_SPAN_ID_KEY, pid_hex) + @staticmethod + def _cap_ordered_dict(d: OrderedDict, max_size: int) -> None: + """Evict oldest entries from an OrderedDict to stay within max_size.""" + while len(d) > max_size: + d.popitem(last=False) + def on_trace_start(self, trace: Trace) -> None: """Called when a trace is started. @@ -153,13 +160,17 @@ def on_span_start(self, span: Span[Any]) -> None: }, ) self._otel_spans[span.span_id] = otel_span + self._cap_ordered_dict(self._otel_spans, self._MAX_TRACKED_SPANS) self._tokens[span.span_id] = attach(set_span_in_context(otel_span)) + self._cap_ordered_dict(self._tokens, self._MAX_TRACKED_SPANS) # Track parent-child relationship for ancestor lookup if span.parent_id: self._span_parents[span.span_id] = span.parent_id + self._cap_ordered_dict(self._span_parents, self._MAX_TRACKED_SPANS) # Track AgentSpan IDs if isinstance(span.span_data, AgentSpanData): - self._agent_span_ids.add(span.span_id) + self._agent_span_ids[span.span_id] = None + self._cap_ordered_dict(self._agent_span_ids, self._MAX_TRACKED_SPANS) def on_span_end(self, span: Span[Any]) -> None: """Called when a span is finished. Should not block or raise exceptions. @@ -202,8 +213,10 @@ def on_span_end(self, span: Span[Any]) -> None: ): if data.input: capture_input_message(agent_span_id, data.input, self._agent_inputs) + self._cap_ordered_dict(self._agent_inputs, self._MAX_TRACKED_SPANS) if data.output: capture_output_message(agent_span_id, data.output, self._agent_outputs) + self._cap_ordered_dict(self._agent_outputs, self._MAX_TRACKED_SPANS) # Capture tool_call_ids for later use by FunctionSpan if data.output: capture_tool_call_ids( @@ -246,7 +259,7 @@ def on_span_end(self, span: Span[Any]) -> None: otel_span.set_attribute(GEN_AI_OUTPUT_MESSAGES_KEY, output_msg) otel_span.update_name(f"{INVOKE_AGENT_OPERATION_NAME} {get_span_name(span)}") # Clean up tracking - self._agent_span_ids.discard(span.span_id) + self._agent_span_ids.pop(span.span_id, None) end_time: int | None = None if span.ended_at: diff --git a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py index e01043e8..f32af728 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-openai/microsoft_agents_a365/observability/extensions/openai/utils.py @@ -611,7 +611,9 @@ def capture_output_message( def find_ancestor_agent_span_id( - span_id: str | None, agent_span_ids: set[str], span_parents: dict[str, str] + span_id: str | None, + agent_span_ids: set[str] | Mapping[str, object], + span_parents: Mapping[str, str], ) -> str | None: """Walk up the parent chain to find the nearest ancestor AgentSpan.""" current = span_id From b93e81c93c54fd7e1ceb3745adab94cfb6af02dd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 15:24:16 +0000 Subject: [PATCH 03/10] security: fix sensitive data logging, context leak, unbounded memory, asserts, and more - Fix #1: Downgrade sensitive data logging from INFO to DEBUG in agent365_exporter.py - Fix #2: Fix unpaired context.attach() in opentelemetry_scope.py add_baggage() by storing and detaching baggage tokens on scope end - Fix #3: Add bounded OrderedDict caps to unbounded dicts in OpenAI trace_processor.py - Fix #4: Replace 30 assert statements with proper TypeError raises in LangChain utils.py - Fix #5: Log security warning when HTTP domain override is detected - Fix #6: Warn when bearer token sent over non-HTTPS connection - Fix #10: Respect Retry-After header and use exponential backoff in retries - Fix #13: Rename reset() to _reset() in ObservabilityHostingManager - Fix #15: Replace print() with logger.warning() in LangChain tracer_instrumentor.py Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../core/exporters/agent365_exporter.py | 3 +-- .../observability/core/exporters/utils.py | 7 +++++ .../langchain/tracer_instrumentor.py | 7 +++-- .../observability_hosting_manager.py | 2 +- .../core/test_agent365_exporter.py | 26 +++++++++---------- .../test_observability_hosting_manager.py | 4 +-- 6 files changed, 29 insertions(+), 20 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py index 5cce2dd7..0d5e54c5 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py @@ -238,8 +238,7 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo continue # Final attempt failed logger.error( - f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: " - f"{type(e).__name__}" + f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: {type(e).__name__}" ) return False return False diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py index 21261c9e..eef34a00 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py @@ -194,6 +194,13 @@ def get_validated_domain_override() -> str | None: logger.warning(f"Invalid domain override '{domain_override}': {e}") return None + # Warn when using insecure HTTP — telemetry data and bearer tokens may be exposed + if domain_override.lower().startswith("http://"): + logger.warning( + "Domain override uses insecure HTTP. Telemetry data (including " + "bearer tokens) will be transmitted in cleartext." + ) + return domain_override diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py index 0b723bba..948ee59c 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer_instrumentor.py @@ -3,6 +3,7 @@ from __future__ import annotations +import logging from collections.abc import Callable, Collection from typing import Any from uuid import UUID @@ -21,6 +22,8 @@ from microsoft_agents_a365.observability.extensions.langchain.tracer import CustomLangChainTracer +logger = logging.getLogger(__name__) + _INSTRUMENTS: str = "langchain_core >= 1.2.0" @@ -86,7 +89,7 @@ def _uninstrument(self, **kwargs: Any) -> None: def get_span(self, run_id: UUID) -> Span | None: """Return the span for a specific LangChain run_id, if available.""" if not self._tracer: - print("Missing tracer; call InstrumentorForLangChain().instrument() first.") + logger.warning("Missing tracer; call InstrumentorForLangChain().instrument() first.") return None # TraceForLangChain is expected to expose get_span(run_id). get_span_fn = getattr(self._tracer, "get_span", None) @@ -95,7 +98,7 @@ def get_span(self, run_id: UUID) -> Span | None: def get_ancestors(self, run_id: UUID) -> list[Span]: """Return ancestor spans from the run’s parent up to the root (nearest first).""" if not self._tracer: - print("Missing tracer; call InstrumentorForLangChain().instrument() first.") + logger.warning("Missing tracer; call InstrumentorForLangChain().instrument() first.") return [] # Expect the processor to keep a run_map with parent linkage (string keys). diff --git a/libraries/microsoft-agents-a365-observability-hosting/microsoft_agents_a365/observability/hosting/middleware/observability_hosting_manager.py b/libraries/microsoft-agents-a365-observability-hosting/microsoft_agents_a365/observability/hosting/middleware/observability_hosting_manager.py index 54f56487..6c2a2c55 100644 --- a/libraries/microsoft-agents-a365-observability-hosting/microsoft_agents_a365/observability/hosting/middleware/observability_hosting_manager.py +++ b/libraries/microsoft-agents-a365-observability-hosting/microsoft_agents_a365/observability/hosting/middleware/observability_hosting_manager.py @@ -96,6 +96,6 @@ def configure( return instance @classmethod - def reset(cls) -> None: + def _reset(cls) -> None: """Reset the singleton instance. Intended for testing only.""" cls._instance = None diff --git a/tests/observability/core/test_agent365_exporter.py b/tests/observability/core/test_agent365_exporter.py index c44cb31c..5973098a 100644 --- a/tests/observability/core/test_agent365_exporter.py +++ b/tests/observability/core/test_agent365_exporter.py @@ -313,25 +313,25 @@ def test_export_logging(self, mock_logger): self.assertEqual(result, SpanExportResult.SUCCESS) # Verify logging calls - should use default endpoint URL - expected_log_calls = [ - # Should log groups found - unittest.mock.call.info("Found 1 identity groups with 2 total spans to export"), - # Should log endpoint being used (default endpoint) - unittest.mock.call.info( + expected_debug_calls = [ + # Should log groups found at DEBUG + unittest.mock.call.debug("Found 1 identity groups with 2 total spans to export"), + # Should log endpoint being used at DEBUG (default endpoint) + unittest.mock.call.debug( f"Exporting 2 spans to endpoint: {DEFAULT_ENDPOINT_URL}/observability/tenants/test-tenant-123/agents/test-agent-456/traces?api-version=1 " "(tenant: test-tenant-123, agent: test-agent-456)" ), - # Should log token resolution success - unittest.mock.call.info("Token resolved successfully for agent test-agent-456"), - # Should log HTTP success - unittest.mock.call.info( - "HTTP 200 success on attempt 1. Correlation ID: test-correlation-123. Response: success" + # Should log token resolution success at DEBUG + unittest.mock.call.debug("Token resolved successfully."), + # Should log HTTP success at DEBUG + unittest.mock.call.debug( + "HTTP 200 success on attempt 1. Correlation ID: test-correlation-123." ), ] - # Check that all expected info calls were made - for expected_call in expected_log_calls: - self.assertIn(expected_call, mock_logger.info.call_args_list) + # Check that all expected debug calls were made + for expected_call in expected_debug_calls: + self.assertIn(expected_call, mock_logger.debug.call_args_list) @patch("microsoft_agents_a365.observability.core.exporters.agent365_exporter.logger") def test_export_error_logging(self, mock_logger): diff --git a/tests/observability/hosting/middleware/test_observability_hosting_manager.py b/tests/observability/hosting/middleware/test_observability_hosting_manager.py index 3b00bf90..a3423211 100644 --- a/tests/observability/hosting/middleware/test_observability_hosting_manager.py +++ b/tests/observability/hosting/middleware/test_observability_hosting_manager.py @@ -19,9 +19,9 @@ @pytest.fixture(autouse=True) def _reset_singleton(): """Reset the singleton before and after each test.""" - ObservabilityHostingManager.reset() + ObservabilityHostingManager._reset() yield - ObservabilityHostingManager.reset() + ObservabilityHostingManager._reset() def test_configure_is_singleton(): From 5c39fb200f2a6199d2bc02f3ad5f1dfe321d915e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 16:39:23 +0000 Subject: [PATCH 04/10] Restore agent/tenant IDs and response text in exporter log messages Agent IDs and tenant IDs are not sensitive data and are useful for debugging. Restore them in debug/error log messages. Also restore truncated response text in HTTP error logs to help developers debug failures. Log levels remain at DEBUG (from the prior security fix). Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../core/exporters/agent365_exporter.py | 21 +++++++++++++------ .../core/test_agent365_exporter.py | 7 +++++-- 2 files changed, 20 insertions(+), 8 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py index cdf6b014..117fd94d 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py @@ -115,12 +115,15 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: "This may expose credentials in transit." ) headers["authorization"] = f"Bearer {token}" - logger.debug("Token resolved successfully.") + logger.debug(f"Token resolved successfully for agent {agent_id}") else: - logger.debug("No token returned by resolver.") + logger.debug(f"No token returned for agent {agent_id}") except Exception as e: # If token resolution fails, treat as failure for this group - logger.error(f"Token resolution failed: {type(e).__name__}") + logger.error( + f"Token resolution failed for agent {agent_id}, " + f"tenant {tenant_id}: {type(e).__name__}" + ) any_failure = True continue @@ -195,10 +198,14 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo if 200 <= resp.status_code < 300: logger.debug( f"HTTP {resp.status_code} success on attempt {attempt + 1}. " - f"Correlation ID: {correlation_id}." + f"Correlation ID: {correlation_id}. " + f"Response: {self._truncate_text(resp.text, 200)}" ) return True + # Log non-success responses + response_text = self._truncate_text(resp.text, 500) + # Retry transient if resp.status_code in (408, 429) or 500 <= resp.status_code < 600: # Respect Retry-After header for 429 responses @@ -214,13 +221,15 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo logger.error( f"HTTP {resp.status_code} final failure after " f"{DEFAULT_MAX_RETRIES + 1} attempts. " - f"Correlation ID: {correlation_id}." + f"Correlation ID: {correlation_id}. " + f"Response: {response_text}" ) else: # Non-retryable error logger.error( f"HTTP {resp.status_code} non-retryable error. " - f"Correlation ID: {correlation_id}." + f"Correlation ID: {correlation_id}. " + f"Response: {response_text}" ) return False diff --git a/tests/observability/core/test_agent365_exporter.py b/tests/observability/core/test_agent365_exporter.py index 5973098a..26bfe73e 100644 --- a/tests/observability/core/test_agent365_exporter.py +++ b/tests/observability/core/test_agent365_exporter.py @@ -322,10 +322,13 @@ def test_export_logging(self, mock_logger): "(tenant: test-tenant-123, agent: test-agent-456)" ), # Should log token resolution success at DEBUG - unittest.mock.call.debug("Token resolved successfully."), + unittest.mock.call.debug( + "Token resolved successfully for agent test-agent-456" + ), # Should log HTTP success at DEBUG unittest.mock.call.debug( - "HTTP 200 success on attempt 1. Correlation ID: test-correlation-123." + "HTTP 200 success on attempt 1. " + "Correlation ID: test-correlation-123. Response: success" ), ] From aa5acefc0b3ba8cea02252508d64f4682ec46eaa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 16:40:53 +0000 Subject: [PATCH 05/10] Remove add_baggage() from OpenTelemetryScope The method had an unpaired context.attach() that leaked context tokens. Users should use BaggageBuilder.build() context manager instead, which properly restores the previous context on exit. Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../observability/core/opentelemetry_scope.py | 31 +------------------ 1 file changed, 1 insertion(+), 30 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py index cf4e2814..c9ee1387 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/opentelemetry_scope.py @@ -9,7 +9,7 @@ from threading import Lock from typing import TYPE_CHECKING, Any -from opentelemetry import baggage, context, trace +from opentelemetry import context, trace from opentelemetry.trace import ( Span, SpanKind, @@ -126,7 +126,6 @@ def __init__( self._error_type: str | None = None self._exception: Exception | None = None self._context_token = None - self._baggage_tokens: list[object] = [] if self._is_telemetry_enabled(): tracer = self._get_tracer() @@ -242,29 +241,6 @@ def set_tag_maybe(self, name: str, value: Any) -> None: if value is not None and self._span and self._is_telemetry_enabled(): self._span.set_attribute(name, value) - def add_baggage(self, key: str, value: str) -> None: - """Add baggage to the current context. - - .. warning:: - This method attaches a new context that cannot be detached. Prefer using - :class:`~microsoft_agents_a365.observability.core.middleware.baggage_builder.BaggageBuilder` - with its context-manager API (``with builder.build(): ...``) which properly - restores the previous context on exit. - - Args: - key: The baggage key - value: The baggage value - """ - # Set baggage in the current context - if self._is_telemetry_enabled(): - # Set baggage on the current context - # This will be inherited by child spans created within this context - baggage_context = baggage.set_baggage(key, value) - # The context needs to be made current for child spans to inherit the baggage - token = context.attach(baggage_context) - # Store the token so it can be detached when the scope ends - self._baggage_tokens.append(token) - def record_attributes(self, attributes: dict[str, Any] | list[tuple[str, Any]]) -> None: """Record multiple attribute key/value pairs for telemetry tracking. @@ -303,11 +279,6 @@ def _end(self) -> None: span_id = f"{self._span.context.span_id:016x}" if self._span.context else "unknown" logger.info(f"Span ended: '{self._span.name}' ({span_id})") - # Detach any baggage tokens in reverse order - for token in reversed(self._baggage_tokens): - context.detach(token) - self._baggage_tokens.clear() - # Convert custom end time to OTel-compatible format (nanoseconds since epoch) otel_end_time = self._datetime_to_ns(self._custom_end_time) if otel_end_time is not None: From d62338f794a6739215ea76e3d5aa6fc223795490 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 16:49:07 +0000 Subject: [PATCH 06/10] Security hardening for observability packages Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../extensions/langchain/utils.py | 87 +++++++++---------- 1 file changed, 41 insertions(+), 46 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py index 871b1fb1..2b1af000 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py @@ -53,8 +53,8 @@ def prompts(inputs: Mapping[str, Any] | None) -> Iterator[tuple[str, list[str]]] """Yields prompts if present.""" if not inputs: return - if not hasattr(inputs, "get"): - raise TypeError(f"expected Mapping, found {type(inputs)}") + if not isinstance(inputs, Mapping): + return if prompts := inputs.get("prompts"): yield GEN_AI_SYSTEM_INSTRUCTIONS_KEY, prompts @@ -63,22 +63,20 @@ def prompts(inputs: Mapping[str, Any] | None) -> Iterator[tuple[str, list[str]]] def _extract_message_kwargs(message_data: Mapping[str, Any] | None) -> Iterator[[str, Any]]: if not message_data: return - if not hasattr(message_data, "get"): - raise TypeError(f"expected Mapping, found {type(message_data)}") + if not isinstance(message_data, Mapping): + return if kwargs := message_data.get("kwargs"): - if not hasattr(kwargs, "get"): - raise TypeError(f"expected Mapping, found {type(kwargs)}") + if not isinstance(kwargs, Mapping): + return if content := kwargs.get("content"): # Just yield as-is (string or list) yield "message.content", content if tool_call_id := kwargs.get("tool_call_id"): - if not isinstance(tool_call_id, str): - raise TypeError(f"expected str, found {type(tool_call_id)}") - yield GEN_AI_TOOL_CALL_ID_KEY, tool_call_id + if isinstance(tool_call_id, str): + yield GEN_AI_TOOL_CALL_ID_KEY, tool_call_id if name := kwargs.get("name"): - if not isinstance(name, str): - raise TypeError(f"expected str, found {type(name)}") - yield "message.name", name + if isinstance(name, str): + yield "message.name", name @stop_on_exception @@ -87,21 +85,20 @@ def _extract_message_additional_kwargs( ) -> Iterator[tuple[str, Any]]: if not message_data: return - if not hasattr(message_data, "get"): - raise TypeError(f"expected Mapping, found {type(message_data)}") + if not isinstance(message_data, Mapping): + return if kwargs := message_data.get("kwargs"): - if not hasattr(kwargs, "get"): - raise TypeError(f"expected Mapping, found {type(kwargs)}") + if not isinstance(kwargs, Mapping): + return if additional_kwargs := kwargs.get("additional_kwargs"): - if not hasattr(additional_kwargs, "get"): - raise TypeError(f"expected Mapping, found {type(additional_kwargs)}") + if not isinstance(additional_kwargs, Mapping): + return if function_call := additional_kwargs.get("function_call"): - if not hasattr(function_call, "get"): - raise TypeError(f"expected Mapping, found {type(function_call)}") + if not isinstance(function_call, Mapping): + return if name := function_call.get("name"): - if not isinstance(name, str): - raise TypeError(f"expected str, found {type(name)}") - yield GEN_AI_TOOL_NAME_KEY, name + if isinstance(name, str): + yield GEN_AI_TOOL_NAME_KEY, name if arguments := function_call.get("arguments"): if isinstance(arguments, str): yield GEN_AI_TOOL_ARGS_KEY, arguments @@ -113,8 +110,8 @@ def _extract_message_additional_kwargs( def _get_tool_call(tool_call: Mapping[str, Any] | None) -> Iterator[tuple[str, Any]]: if not tool_call: return - if not hasattr(tool_call, "get"): - raise TypeError(f"expected Mapping, found {type(tool_call)}") + if not isinstance(tool_call, Mapping): + return # id id_ = tool_call.get("id") @@ -125,7 +122,7 @@ def _get_tool_call(tool_call: Mapping[str, Any] | None) -> Iterator[tuple[str, A name = None arguments = None - if hasattr(fn, "get"): + if isinstance(fn, Mapping): name = fn.get("name") arguments = fn.get("arguments") else: @@ -133,9 +130,7 @@ def _get_tool_call(tool_call: Mapping[str, Any] | None) -> Iterator[tuple[str, A arguments = tool_call.get("args") # name - if name is not None: - if not isinstance(name, str): - raise TypeError(f"expected str, found {type(name)}") + if name is not None and isinstance(name, str): yield GEN_AI_TOOL_NAME_KEY, name # arguments -> always emit a JSON string @@ -152,7 +147,7 @@ def _process_tool_calls(tool_calls: Any) -> str: if not tool_calls: return "" if not isinstance(tool_calls, Iterable): - raise TypeError(f"expected Iterable, found {type(tool_calls)}") + return "" parts: list[str] = [] for tool_call in tool_calls: @@ -170,8 +165,8 @@ def _extract_message_tool_calls( ) -> Iterator[tuple[str, str]]: if not message_data: return - if not hasattr(message_data, "get"): - raise TypeError(f"expected Mapping, found {type(message_data)}") + if not isinstance(message_data, Mapping): + return # Collect tool_calls from multiple possible locations all_tool_calls: list[str] = [] @@ -188,13 +183,13 @@ def collect(calls: Any) -> None: collect(message_data.get("tool_calls")) if kwargs := message_data.get("kwargs"): - if not hasattr(kwargs, "get"): - raise TypeError(f"expected Mapping, found {type(kwargs)}") + if not isinstance(kwargs, Mapping): + return collect(kwargs.get("tool_calls")) if additional_kwargs := kwargs.get("additional_kwargs"): - if not hasattr(additional_kwargs, "get"): - raise TypeError(f"expected Mapping, found {type(additional_kwargs)}") + if not isinstance(additional_kwargs, Mapping): + return collect(additional_kwargs.get("tool_calls")) if all_tool_calls: @@ -217,13 +212,13 @@ def input_messages( """Yields chat messages as a JSON array of content strings.""" if not inputs: return - if not hasattr(inputs, "get"): - raise TypeError(f"expected Mapping, found {type(inputs)}") + if not isinstance(inputs, Mapping): + return # There may be more than one set of messages. We'll use just the first set. if not (multiple_messages := inputs.get("messages")): return if not isinstance(multiple_messages, Iterable): - raise TypeError(f"expected Iterable, found {type(multiple_messages)}") + return # This will only get the first set of messages. if not (first_messages := next(iter(multiple_messages), None)): return @@ -260,7 +255,7 @@ def metadata(run: Run) -> Iterator[tuple[str, str]]: if not run.extra or not (metadata := run.extra.get("metadata")): return if not isinstance(metadata, Mapping): - raise TypeError(f"expected Mapping, found {type(metadata)}") + return if session_id := ( metadata.get(LANGCHAIN_SESSION_ID) or metadata.get(LANGCHAIN_CONVERSATION_ID) @@ -276,8 +271,8 @@ def output_messages( """Yields chat messages as a JSON array of content strings.""" if not outputs: return - if not hasattr(outputs, "get"): - raise TypeError(f"expected Mapping, found {type(outputs)}") + if not isinstance(outputs, Mapping): + return output_type = outputs.get("type") if output_type and output_type.lower() == "llmresult": llm_output = outputs.get("llm_output") @@ -289,16 +284,16 @@ def output_messages( if not (multiple_generations := outputs.get("generations")): return if not isinstance(multiple_generations, Iterable): - raise TypeError(f"expected Iterable, found {type(multiple_generations)}") + return # This will only get the first set of generations. if not (first_generations := next(iter(multiple_generations), None)): return if not isinstance(first_generations, Iterable): - raise TypeError(f"expected Iterable, found {type(first_generations)}") + return contents: list[str] = [] for generation in first_generations: - if not hasattr(generation, "get"): - raise TypeError(f"expected Mapping, found {type(generation)}") + if not isinstance(generation, Mapping): + continue if message_data := generation.get("message"): if isinstance(message_data, BaseMessage): if hasattr(message_data, "content") and message_data.content: From 596cc6a6bbedfe39f79e9c85745f7f5bd3850768 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 17:31:12 +0000 Subject: [PATCH 07/10] Fix formatting in test_agent365_exporter.py and replace remaining raise TypeError with isinstance guards in langchain utils.py Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../extensions/langchain/utils.py | 26 +++++++++---------- .../core/test_agent365_exporter.py | 4 +-- 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py index 2b1af000..72fe66d2 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/utils.py @@ -315,11 +315,11 @@ def invocation_parameters(run: Run) -> Iterator[tuple[str, str]]: return if not (extra := run.extra): return - if not hasattr(extra, "get"): - raise TypeError(f"expected Mapping, found {type(extra)}") + if not isinstance(extra, Mapping): + return if invocation_parameters := extra.get("invocation_params"): if not isinstance(invocation_parameters, Mapping): - raise TypeError(f"expected Mapping, found {type(invocation_parameters)}") + return tools = invocation_parameters.get("tools", []) for idx, tool in enumerate(tools): yield f"{GEN_AI_TOOL_ARGS_KEY}.{idx}", safe_json_dumps(tool) @@ -352,8 +352,8 @@ def model_name( return if not extra: return - if not hasattr(extra, "get"): - raise TypeError(f"expected Mapping, found {type(extra)}") + if not isinstance(extra, Mapping): + return if ( (metadata := extra.get("metadata")) and hasattr(metadata, "get") @@ -458,8 +458,8 @@ def function_calls(outputs: Mapping[str, Any] | None) -> Iterator[tuple[str, str """ if not outputs: return - if not hasattr(outputs, "get"): - raise TypeError(f"expected Mapping, found {type(outputs)}") + if not isinstance(outputs, Mapping): + return try: # Typical OpenAI LangChain shape: @@ -514,8 +514,8 @@ def tools(run: Run) -> Iterator[tuple[str, str]]: return if not (serialized := run.serialized): return - if not hasattr(serialized, "get"): - raise TypeError(f"expected Mapping, found {type(serialized)}") + if not isinstance(serialized, Mapping): + return yield GEN_AI_TOOL_TYPE_KEY, "extension" if name := serialized.get("name"): yield GEN_AI_TOOL_NAME_KEY, name @@ -627,8 +627,8 @@ def invoke_agent_input_message( if not inputs: return - if not hasattr(inputs, "get"): - raise TypeError(f"expected Mapping, found {type(inputs)}") + if not isinstance(inputs, Mapping): + return messages = inputs.get("messages") if not messages: @@ -669,8 +669,8 @@ def invoke_agent_output_message( if not outputs: return - if not hasattr(outputs, "get"): - raise TypeError(f"expected Mapping, found {type(outputs)}") + if not isinstance(outputs, Mapping): + return messages = outputs.get("messages") if not messages: diff --git a/tests/observability/core/test_agent365_exporter.py b/tests/observability/core/test_agent365_exporter.py index 26bfe73e..8cc6fc91 100644 --- a/tests/observability/core/test_agent365_exporter.py +++ b/tests/observability/core/test_agent365_exporter.py @@ -322,9 +322,7 @@ def test_export_logging(self, mock_logger): "(tenant: test-tenant-123, agent: test-agent-456)" ), # Should log token resolution success at DEBUG - unittest.mock.call.debug( - "Token resolved successfully for agent test-agent-456" - ), + unittest.mock.call.debug("Token resolved successfully for agent test-agent-456"), # Should log HTTP success at DEBUG unittest.mock.call.debug( "HTTP 200 success on attempt 1. " From 84be10fd8dbc92c212e54942262f48b0610becd5 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 16 Mar 2026 18:39:32 +0000 Subject: [PATCH 08/10] Move _parse_retry_after to exporters/utils.py as standalone parse_retry_after function Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../core/exporters/agent365_exporter.py | 18 ++----------- .../observability/core/exporters/utils.py | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 16 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py index 117fd94d..8e6f6dd6 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py @@ -23,6 +23,7 @@ hex_span_id, hex_trace_id, kind_name, + parse_retry_after, partition_by_identity, status_name, truncate_span, @@ -162,21 +163,6 @@ def _truncate_text(text: str, max_length: int) -> str: return text[:max_length] + "..." return text - @staticmethod - def _parse_retry_after(resp: requests.Response) -> float | None: - """Parse the Retry-After header from a response. - - Returns: - The number of seconds to wait, or None if the header is absent or invalid. - """ - retry_after = resp.headers.get("Retry-After") - if retry_after is None: - return None - try: - return float(retry_after) - except (ValueError, TypeError): - return None - def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bool: for attempt in range(DEFAULT_MAX_RETRIES + 1): try: @@ -209,7 +195,7 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo # Retry transient if resp.status_code in (408, 429) or 500 <= resp.status_code < 600: # Respect Retry-After header for 429 responses - retry_after = self._parse_retry_after(resp) + retry_after = parse_retry_after(resp.headers) if attempt < DEFAULT_MAX_RETRIES: if retry_after is not None: time.sleep(min(retry_after, 60.0)) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py index eef34a00..d48bd1ab 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/utils.py @@ -1,6 +1,8 @@ # Copyright (c) Microsoft Corporation. # Licensed under the MIT License. +from __future__ import annotations + import json import logging import os @@ -230,6 +232,31 @@ def build_export_url( return f"https://{endpoint}{endpoint_path}?api-version=1" +def parse_retry_after(headers: dict[str, str]) -> float | None: + """Parse the ``Retry-After`` header value. + + Only numeric (seconds) values are supported. HTTP-date values + (e.g. ``Wed, 21 Oct 2025 07:28:00 GMT``) are intentionally ignored + and treated as absent, falling back to exponential backoff. + + Args: + headers: Response headers mapping. + + Returns: + The number of seconds to wait, or ``None`` if the header is + absent, non-numeric, or otherwise invalid. + """ + retry_after = headers.get("Retry-After") + if retry_after is None: + return None + try: + return float(retry_after) + except (ValueError, TypeError): + # Intentionally ignore HTTP-date formatted Retry-After values; + # callers should fall back to exponential backoff. + return None + + def is_agent365_exporter_enabled() -> bool: """Check if Agent 365 exporter is enabled.""" # Check environment variable From c08b24f54e9463203236aa2dac3dff43aa5663ad Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 16:19:07 +0000 Subject: [PATCH 09/10] Replace type(e).__name__ with str(e) in exporter error logging per PR review Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../observability/core/exporters/agent365_exporter.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py index 8e6f6dd6..5920dec3 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/exporters/agent365_exporter.py @@ -122,8 +122,7 @@ def export(self, spans: Sequence[ReadableSpan]) -> SpanExportResult: except Exception as e: # If token resolution fails, treat as failure for this group logger.error( - f"Token resolution failed for agent {agent_id}, " - f"tenant {tenant_id}: {type(e).__name__}" + f"Token resolution failed for agent {agent_id}, tenant {tenant_id}: {e}" ) any_failure = True continue @@ -225,9 +224,7 @@ def _post_with_retries(self, url: str, body: str, headers: dict[str, str]) -> bo time.sleep(0.5 * (2**attempt)) continue # Final attempt failed - logger.error( - f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: {type(e).__name__}" - ) + logger.error(f"Request failed after {DEFAULT_MAX_RETRIES + 1} attempts: {e}") return False return False From f02e68c496b33ffec25ed11334ee33d9006f0b02 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Mar 2026 19:13:58 +0000 Subject: [PATCH 10/10] feat: add bounded collections for LangChain tracer and OutputScope - Convert LangChain _spans_by_run from unbounded DictWithLock to bounded OrderedDict with _MAX_TRACKED_RUNS=10000 cap - Add _cap_ordered_dict helper for FIFO eviction (matching OpenAI pattern) - Add thread-safe lock usage for _spans_by_run in error handlers - Add _MAX_OUTPUT_MESSAGES=5000 cap for OutputScope._output_messages - Add unit tests for both bounded collections Co-authored-by: nikhilNava <211831449+nikhilNava@users.noreply.github.com> --- .../core/spans_scopes/output_scope.py | 5 + .../extensions/langchain/tracer.py | 34 ++++-- .../core/test_output_scope_bounded.py | 100 ++++++++++++++++ .../langchain/test_tracer_bounded.py | 109 ++++++++++++++++++ 4 files changed, 241 insertions(+), 7 deletions(-) create mode 100644 tests/observability/core/test_output_scope_bounded.py create mode 100644 tests/observability/extensions/langchain/test_tracer_bounded.py diff --git a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py index 873a9dc9..8f128adf 100644 --- a/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py +++ b/libraries/microsoft-agents-a365-observability-core/microsoft_agents_a365/observability/core/spans_scopes/output_scope.py @@ -16,6 +16,8 @@ class OutputScope(OpenTelemetryScope): """Provides OpenTelemetry tracing scope for output messages.""" + _MAX_OUTPUT_MESSAGES = 5000 + @staticmethod def start( agent_details: AgentDetails, @@ -82,9 +84,12 @@ def record_output_messages(self, messages: list[str]) -> None: """Records the output messages for telemetry tracking. Appends the provided messages to the accumulated output messages list. + The list is capped at _MAX_OUTPUT_MESSAGES to prevent unbounded memory growth. Args: messages: List of output messages to append """ self._output_messages.extend(messages) + if len(self._output_messages) > self._MAX_OUTPUT_MESSAGES: + self._output_messages = self._output_messages[-self._MAX_OUTPUT_MESSAGES :] self.set_tag_maybe(GEN_AI_OUTPUT_MESSAGES_KEY, safe_json_dumps(self._output_messages)) diff --git a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py index 3204912b..b5cb3dcd 100644 --- a/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py +++ b/libraries/microsoft-agents-a365-observability-extensions-langchain/microsoft_agents_a365/observability/extensions/langchain/tracer.py @@ -3,6 +3,7 @@ import logging import re +from collections import OrderedDict from collections.abc import Iterator from itertools import chain from threading import RLock @@ -69,6 +70,8 @@ class CustomLangChainTracer(BaseTracer): + _MAX_TRACKED_RUNS = 10000 + __slots__ = ( "_tracer", "_separate_trace_from_runtime_context", @@ -98,11 +101,18 @@ def __init__( self.run_map = DictWithLock[str, Run](self.run_map) self._tracer = tracer self._separate_trace_from_runtime_context = separate_trace_from_runtime_context - self._spans_by_run: dict[UUID, Span] = DictWithLock[UUID, Span]() + self._spans_by_run: OrderedDict[UUID, Span] = OrderedDict() self._lock = RLock() # handlers may be run in a thread by langchain def get_span(self, run_id: UUID) -> Span | None: - return self._spans_by_run.get(run_id) + with self._lock: + return self._spans_by_run.get(run_id) + + @staticmethod + def _cap_ordered_dict(d: OrderedDict, max_size: int) -> None: + """Evict oldest entries from an OrderedDict to stay within max_size.""" + while len(d) > max_size: + d.popitem(last=False) def _start_trace(self, run: Run) -> None: self.run_map[str(run.id)] = run @@ -142,12 +152,14 @@ def _start_trace(self, run: Run) -> None: # token = context_api.attach(context) with self._lock: self._spans_by_run[run.id] = span + self._cap_ordered_dict(self._spans_by_run, self._MAX_TRACKED_RUNS) def _end_trace(self, run: Run) -> None: self.run_map.pop(str(run.id), None) if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY): return - span = self._spans_by_run.pop(run.id, None) + with self._lock: + span = self._spans_by_run.pop(run.id, None) if span: try: _update_span(span, run) @@ -162,24 +174,32 @@ def _persist_run(self, run: Run) -> None: pass def on_llm_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run: - if span := self._spans_by_run.get(run_id): + with self._lock: + span = self._spans_by_run.get(run_id) + if span: record_exception(span, error) return super().on_llm_error(error, *args, run_id=run_id, **kwargs) def on_chain_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run: - if span := self._spans_by_run.get(run_id): + with self._lock: + span = self._spans_by_run.get(run_id) + if span: record_exception(span, error) return super().on_chain_error(error, *args, run_id=run_id, **kwargs) def on_retriever_error( self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any ) -> Run: - if span := self._spans_by_run.get(run_id): + with self._lock: + span = self._spans_by_run.get(run_id) + if span: record_exception(span, error) return super().on_retriever_error(error, *args, run_id=run_id, **kwargs) def on_tool_error(self, error: BaseException, *args: Any, run_id: UUID, **kwargs: Any) -> Run: - if span := self._spans_by_run.get(run_id): + with self._lock: + span = self._spans_by_run.get(run_id) + if span: record_exception(span, error) return super().on_tool_error(error, *args, run_id=run_id, **kwargs) diff --git a/tests/observability/core/test_output_scope_bounded.py b/tests/observability/core/test_output_scope_bounded.py new file mode 100644 index 00000000..deaa4fae --- /dev/null +++ b/tests/observability/core/test_output_scope_bounded.py @@ -0,0 +1,100 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for bounded output messages in OutputScope.""" + +import unittest +from unittest.mock import MagicMock, patch + +from microsoft_agents_a365.observability.core.spans_scopes.output_scope import OutputScope + + +class TestOutputScopeBounded(unittest.TestCase): + """Tests that OutputScope._output_messages list is properly bounded.""" + + def _make_scope(self, initial_messages: list[str] | None = None) -> OutputScope: + """Create an OutputScope with mocked dependencies.""" + agent_details = MagicMock() + agent_details.agent_id = "test-agent" + agent_details.agent_name = "Test Agent" + agent_details.agent_description = None + agent_details.platform_id = None + agent_details.conversation_id = None + agent_details.icon_uri = None + agent_details.agent_auid = None + agent_details.agent_upn = None + agent_details.agent_blueprint_id = None + + tenant_details = MagicMock() + tenant_details.tenant_id = "test-tenant" + + response = MagicMock() + response.messages = initial_messages or ["hello"] + + with patch.object(OutputScope, "__init__", lambda self, *a, **kw: None): + scope = OutputScope.__new__(OutputScope) + scope._output_messages = list(response.messages) + scope.set_tag_maybe = MagicMock() + + return scope + + def test_max_output_messages_default(self): + """Default _MAX_OUTPUT_MESSAGES should be 5000.""" + self.assertEqual(OutputScope._MAX_OUTPUT_MESSAGES, 5000) + + def test_record_output_messages_within_limit(self): + """Messages under the limit should not be truncated.""" + scope = self._make_scope(["initial"]) + scope.record_output_messages(["msg1", "msg2", "msg3"]) + self.assertEqual(len(scope._output_messages), 4) + self.assertEqual(scope._output_messages, ["initial", "msg1", "msg2", "msg3"]) + + def test_record_output_messages_exceeds_limit(self): + """Messages exceeding the limit should be truncated to keep newest.""" + scope = self._make_scope([]) + original_max = OutputScope._MAX_OUTPUT_MESSAGES + try: + OutputScope._MAX_OUTPUT_MESSAGES = 10 + + # Add 15 messages + scope.record_output_messages([f"msg_{i}" for i in range(15)]) + + # Should be capped at 10 (keeping the newest) + self.assertEqual(len(scope._output_messages), 10) + # Oldest 5 should be gone, newest 10 should remain + self.assertEqual(scope._output_messages[0], "msg_5") + self.assertEqual(scope._output_messages[-1], "msg_14") + finally: + OutputScope._MAX_OUTPUT_MESSAGES = original_max + + def test_record_output_messages_multiple_calls_capped(self): + """Multiple calls to record_output_messages should stay bounded.""" + scope = self._make_scope([]) + original_max = OutputScope._MAX_OUTPUT_MESSAGES + try: + OutputScope._MAX_OUTPUT_MESSAGES = 5 + + for batch in range(4): + scope.record_output_messages([f"batch{batch}_msg{i}" for i in range(3)]) + + # Total of 12 messages added in 4 batches, should be capped at 5 + self.assertLessEqual(len(scope._output_messages), 5) + # Latest messages should be from the last batches + self.assertIn("batch3_msg2", scope._output_messages) + finally: + OutputScope._MAX_OUTPUT_MESSAGES = original_max + + def test_record_output_messages_exactly_at_limit(self): + """Messages exactly at the limit should not be truncated.""" + scope = self._make_scope([]) + original_max = OutputScope._MAX_OUTPUT_MESSAGES + try: + OutputScope._MAX_OUTPUT_MESSAGES = 5 + scope.record_output_messages([f"msg_{i}" for i in range(5)]) + self.assertEqual(len(scope._output_messages), 5) + finally: + OutputScope._MAX_OUTPUT_MESSAGES = original_max + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/observability/extensions/langchain/test_tracer_bounded.py b/tests/observability/extensions/langchain/test_tracer_bounded.py new file mode 100644 index 00000000..b70c920b --- /dev/null +++ b/tests/observability/extensions/langchain/test_tracer_bounded.py @@ -0,0 +1,109 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT License. + +"""Tests for bounded collections in the LangChain tracer.""" + +import unittest +from collections import OrderedDict +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +from microsoft_agents_a365.observability.extensions.langchain.tracer import ( + CustomLangChainTracer, +) + + +class TestLangChainTracerBounded(unittest.TestCase): + """Tests that LangChain tracer collections are properly bounded.""" + + def _make_tracer(self) -> CustomLangChainTracer: + """Create a tracer with a mock OTel tracer.""" + mock_otel_tracer = MagicMock() + mock_span = MagicMock() + mock_otel_tracer.start_span.return_value = mock_span + return CustomLangChainTracer( + tracer=mock_otel_tracer, + separate_trace_from_runtime_context=True, + ) + + def test_spans_by_run_is_ordered_dict(self): + """_spans_by_run should be an OrderedDict for bounded eviction.""" + tracer = self._make_tracer() + self.assertIsInstance(tracer._spans_by_run, OrderedDict) + + def test_cap_ordered_dict_evicts_oldest(self): + """_cap_ordered_dict should evict oldest entries (FIFO).""" + d: OrderedDict[str, int] = OrderedDict() + for i in range(15): + d[f"key_{i}"] = i + CustomLangChainTracer._cap_ordered_dict(d, 10) + + self.assertEqual(len(d), 10) + # oldest 5 should be gone + for i in range(5): + self.assertNotIn(f"key_{i}", d) + # newest 10 should remain + for i in range(5, 15): + self.assertIn(f"key_{i}", d) + self.assertEqual(d[f"key_{i}"], i) + + def test_cap_ordered_dict_noop_when_under_limit(self): + """_cap_ordered_dict should be a no-op when size is under limit.""" + d: OrderedDict[str, int] = OrderedDict() + for i in range(5): + d[f"key_{i}"] = i + CustomLangChainTracer._cap_ordered_dict(d, 10) + self.assertEqual(len(d), 5) + + def test_spans_by_run_bounded_on_start_trace(self): + """_spans_by_run should be bounded when _start_trace adds entries.""" + tracer = self._make_tracer() + # Use a small cap for testing + original_max = CustomLangChainTracer._MAX_TRACKED_RUNS + try: + CustomLangChainTracer._MAX_TRACKED_RUNS = 5 + + # Add more runs than the cap + for i in range(10): + run = MagicMock() + run.id = uuid4() + run.parent_run_id = None + run.run_type = "llm" + run.name = f"test_run_{i}" + run.start_time = MagicMock() + + with patch( + "microsoft_agents_a365.observability.extensions.langchain.tracer" + ".context_api.get_value", + return_value=None, + ): + tracer._start_trace(run) + + # Should be capped at 5 + self.assertLessEqual(len(tracer._spans_by_run), 5) + finally: + CustomLangChainTracer._MAX_TRACKED_RUNS = original_max + + def test_get_span_returns_none_for_missing(self): + """get_span should return None for non-existent run_id.""" + tracer = self._make_tracer() + result = tracer.get_span(uuid4()) + self.assertIsNone(result) + + def test_get_span_returns_span_for_existing(self): + """get_span should return the span for existing run_id.""" + tracer = self._make_tracer() + run_id = uuid4() + mock_span = MagicMock() + with tracer._lock: + tracer._spans_by_run[run_id] = mock_span + result = tracer.get_span(run_id) + self.assertEqual(result, mock_span) + + def test_max_tracked_runs_default(self): + """Default _MAX_TRACKED_RUNS should be 10000.""" + self.assertEqual(CustomLangChainTracer._MAX_TRACKED_RUNS, 10000) + + +if __name__ == "__main__": + unittest.main()