fixing MCP headers usage for Llama stack 0.4.x, adding additional e2e tests for mcp servers #1080
base: main
Conversation
Walkthrough
Switches the MCP mock server to a JSON-RPC interface (initialize, tools/list, tools/call); unifies per-server …
Changes
Sequence Diagram(s)
sequenceDiagram
participant Client as Client
participant App as Lightspeed App
participant ResponsesAPI as Responses API
participant MCP as MCP Mock Server
Client->>App: POST /v1/query (may include MCP-HEADERS)
App->>ResponsesAPI: prepare request (resolve per-server authorization, build toolgroups)
ResponsesAPI->>MCP: JSON-RPC "initialize" (Authorization)
MCP-->>ResponsesAPI: initialize response (protocolVersion, serverInfo)
ResponsesAPI->>MCP: JSON-RPC "tools/list" (Authorization)
MCP-->>ResponsesAPI: tools list (tool_name, inputSchema)
ResponsesAPI->>MCP: JSON-RPC "tools/call" (Authorization, payload)
MCP-->>ResponsesAPI: result or error (error-mode toggles error response)
ResponsesAPI-->>App: aggregated tool results/summary
App-->>Client: HTTP response (includes tool outputs)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Possibly related PRs
Suggested labels
Suggested reviewers
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
b30f848 to 4b358ab (Compare)
Actionable comments posted: 5
🤖 Fix all issues with AI agents
In `@tests/e2e/features/steps/mcp.py`:
- Around line 699-713: The step function
check_streaming_response_has_tool_results only checks status_code but doesn't
parse SSE events; update it to decode context.response.content (or iterate
response.iter_lines() if streaming), extract lines starting with "data:",
collect/parse SSE payloads, then assert that at least one event contains
tool-related content (e.g., contains "tool" or expected tool result keys) and
print the count of matching events; keep using context.response and check for
None/status before parsing.
- Around line 268-281: The step function check_service_logs_contain currently
only prints a message and requires manual verification; replace the manual
prints with a deterministic assertion that programmatically fetches and searches
the service logs (e.g., read from a known log file or query the running
service/container logs via context or a helper like get_service_logs) and raise
an AssertionError if log_fragment is not present; update the
check_service_logs_contain implementation to obtain logs from the test context
(or a utility function), search for log_fragment, and assert presence with a
clear failure message so the test becomes automated and deterministic.
- Around line 428-445: The step function check_tools_list_calls currently only
confirms the mock server is reachable but does not assert any requests contained
the "tools/list" method; either implement a real inspection of request bodies by
fetching the debug request list from the mock server (use the requests.get to
retrieve the full payloads from mock_server_url+"/debug/requests", iterate the
returned requests and assert any body or JSON contains "tools/list") or
explicitly document this limitation in the feature file/tests rather than
silently passing; update check_tools_list_calls to perform the body/JSON check
or adjust the feature to note the verification is not possible without
request-body logging.
- Around line 253-266: The step function check_mcp_tool_in_log currently only
prints and must instead retrieve the MCP mock server request log from the test
context (e.g., context.mcp_request_log or via the MCP test client available in
context), scan the recorded requests for the expected tool_name by inspecting
the Authorization header or the response body (tools/list) entries, and assert
that at least one request contains the expected tool_name; update the function
to raise an assertion (or use behave's assert) with a clear failure message if
no matching entry is found so the test actually fails when the tool is missing.
- Around line 531-541: The configure_mock_server_errors step currently only sets
context.expect_tool_errors and prints a placeholder but the mock server and
subsequent assertions never consume that flag; implement real error-mode support
by adding an error injection mechanism in the MCP mock server (e.g., a query
param/header or a toggle handler) that, when enabled, causes tool execution
endpoints used by tests to return deterministic error responses; update the
configure_mock_server_errors function to flip that toggle on (and ensure it
resets after tests), ensure the flag/context value is actually passed to
requests sent by tests (or the mock reads the toggle) so the mock returns
errors, and implement the corresponding step definitions for "The response
should indicate tool execution failed" and "The service logs should contain tool
failure information" to assert the error payload and log entries are present
(reference configure_mock_server_errors and the failing test scenario names to
locate where to wire the toggle and assertions).
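For the last item, a minimal sketch of what such an error-injection toggle could look like; the endpoint handler names and the flag are assumptions for illustration, not the actual dev-tools/mcp-mock-server/server.py code (which does expose a _send_json_response helper).

```python
# Hypothetical error-mode toggle for the MCP mock server (names are assumptions).
error_mode = False  # module-level flag consumed by the tools/call handler


def handle_debug_error_mode(send_json_response, enabled: bool) -> None:
    """Flip the toggle; tests enable it before an error scenario and reset it after."""
    global error_mode
    error_mode = enabled
    send_json_response({"error_mode": error_mode})


def handle_tools_call(send_json_response, request_id: int, tool_name: str) -> None:
    """Return a deterministic JSON-RPC error while error mode is active."""
    if error_mode:
        send_json_response(
            {
                "jsonrpc": "2.0",
                "id": request_id,
                "error": {"code": -32000, "message": f"Injected failure for {tool_name}"},
            }
        )
        return
    # ...normal successful tools/call handling continues here...
```

The behave step for "The MCP mock server is configured to return errors" would then hit that toggle endpoint instead of only setting context.expect_tool_errors, and the corresponding Then-steps can assert on the error payload.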
🧹 Nitpick comments (9)
dev-tools/mcp-mock-server/server.py (1)
149-154: Consider consistent truncation handling.
The auth preview truncation adds "..." unconditionally in the result text, even when the header is shorter than 50 characters (line 154 always appends "...").
Minor fix for truncation suffix
-        auth_preview = (
-            auth_header[:50] if len(auth_header) > 50 else auth_header
-        )
-        result_text = (
-            f"Mock tool '{tool_called}' executed successfully "
-            f"with arguments: {arguments}. Auth used: {auth_preview}..."
-        )
+        if len(auth_header) > 50:
+            auth_preview = f"{auth_header[:50]}..."
+        else:
+            auth_preview = auth_header
+        result_text = (
+            f"Mock tool '{tool_called}' executed successfully "
+            f"with arguments: {arguments}. Auth used: {auth_preview}"
+        )
src/app/endpoints/query_v2.py (2)
1040-1044: Consider using debug level for per-request tool preparation.
The logger.info for prepared MCP tools will emit a log line for every request. For high-traffic scenarios, this could be verbose. Consider using logger.debug for the per-request detail and keeping info for startup/configuration logging only.
Use debug level for per-request logging
-        logger.info(
+        logger.debug(
             "Prepared %d MCP tool(s) for request: %s",
             len(mcp_tools),
             ", ".join(mcp_server_names),
         )
1064-1070: Summary logging at info level may be too verbose for production.
Similar to the above, the summary log at info level will emit for every request. Consider debug level or gating behind a more restrictive condition.
Consider debug level for per-request summary
-    if toolgroups:
-        logger.info(
+    if toolgroups:
+        logger.debug(
             "Prepared %d total tool(s) for Responses API: %d RAG + %d MCP",
             len(toolgroups),
             len(rag_tools) if rag_tools else 0,
             len(mcp_tools) if mcp_tools else 0,
         )
tests/e2e/features/mcp_tools.feature (2)
20-25: Consider adding MCP-specific assertions to the info endpoint test.
This scenario verifies only that the info endpoint returns 200. To validate MCP configuration is reported correctly, consider adding a step that checks for MCP-related fields in the response body.
74-74: Magic number for request count may be fragile.
The assertion "at least 6 requests" assumes a specific number of MCP protocol exchanges. If the mock server or protocol changes, this could break. Consider documenting why 6 is expected (e.g., 2 requests × 3 servers = 6) or using a more resilient assertion.
39-39: Extract mock server URL to a constant or context variable.
The mock server URL http://localhost:9000 is hardcoded in multiple places throughout this file. Consider extracting it to a module-level constant or storing it in the context for easier configuration and maintenance.
♻️ Suggested approach
+MCP_MOCK_SERVER_URL = "http://localhost:9000"
+
+
 @given("The MCP mock server request log is cleared")
 def clear_mcp_mock_server_log(context: Context) -> None:
     ...
-    mock_server_url = "http://localhost:9000"
+    mock_server_url = getattr(context, "mcp_mock_server_url", MCP_MOCK_SERVER_URL)
Apply similarly to all other occurrences in this file.
88-98: Fixed sleep is fragile for synchronization.
Using time.sleep(2) for waiting on async operations can lead to flaky tests if the server is slow, or wasted time if it's fast. Consider polling the mock server's request count with a timeout instead.
♻️ Suggested polling approach
 @when("I wait for MCP server to receive requests")
 def wait_for_mcp_requests(context: Context) -> None:
-    # Wait for requests to be processed
-    time.sleep(2)
-    print("⏱️ Waited for MCP server to process requests")
+    """Poll MCP mock server until requests are received or timeout."""
+    mock_server_url = "http://localhost:9000"
+    timeout = 10
+    poll_interval = 0.5
+    elapsed = 0
+
+    while elapsed < timeout:
+        try:
+            response = requests.get(f"{mock_server_url}/debug/requests", timeout=2)
+            if response.status_code == 200 and len(response.json()) > 0:
+                print(f"⏱️ MCP server received requests after {elapsed:.1f}s")
+                return
+        except requests.RequestException:
+            pass
+        time.sleep(poll_interval)
+        elapsed += poll_interval
+
+    print("⚠️ Timeout waiting for MCP server requests")
100-151: Consider extracting a helper for fetching mock server requests.
Both check_mcp_server_received_requests and check_mcp_server_request_count share the same pattern for fetching and parsing the debug requests. A helper function would reduce duplication.
♻️ Suggested helper
def _get_mock_server_requests(mock_server_url: str = "http://localhost:9000") -> list:
    """Fetch requests log from MCP mock server."""
    response = requests.get(f"{mock_server_url}/debug/requests", timeout=5)
    if response.status_code != 200:
        raise AssertionError(f"Failed to get debug requests: {response.status_code}")
    return response.json()
153-251: Three auth header verification functions share nearly identical logic.
check_file_auth_header, check_k8s_auth_header, and check_client_auth_header have the same structure with minor variations (exact match vs. contains). Consider consolidating into a reusable helper to reduce duplication.
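A possible shape for that shared helper, assuming the captured requests returned by the mock server's /debug/requests endpoint expose a headers mapping (the field names are assumptions):

```python
import requests


def assert_captured_auth_header(
    mock_server_url: str, expected: str, exact: bool = True
) -> None:
    """Assert that at least one logged request carried the expected Authorization header."""
    response = requests.get(f"{mock_server_url}/debug/requests", timeout=5)
    assert response.status_code == 200, f"Failed to get debug requests: {response.status_code}"

    # Collect the Authorization header from every logged request entry.
    captured = [
        entry.get("headers", {}).get("Authorization", "") for entry in response.json()
    ]
    matches = [
        value
        for value in captured
        if (value == expected if exact else expected in value)
    ]
    assert matches, f"No request carried Authorization {expected!r}; saw: {captured}"
```

check_file_auth_header and check_k8s_auth_header could then call it with exact=True, and check_client_auth_header with exact=False.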
| @then("The streaming response should contain tool execution results") | ||
| def check_streaming_response_has_tool_results(context: Context) -> None: | ||
| """Verify streaming response contains tool execution results. | ||
|
|
||
| Parameters: | ||
| context (Context): Behave context. | ||
| """ | ||
| assert context.response is not None, "No response received" | ||
| assert ( | ||
| context.response.status_code == 200 | ||
| ), f"Bad status: {context.response.status_code}" | ||
|
|
||
| # For streaming responses, we'd need to parse SSE events | ||
| # For now, just verify we got a successful response | ||
| print("✅ Streaming response contains tool execution results") |
Streaming response verification doesn't parse SSE events.
The step acknowledges that SSE events should be parsed to verify tool execution results but only checks the status code. This means the assertion "streaming response contains tool execution results" is not actually verified.
💡 Consider implementing SSE parsing
`@then`("The streaming response should contain tool execution results")
def check_streaming_response_has_tool_results(context: Context) -> None:
assert context.response is not None, "No response received"
assert context.response.status_code == 200
# Parse SSE events from streaming response
content = context.response.content.decode("utf-8")
events = [line for line in content.split("\n") if line.startswith("data:")]
# Verify at least one event contains tool-related content
tool_events = [e for e in events if "tool" in e.lower()]
assert len(tool_events) > 0, "No tool execution events found in streaming response"
print(f"✅ Found {len(tool_events)} tool-related events in streaming response")🤖 Prompt for AI Agents
In `@tests/e2e/features/steps/mcp.py` around lines 699 - 713, The step function
check_streaming_response_has_tool_results only checks status_code but doesn't
parse SSE events; update it to decode context.response.content (or iterate
response.iter_lines() if streaming), extract lines starting with "data:",
collect/parse SSE payloads, then assert that at least one event contains
tool-related content (e.g., contains "tool" or expected tool result keys) and
print the count of matching events; keep using context.response and check for
None/status before parsing.
tisnik
left a comment
- please add Jira issue into title (see contribution guide)
- it changes the LCORE code too, so the title is misleading and the wrong people will review it
- e2e part will be reviewed by QA
jrobertboos
left a comment
Overall LGTM. I think there are some more test scenarios that can be added but for now really good job getting the e2e tests for MCP started :)
Great job adding all the logs!
    Then The status code of the response is 200

# ============================================================================
# Authentication Methods
We should probably have more tests for incorrect tokens and make sure they return correct errors (e.g. 401). This can also be implemented later.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/main.py (1)
120-125: ⚠️ Potential issue | 🟠 Major
Fix CORS allow_methods (currently bound to headers).
Line 124 sets allow_methods=cors.allow_headers, which breaks CORS preflight checks by exposing the headers list instead of the methods list. This should be cors.allow_methods.
🔧 Suggested fix
 app.add_middleware(
     CORSMiddleware,
     allow_origins=cors.allow_origins,
     allow_credentials=cors.allow_credentials,
-    allow_methods=cors.allow_headers,
+    allow_methods=cors.allow_methods,
     allow_headers=cors.allow_headers,
 )
🤖 Fix all issues with AI agents
In `@src/app/main.py`:
- Around line 269-308: The exception handler in __call__ should track whether
the application has already sent "http.response.start" to avoid sending a second
start and violating ASGI; wrap the send callable (e.g., create a local
wrapped_send that sets response_started = True when it sees message["type"] ==
"http.response.start" and forwards messages to the original send) and use that
wrapped_send when awaiting self.app(scope, receive, wrapped_send); then in the
except block check response_started and if True return early (skip sending
headers/body), otherwise proceed to send the InternalServerErrorResponse
start/body using the original send path (still using JSONResponse and
InternalServerErrorResponse.generic()).
In `@tests/e2e/features/steps/mcp.py`:
- Around line 31-48: The clear_mcp_mock_server_log step currently swallows
failures via print statements which can leave stale data; change function
clear_mcp_mock_server_log to fail fast by replacing the print warnings with
assertions or raised exceptions: after calling
requests.get(f"{MOCK_MCP_SERVER_URL}/debug/clear", timeout=2) assert
response.status_code == 200 (or call response.raise_for_status()) and include
the response content in the assertion message, and in the except
requests.RequestException block re-raise an AssertionError (or let the original
exception propagate) with a clear message including the caught exception so the
test setup fails immediately when the clear endpoint cannot be reached or
returns a non-200 result.
- Around line 533-548: The test check_tools_call_method currently only asserts
that MCP returned some requests; update it to assert that at least one logged
request corresponds to the tools/call endpoint by inspecting the parsed
requests_log (from response.json()) and searching entries for a path or method
field that equals or contains "tools/call"; add an assertion like "No tools/call
request received by MCP server" if none found and keep using mock_server_url and
the /debug/requests endpoint to fetch the log.
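A minimal sketch of the send-wrapping pattern the first prompt describes, kept generic: the fallback JSON body stands in for InternalServerErrorResponse.generic(), which is project code and assumed here.

```python
from collections.abc import Awaitable, Callable
from typing import Any

ASGISend = Callable[[dict[str, Any]], Awaitable[None]]


async def call_with_error_guard(app: Any, scope: dict, receive: Any, send: ASGISend) -> None:
    """Run the wrapped ASGI app; only emit a fallback 500 if no response has started."""
    response_started = False

    async def wrapped_send(message: dict[str, Any]) -> None:
        nonlocal response_started
        if message["type"] == "http.response.start":
            response_started = True
        await send(message)

    try:
        await app(scope, receive, wrapped_send)
    except Exception:  # pylint: disable=broad-exception-caught
        if response_started:
            # Headers already went out; sending a second response.start would violate ASGI.
            return
        await send(
            {
                "type": "http.response.start",
                "status": 500,
                "headers": [(b"content-type", b"application/json")],
            }
        )
        await send(
            {"type": "http.response.body", "body": b'{"detail": "Internal server error"}'}
        )
```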
🧹 Nitpick comments (8)
src/app/endpoints/query_v2.py (2)
1043-1048: Consider using logger.debug instead of logger.info for MCP tool preparation.
The log at line 1044 uses logger.info for routine MCP tool preparation details. Per coding guidelines, info is for general execution milestones while debug is for diagnostic info. Since this is per-request diagnostic information rather than a significant operational milestone, debug may be more appropriate to avoid log noise in production.
Proposed change
-        logger.info(
+        logger.debug(
             "Prepared %d MCP tool(s) for request: %s",
             len(mcp_tools),
             ", ".join(mcp_server_names),
         )
1068-1074: Same consideration for summary log - use debug for per-request diagnostics.
This summary log (line 1069) also uses logger.info for per-request tool counts. Consider debug level for consistency with the diagnostic nature of this information.
Proposed change
-    if toolgroups:
-        logger.info(
+    if toolgroups:
+        logger.debug(
             "Prepared %d total tool(s) for Responses API: %d RAG + %d MCP",
             len(toolgroups),
             len(rag_tools) if rag_tools else 0,
             len(mcp_tools) if mcp_tools else 0,
         )
src/app/endpoints/query.py (3)
81-106: Background task pattern is well-designed; minor logging format issue.
The pattern correctly:
- Maintains strong references to prevent GC
- Auto-removes completed tasks via done callback
- Handles task creation errors
However, lines 103 and 106 use f-strings instead of %-formatting for logging, which bypasses lazy evaluation:
Use %-style formatting for logger calls
- logger.debug( - f"Background task created, active tasks: {len(background_tasks_set)}" - ) + logger.debug( + "Background task created, active tasks: %d", len(background_tasks_set) + ) except Exception as e: - logger.error(f"Failed to create background task: {e}", exc_info=True) + logger.error("Failed to create background task: %s", e, exc_info=True)
415-432: Background persistence approach is sound; consider extracting the delay constant.
The 500ms delay for MCP session cleanup is a reasonable heuristic. Consider extracting it as a named constant for clarity and easier tuning:
Extract delay constant
+# Delay before background task runs to allow MCP session cleanup
+MCP_CLEANUP_DELAY_SECONDS = 0.5
+
 async def persist_with_topic_summary() -> None:
     ...
     logger.debug("Background task: waiting for MCP cleanup")
-    await asyncio.sleep(0.5)  # 500ms should be enough for cleanup
+    await asyncio.sleep(MCP_CLEANUP_DELAY_SECONDS)
     logger.debug("Background task: MCP cleanup complete")
467-469: Broad exception handling is acceptable here but loses stack trace.
For background task resilience, catching all exceptions is reasonable. However, consider adding exc_info=True to preserve the stack trace for debugging:
Add exc_info for better debugging
 except Exception as e:  # pylint: disable=broad-exception-caught
-    logger.error("Failed to generate topic summary: %s", e)
+    logger.error("Failed to generate topic summary: %s", e, exc_info=True)
     topic_summary = None
src/app/endpoints/streaming_query_v2.py (1)
80-93: Code duplication: create_background_task differs from the query.py version.
This implementation lacks the error handling and logging present in src/app/endpoints/query.py (lines 87-106). The query.py version has:
- Try/except around task creation
- Debug logging for task count
- Error logging on failure
Consider extracting to a shared utility module for consistency, or at minimum align the implementations.
Align with query.py implementation
 def create_background_task(coro: Any) -> None:
     """Create a detached background task with strong reference.

     Args:
         coro: Coroutine to run as background task
     """
-    task = asyncio.create_task(coro)
-    background_tasks_set.add(task)
-    task.add_done_callback(background_tasks_set.discard)
+    try:
+        task = asyncio.create_task(coro)
+        background_tasks_set.add(task)
+        task.add_done_callback(background_tasks_set.discard)
+        logger.debug(
+            "Background task created, active tasks: %d", len(background_tasks_set)
+        )
+    except Exception as e:
+        logger.error("Failed to create background task: %s", e, exc_info=True)
tests/unit/app/endpoints/test_streaming_query_v2.py (1)
226-227: Timing-based test synchronization may be flaky.
The 0.7s sleep waits for the 0.5s background task delay plus 0.2s buffer. This could be flaky on slow CI systems. Consider:
- Increasing the buffer margin (e.g., 1.0s total)
- Mocking asyncio.sleep to make tests deterministic (see the sketch after this comment)
- Using an event or condition for synchronization
Option 1: Increase buffer for CI reliability
     # Wait for background cleanup task to complete (has 0.5s delay)
-    await asyncio.sleep(0.7)
+    await asyncio.sleep(1.0)  # 0.5s delay + 0.5s buffer for slow CI
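For the second option, a rough sketch of making the delay deterministic by patching asyncio.sleep; the stand-in coroutine and the pytest-asyncio marker are illustrative assumptions, not the project's actual test:

```python
import asyncio
from unittest.mock import AsyncMock, patch

import pytest


async def _persist_after_cleanup_delay() -> str:
    """Stand-in for the code under test, which sleeps 0.5s before persisting."""
    await asyncio.sleep(0.5)
    return "persisted"


@pytest.mark.asyncio
async def test_cleanup_delay_is_skipped() -> None:
    # Patching asyncio.sleep removes the real 0.5s wait, so timing no longer matters.
    with patch("asyncio.sleep", new=AsyncMock(return_value=None)) as fake_sleep:
        result = await _persist_after_cleanup_delay()

    assert result == "persisted"
    fake_sleep.assert_awaited_once_with(0.5)
```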
4-6: Introduce ASGI type aliases to reduce repeated complex signatures.
This improves readability and aligns with module-level alias guidance.
As per coding guidelines: Define type aliases at module level for clarity in Python files.
♻️ Suggested refactor
 from collections.abc import Awaitable, Callable
 from contextlib import asynccontextmanager
 from typing import Any, AsyncIterator
+ASGIScope = dict[str, Any]
+ASGIReceive = Callable[[], Awaitable[dict[str, Any]]]
+ASGISend = Callable[[dict[str, Any]], Awaitable[None]]
+
@@
     async def __call__(
         self,
-        scope: dict[str, Any],
-        receive: Callable[[], Awaitable[dict[str, Any]]],
-        send: Callable[[dict[str, Any]], Awaitable[None]],
+        scope: ASGIScope,
+        receive: ASGIReceive,
+        send: ASGISend,
     ) -> None:
@@
     async def __call__(
         self,
-        scope: dict[str, Any],
-        receive: Callable[[], Awaitable[dict[str, Any]]],
-        send: Callable[[dict[str, Any]], Awaitable[None]],
+        scope: ASGIScope,
+        receive: ASGIReceive,
+        send: ASGISend,
     ) -> None:
Also applies to: 196-201, 269-273
c736daa to becb161 (Compare)
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/main.py (1)
120-125: ⚠️ Potential issue | 🟠 Major
CORS allow_methods is wired to headers instead of methods.
Line 124 incorrectly assigns cors.allow_headers to allow_methods, which breaks CORS preflight behavior by sending header names instead of HTTP method names. The CORSConfiguration class defines both allow_methods (for HTTP methods like GET, POST) and allow_headers (for header names like Content-Type, Authorization).
🔧 Suggested fix
 app.add_middleware(
     CORSMiddleware,
     allow_origins=cors.allow_origins,
     allow_credentials=cors.allow_credentials,
-    allow_methods=cors.allow_headers,
+    allow_methods=cors.allow_methods,
     allow_headers=cors.allow_headers,
 )
🤖 Fix all issues with AI agents
In `@src/app/endpoints/query_v2.py`:
- Around line 905-960: The comparison that determines missing auth headers is
case-sensitive and will treat "authorization" vs "Authorization" as different;
update the resolution check to be case-insensitive by normalizing header names
(e.g., lowercasing) before comparing: when building required_headers from
mcp_server.authorization_headers.keys(), when creating resolved_headers from
headers.keys(), and when appending the special Authorization resolution (the
local variable authorization), use a consistent case (like .lower()) so
missing_headers is computed with a case-insensitive match for Authorization and
other headers; ensure the logging output still shows original names or a
readable form if needed.
In `@src/app/endpoints/query.py`:
- Around line 446-459: The conversation existence check uses conversation_id
directly, which may include a "conv_" prefix while the DB stores normalized IDs;
update the logic so check_conversation_exists normalizes conversation_id before
querying (e.g., strip a "conv_" prefix or run the same normalization used when
storing IDs) and then query UserConversation with the normalized id; ensure the
same normalization function or rules are applied (call or replicate
normalization used elsewhere) and keep the await
asyncio.to_thread(check_conversation_exists) usage intact so the DB query still
runs in the thread pool.
In `@tests/e2e/features/mcp_tools.feature`:
- Around line 110-117: Update the system_in_default_state step in
tests/e2e/features/steps/common.py to fully reset the MCP error mode by setting
context.expect_tool_errors = False (in addition to resetting
context.auth_headers). Locate the step implementation named
system_in_default_state and add the assignment to ensure the flag cleared
between scenarios so MCP mock error behavior does not leak across tests.
🧹 Nitpick comments (4)
src/app/endpoints/streaming_query_v2.py (1)
85-94: Add error handling/logging when creating background tasks.
This keeps parity with the non-streaming path and avoids silent failures if task creation fails.
♻️ Suggested update
 def create_background_task(coro: Any) -> None:
     """Create a detached background task with strong reference.

     Args:
         coro: Coroutine to run as background task
     """
-    task = asyncio.create_task(coro)
-    background_tasks_set.add(task)
-    task.add_done_callback(background_tasks_set.discard)
+    try:
+        task = asyncio.create_task(coro)
+        background_tasks_set.add(task)
+        task.add_done_callback(background_tasks_set.discard)
+        logger.debug("Background task created, active tasks: %d", len(background_tasks_set))
+    except Exception as e:  # pylint: disable=broad-exception-caught
+        logger.error("Failed to create background task: %s", e, exc_info=True)
tests/integration/endpoints/test_query_v2_integration.py (1)
36-59: Avoid fixed sleeps; poll background tasks with a timeout to reduce flakiness.
This keeps tests stable under variable CI load.
♻️ Suggested update
-    # Wait for the initial 500ms delay + buffer
-    await asyncio.sleep(0.6)
-
-    # Wait for any remaining background tasks to complete
-    # pylint: disable=import-outside-toplevel
-    from app.endpoints.query import background_tasks_set
-
-    if background_tasks_set:
-        await asyncio.gather(*background_tasks_set, return_exceptions=True)
-
-    # Give thread pool operations extra time to complete
-    await asyncio.sleep(0.2)
+    # Wait for the initial 500ms delay + buffer
+    await asyncio.sleep(0.6)
+
+    # Wait for any remaining background tasks to complete (bounded)
+    # pylint: disable=import-outside-toplevel
+    from app.endpoints.query import background_tasks_set
+
+    if background_tasks_set:
+        timeout = 3.0
+        loop = asyncio.get_running_loop()
+        start = loop.time()
+        while background_tasks_set and (loop.time() - start) < timeout:
+            await asyncio.sleep(0.05)
+        await asyncio.gather(*background_tasks_set, return_exceptions=True)
tests/e2e/features/mcp_tools.feature (2)
20-24: Scenario assertions may be incomplete for its stated purpose.
The scenario is named "Service reports MCP configuration correctly" but only asserts the HTTP status code. Consider adding assertions to validate the actual MCP configuration content in the response (e.g., verifying MCP servers are listed, their auth types, etc.) to match what the scenario name implies.
💡 Suggested additional assertions
   Scenario: Service reports MCP configuration correctly
     Given The system is in default state
     And I set the Authorization header to Bearer test-token
     When I access REST API endpoint "info" using HTTP GET method
     Then The status code of the response is 200
+    And The response should contain MCP configuration
+    And The response should list configured MCP servers
48-56: Consider adding explicit token verification for client-provided auth.
For consistency with the file-based (line 37) and Kubernetes (line 46) scenarios, this scenario could benefit from an explicit assertion verifying the captured Authorization header from the client-auth server, rather than only checking which tools were called.
💡 Suggested additional assertion
     Then The MCP mock server should have received requests
     And The MCP mock server request log should contain exactly tools mock_tool_file, mock_tool_k8s, mock_tool_client
+    And The MCP mock server should have captured Authorization header containing "client-test-token" from mock-client-auth server
        authorization = None
        for name, value in mcp_server.resolved_authorization_headers.items():
            # for each defined header
            h_value = _get_token_value(value, name)
            # only add the header if we got value
            if h_value is not None:
                headers[name] = h_value
                # Log successful resolution - determine auth type for logging
                match value:
                    case _ if value == constants.MCP_AUTH_KUBERNETES:
                        auth_type = "kubernetes"
                    case _ if value == constants.MCP_AUTH_CLIENT:
                        auth_type = "client"
                    case _:
                        auth_type = "static"

                logger.debug(
                    "MCP server '%s': Header '%s' -> type: %s (resolved)",
                    mcp_server.name,
                    name,
                    auth_type,
                )
                # Special handling for Authorization header (llama_stack 0.4.x+)
                if name.lower() == "authorization":
                    authorization = h_value
                else:
                    headers[name] = h_value
            else:
                # Log failed resolution
                logger.debug(
                    "MCP server '%s': Header '%s' -> FAILED to resolve (value was: %s)",
                    mcp_server.name,
                    name,
                    value,
                )

        # Skip server if auth headers were configured but not all could be resolved
        if mcp_server.authorization_headers and len(headers) != len(
            mcp_server.authorization_headers
        resolved_count = len(headers) + (1 if authorization is not None else 0)
        if mcp_server.resolved_authorization_headers and resolved_count != len(
            mcp_server.resolved_authorization_headers
        ):
            logger.warning(
                "Skipping MCP server %s: required %d auth headers but only resolved %d",
            required_headers = list(mcp_server.authorization_headers.keys())
            resolved_headers = list(headers.keys())
            if authorization is not None:
                resolved_headers.append("Authorization")
            missing_headers = [h for h in required_headers if h not in resolved_headers]

            logger.debug(
                "MCP server '%s' SKIPPED - incomplete auth: "
                "Required: %s | Resolved: %s | Missing: %s (resolved_count=%d, expected=%d)",
                mcp_server.name,
                len(mcp_server.authorization_headers),
                len(headers),
                ", ".join(required_headers),
                ", ".join(resolved_headers) if resolved_headers else "none",
                ", ".join(missing_headers) if missing_headers else "none",
                resolved_count,
                len(mcp_server.resolved_authorization_headers),
            )
Treat Authorization header names case‑insensitively when validating resolution.
If config uses authorization (lowercase), the current comparison adds "Authorization" to resolved_headers, which can falsely mark the header as missing and skip the server.
🐛 Suggested fix (case‑insensitive compare)
- authorization = None
+ authorization = None
+ authorization_header_name: str | None = None
for name, value in mcp_server.resolved_authorization_headers.items():
# for each defined header
h_value = _get_token_value(value, name)
# only add the header if we got value
if h_value is not None:
@@
if name.lower() == "authorization":
authorization = h_value
+ authorization_header_name = name
else:
headers[name] = h_value
@@
- resolved_headers = list(headers.keys())
- if authorization is not None:
- resolved_headers.append("Authorization")
- missing_headers = [h for h in required_headers if h not in resolved_headers]
+ resolved_headers = list(headers.keys())
+ if authorization is not None:
+ resolved_headers.append(authorization_header_name or "Authorization")
+ resolved_header_lc = {h.lower() for h in resolved_headers}
+ missing_headers = [
+ h for h in required_headers if h.lower() not in resolved_header_lc
+    ]
🤖 Prompt for AI Agents
In `@src/app/endpoints/query_v2.py` around lines 905 - 960, The comparison that
determines missing auth headers is case-sensitive and will treat "authorization"
vs "Authorization" as different; update the resolution check to be
case-insensitive by normalizing header names (e.g., lowercasing) before
comparing: when building required_headers from
mcp_server.authorization_headers.keys(), when creating resolved_headers from
headers.keys(), and when appending the special Authorization resolution (the
local variable authorization), use a consistent case (like .lower()) so
missing_headers is computed with a case-insensitive match for Authorization and
other headers; ensure the logging output still shows original names or a
readable form if needed.
            def check_conversation_exists() -> bool:
                """Check if conversation exists in database (runs in thread pool)."""
                with get_session() as session:
                    existing = (
                        session.query(UserConversation)
                        .filter_by(id=conversation_id)
                        .first()
                    )
                    return existing is not None

            # Run database check in thread pool to avoid connection issues
            conversation_exists = await asyncio.to_thread(
                check_conversation_exists
            )
Normalize conversation IDs before the existence check.
conversation_id may include a conv_ prefix, while the DB stores normalized IDs. This check can misclassify existing conversations and trigger unnecessary topic-summary generation.
🔧 Suggested fix
def check_conversation_exists() -> bool:
"""Check if conversation exists in database (runs in thread pool)."""
+ normalized_id = normalize_conversation_id(conversation_id)
with get_session() as session:
existing = (
session.query(UserConversation)
- .filter_by(id=conversation_id)
+ .filter_by(id=normalized_id)
.first()
)
        return existing is not None
🤖 Prompt for AI Agents
In `@src/app/endpoints/query.py` around lines 446 - 459, The conversation
existence check uses conversation_id directly, which may include a "conv_"
prefix while the DB stores normalized IDs; update the logic so
check_conversation_exists normalizes conversation_id before querying (e.g.,
strip a "conv_" prefix or run the same normalization used when storing IDs) and
then query UserConversation with the normalized id; ensure the same
normalization function or rules are applied (call or replicate normalization
used elsewhere) and keep the await asyncio.to_thread(check_conversation_exists)
usage intact so the DB query still runs in the thread pool.
  Scenario: Failed MCP tool execution is handled gracefully
    Given The system is in default state
    And I set the Authorization header to Bearer my-k8s-token
    And The MCP mock server is configured to return errors
    When I send a query that uses MCP tools
    Then The status code of the response is 200
    And The response should indicate tool execution failed
    And The MCP mock server should confirm error mode is active
🧩 Analysis chain
🏁 Script executed:
find tests/e2e -type f -name "*.py" | head -20
🏁 Script executed:
rg -n "system is in default state" tests/e2e/ -A 10
🏁 Script executed:
rg -n "configured to return errors|error.mode|reset.*error" tests/e2e/ -A 5 -B 2
🏁 Script executed:
sed -n '25,60p' tests/e2e/features/steps/common.py
🏁 Script executed:
cat -n tests/e2e/features/steps/common.py | sed -n '25,80p'
🏁 Script executed:
rg "expect_tool_errors" tests/e2e/features/steps/ -B 2 -A 2
🏁 Script executed:
# Check if there are other test state attributes that might not be reset
rg "context\." tests/e2e/features/steps/common.py | head -20
🏁 Script executed:
rg "expect_tool_errors" tests/e2e/features/steps/ -B 3 -A 3
🏁 Script executed:
rg "expect_tool_errors" tests/e2e/features/steps/ -A 5
🏁 Script executed:
# Check complete implementation of system_in_default_state to see all reset logic
cat -n tests/e2e/features/steps/common.py | sed -n '25,50p'
🏁 Script executed:
cat -n tests/e2e/features/steps/common.py | sed -n '25,100p'
🏁 Script executed:
# Find all uses of expect_tool_errors to understand impact
rg "expect_tool_errors" tests/e2e/features/steps/ -B 5 -A 5
🏁 Script executed:
# Check all uses of expect_tool_errors to see if it impacts subsequent scenarios
rg "expect_tool_errors" tests/e2e/features/steps/
🏁 Script executed:
# See the complete implementation and context of system_in_default_state
wc -l tests/e2e/features/steps/common.py
🏁 Script executed:
# Get the complete implementation of system_in_default_state
cat tests/e2e/features/steps/common.py
Add cleanup for MCP error mode flag in "The system is in default state" step.
The context.auth_headers is properly reset, but the context.expect_tool_errors = True flag set by "The MCP mock server is configured to return errors" persists across scenarios. Add context.expect_tool_errors = False to the system_in_default_state step in tests/e2e/features/steps/common.py to ensure complete test isolation.
🤖 Prompt for AI Agents
In `@tests/e2e/features/mcp_tools.feature` around lines 110 - 117, Update the
system_in_default_state step in tests/e2e/features/steps/common.py to fully
reset the MCP error mode by setting context.expect_tool_errors = False (in
addition to resetting context.auth_headers). Locate the step implementation
named system_in_default_state and add the assignment to ensure the flag cleared
between scenarios so MCP mock error behavior does not leak across tests.
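A sketch of that reset; the auth_headers line mirrors what the review says common.py already does, and the rest of the step body is omitted:

```python
from behave import given
from behave.runner import Context


@given("The system is in default state")
def system_in_default_state(context: Context) -> None:
    """Reset per-scenario state so MCP mock error mode cannot leak between scenarios."""
    context.auth_headers = {}           # existing reset described in the review
    context.expect_tool_errors = False  # new: clear the MCP mock error-mode flag
```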
3c4e8a9 to a362b01 (Compare)
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/app/main.py (1)
120-125: ⚠️ Potential issue | 🟠 Major
Fix CORS allow_methods configuration parameter.
The allow_methods parameter expects HTTP verbs (e.g., GET, POST, PUT), not header names. Currently it's receiving cors.allow_headers, which is semantically incorrect. Use cors.allow_methods instead to ensure proper CORS preflight handling, especially when custom methods are configured.
🛠 Suggested fix
-    allow_methods=cors.allow_headers,
+    allow_methods=cors.allow_methods,
🤖 Fix all issues with AI agents
In `@src/app/endpoints/query_v2.py`:
- Around line 932-938: The debug log in the failed header resolution block
currently prints the raw header value (variable value) which may leak secrets;
update the logger.debug call in query_v2.py (the block that references
mcp_server.name, name, value) to avoid emitting the raw value—either remove the
value from the message and log only mcp_server.name and name (header name/type),
or replace value with a redacted/masked placeholder (e.g. "<redacted>" or a
short hash) before logging; ensure you update only the logger.debug invocation
in that resolution failure path.
🧹 Nitpick comments (1)
docker-compose.yaml (1)
87-91: Consider parameterizing the test token value.
The entrypoint correctly chains commands with && to ensure the Python script only executes after the token file is successfully created. Since this compose file is clearly for E2E testing, the hardcoded token is acceptable.
For additional flexibility (e.g., testing different token values or scenarios), you could parameterize the token via an environment variable:
♻️ Optional: Parameterize the test token
     environment:
       - OPENAI_API_KEY=${OPENAI_API_KEY}
       # Azure Entra ID credentials (AZURE_API_KEY is obtained dynamically)
       - TENANT_ID=${TENANT_ID:-}
       - CLIENT_ID=${CLIENT_ID:-}
       - CLIENT_SECRET=${CLIENT_SECRET:-}
+      - MCP_TEST_TOKEN=${MCP_TEST_TOKEN:-test-secret-token-123}
     entrypoint: >
       /bin/bash -c "
-      echo 'test-secret-token-123' > /tmp/lightspeed-mcp-test-token &&
+      echo \"$MCP_TEST_TOKEN\" > /tmp/lightspeed-mcp-test-token &&
       python3.12 /app-root/src/lightspeed_stack.py
       "
                # Log failed resolution
                logger.debug(
                    "MCP server '%s': Header '%s' -> FAILED to resolve (value was: %s)",
                    mcp_server.name,
                    name,
                    value,
                )
Avoid logging raw auth header values.
On failed resolution, the log prints value, which can be a static secret. This leaks credentials into debug logs; log only the header name/type or redact.
🛡 Suggested fix
- logger.debug(
- "MCP server '%s': Header '%s' -> FAILED to resolve (value was: %s)",
- mcp_server.name,
- name,
- value,
- )
+ logger.debug(
+ "MCP server '%s': Header '%s' -> FAILED to resolve",
+ mcp_server.name,
+ name,
+    )
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
# Log failed resolution
logger.debug(
    "MCP server '%s': Header '%s' -> FAILED to resolve (value was: %s)",
    mcp_server.name,
    name,
    value,
)

# Log failed resolution
logger.debug(
    "MCP server '%s': Header '%s' -> FAILED to resolve",
    mcp_server.name,
    name,
)
🤖 Prompt for AI Agents
In `@src/app/endpoints/query_v2.py` around lines 932 - 938, The debug log in the
failed header resolution block currently prints the raw header value (variable
value) which may leak secrets; update the logger.debug call in query_v2.py (the
block that references mcp_server.name, name, value) to avoid emitting the raw
value—either remove the value from the message and log only mcp_server.name and
name (header name/type), or replace value with a redacted/masked placeholder
(e.g. "<redacted>" or a short hash) before logging; ensure you update only the
logger.debug invocation in that resolution failure path.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/app/endpoints/query_v2.py (1)
179-270: ⚠️ Potential issue | 🟠 Major
Avoid logging raw MCP arguments/output content.
The new debug logs print tool arguments and output previews, which can include secrets or PII. Consider redacting values (log only metadata such as sizes/types) or gating this behind an explicit safe-debug flag.
🛡️ Suggested redaction
- logger.debug(" Arguments: %s", args) + logger.debug(" Arguments: <redacted>") ... - logger.debug(" Output preview: %s", output_preview) + logger.debug(" Output preview: <redacted>")src/app/endpoints/query.py (1)
372-379: ⚠️ Potential issue | 🟠 Major
Cache is not updated with generated topic_summary; conversation list queries will show empty summary.
Line 378 passes None for topic_summary to the cache immediately. The background task generates the summary and updates the database via persist_user_conversation_details(), but it never calls cache.set_topic_summary(). Since conversation list endpoints read from the cache (e.g., conversations_v2.py:102), users will see an empty topic_summary even after generation completes. Call configuration.conversation_cache.set_topic_summary() in the background task after generating the summary to keep cache and database in sync.
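A hedged sketch of the missing cache write; the helper name and the exact set_topic_summary signature are assumptions based on how this comment describes the cache being read in conversations_v2.py:

```python
from typing import Any


def sync_topic_summary_to_cache(
    cache: Any, user_id: str, conversation_id: str, topic_summary: str | None
) -> None:
    """Mirror the freshly generated topic summary into the conversation cache.

    The background task would call this right after
    persist_user_conversation_details() so cache and database stay in sync.
    """
    if cache is None or not topic_summary:
        return
    # Assumed signature; adjust to the real conversation_cache API.
    cache.set_topic_summary(user_id, conversation_id, topic_summary)
```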
🤖 Fix all issues with AI agents
In `@dev-tools/mcp-mock-server/server.py`:
- Around line 235-238: The /debug/clear handler currently clears request_log but
leaves last_headers set, causing stale data to persist; update the clear branch
that handles "/debug/clear" to also reset the last_headers variable (and any
similar header-tracking state) when calling request_log.clear(), and ensure the
response still uses
self._send_json_response({"status":"cleared","request_count":0}) so
/debug/headers returns an empty/cleared state after the reset.
In `@src/app/main.py`:
- Line 124: The CORS configuration in main.py sets
allow_methods=cors.allow_headers which passes header names where HTTP methods
are expected; update the call that configures CORS (the parameter allow_methods)
to use cors.allow_methods instead of cors.allow_headers so the allowed HTTP
methods (GET, POST, PUT, DELETE, etc.) are provided to the CORS middleware.
In `@tests/e2e/features/steps/mcp.py`:
- Around line 626-639: The test currently sets
context.auth_headers["Authorization"] which the stack may not forward; update
the configure_mock_server_errors function to instead ensure context.mcp_headers
exists and propagate the error-mode Authorization through it (e.g., set
context.mcp_headers["Authorization"] = "Bearer error-mode"), keep setting
context.expect_tool_errors = True, and remove or avoid relying solely on
context.auth_headers so the mock server receives the MCP-HEADERS error token.
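A sketch of the fuller reset asked for in the first prompt, assuming last_headers is a module-level dict backing /debug/headers (the handler wiring is simplified):

```python
# Hypothetical module-level state in the mock server; names are assumptions.
request_log: list[dict] = []
last_headers: dict[str, str] = {}


def handle_debug_clear(send_json_response) -> None:
    """Reset all recorded state, not just the request log."""
    request_log.clear()
    last_headers.clear()  # also drop captured headers so /debug/headers starts clean
    send_json_response({"status": "cleared", "request_count": 0})
```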
src/app/main.py
Outdated
    allow_origins=cors.allow_origins,
    allow_credentials=cors.allow_credentials,
    allow_methods=cors.allow_methods,
    allow_methods=cors.allow_headers,
🧩 Analysis chain
🏁 Script executed:
# First, let's find and examine the file around line 124
find . -type f -name "main.py" | grep -E "src/app" | head -5Repository: lightspeed-core/lightspeed-stack
Length of output: 93
🏁 Script executed:
# Read the main.py file to see context around line 124
head -150 src/app/main.py | tail -50Repository: lightspeed-core/lightspeed-stack
Length of output: 1824
🏁 Script executed:
# Also search for CORS configuration to understand the cors object structure
rg -A 5 -B 5 "cors\s*=" src/app/main.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 365
🏁 Script executed:
# Search for CORS configuration class/model definition
rg -A 20 "class.*[Cc]ors" src/Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Look for cors configuration in the configuration module
find . -type f -name "*.py" -path "*/configuration/*" | head -10Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Search for allow_methods in the codebase to understand what's expected
rg "allow_methods" src/Repository: lightspeed-core/lightspeed-stack
Length of output: 322
🏁 Script executed:
# Examine the CORS configuration model
rg -B 5 -A 10 "allow_methods" src/models/config.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 1109
🏁 Script executed:
# Get the full CORS configuration class
rg -B 20 -A 30 "class.*Cors" src/models/config.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
# Let me see the full CORSConfiguration class definition
rg -B 40 "allow_methods.*Field" src/models/config.py | head -60Repository: lightspeed-core/lightspeed-stack
Length of output: 1538
🌐 Web query:
FastAPI CORSMiddleware allow_methods parameter default behavior HTTP methods
💡 Result:
In FastAPI, CORSMiddleware is Starlette’s CORS middleware, and its allow_methods default is only GET:
- FastAPI reference signature shows allow_methods=("GET",) by default. [1]
- Starlette docs confirm allow_methods defaults to ['GET'], and you can use ['*'] to allow all standard methods. [2]
Practical effect: if your browser sends a CORS preflight (OPTIONS with Access-Control-Request-Method: POST/PUT/...) and that method isn’t in allow_methods, the preflight will fail and the browser won’t send the actual request. To allow all methods, set allow_methods=["*"] (or list the exact methods you need). [3]
Sources:
[1] FastAPI middleware reference (CORSMiddleware signature) (fastapi.tiangolo.com)
[2] Starlette middleware docs (CORSMiddleware defaults) (starlette.io)
[3] FastAPI CORS tutorial (using allow_methods=["*"]) (fastapi.tiangolo.com)
Citations:
- 1: https://fastapi.tiangolo.com/reference/middleware/?utm_source=openai
- 2: https://www.starlette.io/middleware/?utm_source=openai
- 3: https://fastapi.tiangolo.com/bn/tutorial/cors/?utm_source=openai
Fix CORS allow_methods mapping.
Line 124 incorrectly passes cors.allow_headers to the allow_methods parameter. The allow_methods parameter expects HTTP method names (GET, POST, PUT, DELETE, etc.), while allow_headers contains header names (Content-Type, Authorization, etc.). This will cause CORS preflight failures because browsers request allowed methods but receive header names instead, blocking actual requests. Use cors.allow_methods instead.
🔧 Suggested fix
- allow_methods=cors.allow_headers,
+ allow_methods=cors.allow_methods,
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
    allow_methods=cors.allow_headers,
    allow_methods=cors.allow_methods,
🤖 Prompt for AI Agents
In `@src/app/main.py` at line 124, The CORS configuration in main.py sets
allow_methods=cors.allow_headers which passes header names where HTTP methods
are expected; update the call that configures CORS (the parameter allow_methods)
to use cors.allow_methods instead of cors.allow_headers so the allowed HTTP
methods (GET, POST, PUT, DELETE, etc.) are provided to the CORS middleware.
a362b01 to 4f35463 (Compare)
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/app/main.py`:
- Around line 184-244: The code compares scope["path"] (concrete request path)
to app_routes_paths strings, so parameterized routes (e.g. /conversations/{id})
are never matched and using the raw scope path as a metric label risks
cardinality explosion; replace the simple membership check and label usage with
a route match lookup and use the route's template for labels: iterate your
registered routes (instead of comparing path strings), call route.matches(scope)
(or the equivalent match method on your framework's Route object) to find the
matching Route, skip if no match, set a label_path = route.path or
route.template (the template string like "/conversations/{conversation_id}") and
use label_path in metrics.response_duration_seconds.labels(...) and
metrics.rest_api_calls_total.labels(label_path, status_code).inc(); keep
send_wrapper and status capture as-is. Ensure app_routes_paths is the collection
of Route objects (or convert strings to Route objects) so route.matches() and
route.path are available.
🧹 Nitpick comments (2)
tests/integration/endpoints/test_query_v2_integration.py (1)
36-59: Reduce fixed-sleep flakiness in background-task waits.
The hardcoded 0.6s/0.2s sleeps can be brittle under slower CI. Prefer a timeout-based wait on the tracked tasks so tests fail fast if tasks stall.
♻️ Proposed refactor
 async def wait_for_background_tasks() -> None:
     """Wait for background tasks to complete.
@@
-    # Wait for the initial 500ms delay + buffer
-    await asyncio.sleep(0.6)
-
-    # Wait for any remaining background tasks to complete
+    # Wait for any remaining background tasks to complete (with timeout)
     # pylint: disable=import-outside-toplevel
     from app.endpoints.query import background_tasks_set
-    if background_tasks_set:
-        await asyncio.gather(*background_tasks_set, return_exceptions=True)
-
-    # Give thread pool operations extra time to complete
-    await asyncio.sleep(0.2)
+    if background_tasks_set:
+        await asyncio.wait_for(
+            asyncio.gather(*background_tasks_set, return_exceptions=True),
+            timeout=2.0,
+        )
226-227: Avoid fixed sleep; wait on the tracked cleanup task instead.
This makes the test deterministic and less timing-sensitive.
♻️ Proposed refactor
-from app.endpoints.streaming_query_v2 import (
+from app.endpoints.streaming_query_v2 import (
+    background_tasks_set,
     retrieve_response,
     streaming_query_endpoint_handler_v2,
 )
@@
-    # Wait for background cleanup task to complete (has 0.5s delay)
-    await asyncio.sleep(0.7)
+    # Wait for background cleanup task to complete
+    await asyncio.wait_for(
+        asyncio.gather(*background_tasks_set, return_exceptions=True),
+        timeout=2.0,
+    )
    def __init__(  # pylint: disable=redefined-outer-name
        self, app: Any, app_routes_paths: list[str]
    ) -> None:
        """Initialize metrics middleware.

        Parameters:
            app: The ASGI application instance to wrap
            app_routes_paths: List of route paths to monitor (others ignored)
        """
        self.app = app
        self.app_routes_paths = app_routes_paths

    async def __call__(
        self,
        scope: dict[str, Any],
        receive: Callable[[], Awaitable[dict[str, Any]]],
        send: Callable[[dict[str, Any]], Awaitable[None]],
    ) -> None:
        """Handle ASGI request/response cycle with metrics collection.

        This is the ASGI interface. The method receives:
        - scope: Request metadata (type, path, headers, method, etc.)
        - receive: Async callable to get messages from client
        - send: Async callable to send messages to client

        We wrap the send callable to intercept the response status code.
        """
        if scope["type"] != "http":
            await self.app(scope, receive, send)
            return

        path = scope["path"]

        # Ignore paths not in app routes
        if path not in self.app_routes_paths:
            await self.app(scope, receive, send)
            return

        logger.debug("Processing API request for path: %s", path)

        # Track response status code by wrapping send callable
        # ASGI sends responses in two messages:
        # 1. http.response.start (contains status code and headers)
        # 2. http.response.body (contains response content)
        # We intercept message #1 to capture the status code
        status_code = None

        async def send_wrapper(message: dict[str, Any]) -> None:
            """Capture response status code from ASGI messages."""
            nonlocal status_code
            if message["type"] == "http.response.start":
                status_code = message["status"]
            await send(message)

        # Measure duration and execute
        with metrics.response_duration_seconds.labels(path).time():
            await self.app(scope, receive, send_wrapper)

        # Update metrics (ignore /metrics endpoint)
        if status_code and not path.endswith("/metrics"):
            metrics.rest_api_calls_total.labels(path, status_code).inc()
🧩 Analysis chain
🏁 Script executed:
find . -name "main.py" -path "*/src/app/*" | head -5Repository: lightspeed-core/lightspeed-stack
Length of output: 93
🏁 Script executed:
wc -l src/app/main.py 2>/dev/null || echo "File not found at src/app/main.py"
Repository: lightspeed-core/lightspeed-stack
Length of output: 95
🏁 Script executed:
head -250 src/app/main.py | tail -80
Repository: lightspeed-core/lightspeed-stack
Length of output: 3136
🏁 Script executed:
grep -n "app_routes_paths" src/app/main.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 418
🏁 Script executed:
grep -n "app.routes\|Mount\|Route\|WebSocketRoute" src/app/main.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 618
🏁 Script executed:
sed -n '330,350p' src/app/main.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 645
🏁 Script executed:
fd "routers" -type f -o -type d | grep -E "\.py$|/$"Repository: lightspeed-core/lightspeed-stack
Length of output: 246
🏁 Script executed:
find . -name "routers.py" -o -name "routers" -type dRepository: lightspeed-core/lightspeed-stack
Length of output: 96
🏁 Script executed:
grep -r "path.*=" . -A 2 -B 2 --include="*.py" | grep -E "items|users|{.*}" | head -20Repository: lightspeed-core/lightspeed-stack
Length of output: 1923
🏁 Script executed:
grep -r "starlette" . --include="*.txt" --include="*.toml" --include="*.cfg" --include="pyproject.toml"Repository: lightspeed-core/lightspeed-stack
Length of output: 272
🏁 Script executed:
cat src/app/routers.py
Repository: lightspeed-core/lightspeed-stack
Length of output: 2562
🏁 Script executed:
grep -n "{" src/app/routers.py | head -20Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
python3 << 'EOF'
# Check if Starlette 0.50.0 supports Match enum and route.matches()
try:
from starlette.routing import Match, Route
print("Match enum available:", hasattr(Match, 'FULL'))
# Create a test route to check if matches() method exists
test_route = Route("/items/{item_id}", endpoint=lambda x: None)
print("route.matches() available:", hasattr(test_route, 'matches'))
print("Match.FULL value:", Match.FULL)
except Exception as e:
print(f"Error: {e}")
EOF
Repository: lightspeed-core/lightspeed-stack
Length of output: 163
🏁 Script executed:
grep -r "@router\|@.*\.get\|@.*\.post\|path=" src/app/endpoints/ | grep -E "\{.*\}" | head -20Repository: lightspeed-core/lightspeed-stack
Length of output: 633
🏁 Script executed:
find src/app/endpoints -name "*.py" -exec grep -l "{" {} \;
Repository: lightspeed-core/lightspeed-stack
Length of output: 746
🏁 Script executed:
head -100 src/app/endpoints/conversations_v3.py | grep -A 5 "@router"
Repository: lightspeed-core/lightspeed-stack
Length of output: 58
🏁 Script executed:
grep -n "conversations/{conversation_id}\|providers/{provider_id}\|rags/{rag_id}" src/app/endpoints/*.pyRepository: lightspeed-core/lightspeed-stack
Length of output: 1140
🏁 Script executed:
python3 << 'EOF'
from starlette.routing import Match, Route
# Test how matches() works with parameterized routes
test_route = Route("/conversations/{conversation_id}", endpoint=lambda x: None)
# Simulate scope for a parameterized request
scope_with_param = {
"type": "http",
"path": "/conversations/abc123",
"method": "GET"
}
# Test matching
match_result = test_route.matches(scope_with_param)
print(f"Match result type: {type(match_result[0])}")
print(f"Match result: {match_result[0]}")
print(f"Match.FULL comparison: {match_result[0] == Match.FULL}")
print(f"Route path: {test_route.path}")
# Test non-matching
scope_no_param = {
"type": "http",
"path": "/other/path",
"method": "GET"
}
match_result_no = test_route.matches(scope_no_param)
print(f"\nNon-matching result: {match_result_no[0]}")
EOF
Repository: lightspeed-core/lightspeed-stack
Length of output: 240
Metrics path filtering fails for parameterized routes and can cause cardinality explosion.
Line 218 compares scope["path"] (actual request path like /conversations/abc123) as a string against template paths like /conversations/{conversation_id} in app_routes_paths, so requests to parameterized routes won't be matched and metrics won't be recorded. Additionally, using raw scope["path"] as a metric label can explode cardinality. Match routes dynamically using route.matches() and label with the route template path instead.
🔧 Suggested fix (match routes; label with template path)
-from starlette.routing import Mount, Route, WebSocketRoute
+from starlette.routing import Mount, Route, WebSocketRoute, Match
@@
- def __init__( # pylint: disable=redefined-outer-name
- self, app: Any, app_routes_paths: list[str]
- ) -> None:
+ def __init__( # pylint: disable=redefined-outer-name
+ self, app: Any, app_routes: list[Route]
+ ) -> None:
@@
- self.app_routes_paths = app_routes_paths
+ self.app_routes = app_routes
@@
- path = scope["path"]
-
- # Ignore paths not in app routes
- if path not in self.app_routes_paths:
- await self.app(scope, receive, send)
- return
+ path = scope["path"]
+ matched_route = next(
+ (r for r in self.app_routes if r.matches(scope)[0] == Match.FULL),
+ None,
+ )
+ if matched_route is None:
+ await self.app(scope, receive, send)
+ return
+ path_label = matched_route.path
@@
- with metrics.response_duration_seconds.labels(path).time():
+ with metrics.response_duration_seconds.labels(path_label).time():
await self.app(scope, receive, send_wrapper)
@@
- if status_code and not path.endswith("/metrics"):
- metrics.rest_api_calls_total.labels(path, status_code).inc()
+ if status_code and not path_label.endswith("/metrics"):
+ metrics.rest_api_calls_total.labels(path_label, status_code).inc()
@@
-app_routes_paths = [
-    route.path
-    for route in app.routes
-    if isinstance(route, (Mount, Route, WebSocketRoute))
-]
+app_routes = [
+    route
+    for route in app.routes
+    if isinstance(route, (Mount, Route, WebSocketRoute))
+]
@@
-app.add_middleware(MetricsMiddleware, app_routes_paths=app_routes_paths)
+app.add_middleware(MetricsMiddleware, app_routes=app_routes)
Also applies to: 338-342
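For illustration, here is a small self-contained sketch of the labeling approach described above; the route list, endpoint, and counter below are hypothetical stand-ins (only the metric and label names mirror the ones used by this middleware):

```python
from prometheus_client import Counter
from starlette.routing import Match, Route

# Hypothetical counter standing in for metrics.rest_api_calls_total.
rest_api_calls_total = Counter(
    "rest_api_calls_total", "Total REST API calls", ["path", "status"]
)


async def endpoint(request):  # hypothetical endpoint, used only for matching
    return None


app_routes = [Route("/conversations/{conversation_id}", endpoint=endpoint)]


def record_call(scope: dict, status_code: int) -> None:
    """Label the metric with the matched route template, not the raw path."""
    matched = next(
        (r for r in app_routes if r.matches(scope)[0] == Match.FULL), None
    )
    if matched is None:
        return  # unknown path: skip, which avoids label-cardinality blowups
    rest_api_calls_total.labels(matched.path, status_code).inc()


# A request to /conversations/abc123 is recorded under the template
# /conversations/{conversation_id}.
record_call(
    {"type": "http", "path": "/conversations/abc123", "method": "GET"}, 200
)
```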
🤖 Prompt for AI Agents
In `@src/app/main.py` around lines 184 - 244, The code compares scope["path"]
(concrete request path) to app_routes_paths strings, so parameterized routes
(e.g. /conversations/{id}) are never matched and using the raw scope path as a
metric label risks cardinality explosion; replace the simple membership check
and label usage with a route match lookup and use the route's template for
labels: iterate your registered routes (instead of comparing path strings), call
route.matches(scope) (or the equivalent match method on your framework's Route
object) to find the matching Route, skip if no match, set a label_path =
route.path or route.template (the template string like
"/conversations/{conversation_id}") and use label_path in
metrics.response_duration_seconds.labels(...) and
metrics.rest_api_calls_total.labels(label_path, status_code).inc(); keep
send_wrapper and status capture as-is. Ensure app_routes_paths is the collection
of Route objects (or convert strings to Route objects) so route.matches() and
route.path are available.
4f35463 to 09c1419 Compare
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/app/endpoints/streaming_query_v2.py`:
- Around line 85-94: Wrap the asyncio.create_task call in create_background_task
with a try/except and log any exceptions so task creation failures aren't
silent: call asyncio.create_task(coro) inside a try block, add the resulting
task to background_tasks_set and attach
task.add_done_callback(background_tasks_set.discard) in that try, and in the
except Exception as e block log the error (use the module's logger, e.g.,
logger.exception(...) or process_logger.error(...) if present) including the
exception details; do not change the existing background_tasks_set or callback
names.
🧹 Nitpick comments (7)
tests/e2e/features/steps/common.py (1)
39-41: Consider also resetting the expect_tool_errors flag for complete test isolation.
The auth_headers reset is good, but per the past review comment on mcp_tools.feature, the context.expect_tool_errors flag set by the "The MCP mock server is configured to return errors" step can also persist across scenarios. For complete test isolation, consider adding:
     # Clear auth headers to ensure clean state between scenarios
     if hasattr(context, "auth_headers"):
         context.auth_headers = {}
+    # Clear MCP error mode flag to ensure clean state between scenarios
+    if hasattr(context, "expect_tool_errors"):
+        context.expect_tool_errors = False
Based on learnings: "In the Behave Python testing framework, the Context object is created once for the entire test run and reused across all features and scenarios. Custom attributes added to the context persist until explicitly cleared."
src/app/endpoints/streaming_query_v2.py (1)
80-94: Consider extracting shared background task infrastructure to a utility module.
The background_tasks_set and create_background_task pattern is duplicated between this file and src/app/endpoints/query.py. Extracting this to a shared utility (e.g., utils/background_tasks.py) would reduce duplication and ensure consistent behavior.
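A minimal sketch of what such a shared module might look like, assuming a hypothetical path like src/utils/background_tasks.py (the wait_for_background_tasks helper is also an assumption; only the task-tracking pattern mirrors the existing endpoints):

```python
"""Shared background-task tracking (hypothetical src/utils/background_tasks.py)."""

import asyncio
import logging
from typing import Any, Coroutine

logger = logging.getLogger(__name__)

# Strong references keep fire-and-forget tasks from being garbage-collected.
_background_tasks: set[asyncio.Task] = set()


def create_background_task(coro: Coroutine[Any, Any, Any]) -> None:
    """Schedule a detached task and track it until it finishes."""
    try:
        task = asyncio.create_task(coro)
        _background_tasks.add(task)
        task.add_done_callback(_background_tasks.discard)
    except Exception:  # pylint: disable=broad-except
        logger.exception("Failed to create background task")


async def wait_for_background_tasks(timeout: float = 2.0) -> None:
    """Await all tracked tasks; handy for tests and shutdown hooks."""
    if _background_tasks:
        await asyncio.wait_for(
            asyncio.gather(*_background_tasks, return_exceptions=True),
            timeout=timeout,
        )
```

Both query.py and streaming_query_v2.py could then import create_background_task from this module instead of defining their own copies.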
dev-tools/mcp-mock-server/server.py (1)
97-113: Consider using if/elif instead of match True for token matching.
The match True with case guards is an unconventional pattern. While it works, standard if/elif chains are more idiomatic for this type of string matching:
♻️ Alternative with if/elif (optional)
-    match True:
-        case _ if "test-secret-token" in auth_header:
-            tool_name = "mock_tool_file"
-            tool_desc = "Mock tool with file-based auth"
-        case _ if "my-k8s-token" in auth_header:
-            tool_name = "mock_tool_k8s"
-            tool_desc = "Mock tool with Kubernetes token"
-        case _ if "my-client-token" in auth_header:
-            tool_name = "mock_tool_client"
-            tool_desc = "Mock tool with client-provided token"
-        case _ if "error-mode" in auth_header:
-            tool_name = "mock_tool_error"
-            tool_desc = "Mock tool configured to return errors"
-            error_mode = True
-        case _:
-            # Default case already set above
-            pass
+    if "test-secret-token" in auth_header:
+        tool_name = "mock_tool_file"
+        tool_desc = "Mock tool with file-based auth"
+    elif "my-k8s-token" in auth_header:
+        tool_name = "mock_tool_k8s"
+        tool_desc = "Mock tool with Kubernetes token"
+    elif "my-client-token" in auth_header:
+        tool_name = "mock_tool_client"
+        tool_desc = "Mock tool with client-provided token"
+    elif "error-mode" in auth_header:
+        tool_name = "mock_tool_error"
+        tool_desc = "Mock tool configured to return errors"
+        error_mode = True
161-168: Good approach for test database isolation, but kwargs not forwarded.
The mock_to_thread function accepts **kwargs but doesn't pass them to func(). While asyncio.to_thread typically doesn't use kwargs for the target function (they're passed as positional args), the signature mismatch could cause subtle issues if kwargs are ever used.
♻️ Minor fix to forward kwargs
 async def mock_to_thread(func: Any, *args: Any, **kwargs: Any) -> Any:
     """Run function synchronously instead of in thread pool for tests."""
-    return func(*args, **kwargs)
+    return func(*args)
Or if kwargs support is intentional:
async def mock_to_thread(func: Any, *args: Any, **kwargs: Any) -> Any:
    """Run function synchronously instead of in thread pool for tests."""
    return func(*args, **kwargs)
src/app/main.py (1)
81-85: Remove commented-out code.
The commented code block for MCP server registration (lines 81-85) should be removed rather than left as comments. If this code path is no longer needed for the Responses API, removing it improves maintainability.
♻️ Remove commented code
-    # NOTE: MCP server registration not needed for Responses API
-    # The Responses API takes inline tool definitions instead of pre-registered toolgroups
-    # logger.info("Registering MCP servers")
-    # await register_mcp_servers_async(logger, configuration.configuration)
-    # get_logger("app.endpoints.handlers")
     logger.info("App startup complete")
831-848: Streaming response verification doesn't parse SSE events.The step acknowledges that SSE events should be parsed (line 843 comment) but only checks the status code. This means the assertion "streaming response contains tool execution results" isn't actually verified.
💡 Consider implementing SSE parsing
`@then`("The streaming response should contain tool execution results") def check_streaming_response_has_tool_results(context: Context) -> None: assert context.response is not None, "No response received" assert context.response.status_code == 200 # Parse SSE events from streaming response content = context.response.content.decode("utf-8") events = [line for line in content.split("\n") if line.startswith("data:")] # Verify at least one event contains tool-related content tool_events = [e for e in events if "tool" in e.lower()] assert len(tool_events) > 0, "No tool execution events found in streaming response" print(f"✅ Found {len(tool_events)} tool-related events in streaming response") time.sleep(0.5)
78-87: Consider polling instead of fixed sleep for MCP request waits.
The fixed time.sleep(2) at line 86 may be insufficient under load or too long for fast scenarios. Consider polling the /debug/requests endpoint until requests appear or a timeout is reached.
💡 Polling approach example
`@when`("I wait for MCP server to receive requests") def wait_for_mcp_requests(context: Context) -> None: """Wait for MCP server to receive and log requests with polling.""" max_wait = 5 # seconds poll_interval = 0.2 elapsed = 0 while elapsed < max_wait: try: response = requests.get(f"{MOCK_MCP_SERVER_URL}/debug/requests", timeout=2) if response.status_code == 200 and len(response.json()) > 0: print(f"⏱️ MCP server received requests after {elapsed:.1f}s") return except requests.RequestException: pass time.sleep(poll_interval) elapsed += poll_interval print(f"⏱️ Waited {max_wait}s for MCP server (may have no requests)")
def create_background_task(coro: Any) -> None:
    """Create a detached background task with strong reference.

    Args:
        coro: Coroutine to run as background task
    """
    task = asyncio.create_task(coro)
    background_tasks_set.add(task)
    task.add_done_callback(background_tasks_set.discard)
Add error handling to create_background_task for consistency with query.py.
The create_background_task implementation in query.py (lines 86-105) includes try/except with error logging, but this version lacks it. This inconsistency could lead to silent failures.
♻️ Proposed fix to add error handling
 def create_background_task(coro: Any) -> None:
     """Create a detached background task with strong reference.

     Args:
         coro: Coroutine to run as background task
     """
-    task = asyncio.create_task(coro)
-    background_tasks_set.add(task)
-    task.add_done_callback(background_tasks_set.discard)
+    try:
+        task = asyncio.create_task(coro)
+        background_tasks_set.add(task)
+        task.add_done_callback(background_tasks_set.discard)
+        logger.debug(
+            f"Background task created, active tasks: {len(background_tasks_set)}"
+        )
+    except Exception as e:
+        logger.error(f"Failed to create background task: {e}", exc_info=True)
🤖 Prompt for AI Agents
In `@src/app/endpoints/streaming_query_v2.py` around lines 85 - 94, Wrap the
asyncio.create_task call in create_background_task with a try/except and log any
exceptions so task creation failures aren't silent: call
asyncio.create_task(coro) inside a try block, add the resulting task to
background_tasks_set and attach
task.add_done_callback(background_tasks_set.discard) in that try, and in the
except Exception as e block log the error (use the module's logger, e.g.,
logger.exception(...) or process_logger.error(...) if present) including the
exception details; do not change the existing background_tasks_set or callback
names.
Description
This PR achieves 3 main things:
This PR turned out much larger than expected because of 2 issues:
Root Cause 1 - Streaming Response Bug: Llama Stack's MCP tool execution uses streaming responses (Server-Sent Events), which exposed critical bugs in FastAPI's BaseHTTPMiddleware - specifically the RuntimeError: "No response returned" error that occurs when middleware tries to handle streaming endpoints.
Root Cause 2 - MCP Cleanup & Connection Management: MCP server connections and LLM streaming calls need to be properly closed AFTER the response is fully streamed, but we also need to persist conversation data to the database without blocking the stream or delaying the client.
The Fix: This required a complete architectural change:
In short: MCP streaming responses + required cleanup + database persistence = complete architectural change from decorator-based to ASGI middleware + async background tasks for DB writes and MCP cleanup.
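A rough sketch of the resulting pattern, not the actual implementation (the endpoint path, chunk source, and _cleanup body below are placeholders): the response is streamed without any buffering middleware in the way, and persistence/MCP cleanup is scheduled as a tracked background task once the stream generator finishes.

```python
import asyncio
from collections.abc import AsyncIterator

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
background_tasks: set[asyncio.Task] = set()


async def _cleanup(transcript: list[str]) -> None:
    """Placeholder for DB persistence and MCP session shutdown."""
    await asyncio.sleep(0)


@app.get("/v1/streaming_query")
async def streaming_query() -> StreamingResponse:
    async def event_stream() -> AsyncIterator[str]:
        transcript: list[str] = []
        try:
            for chunk in ("hello", "world"):  # placeholder for LLM/MCP output
                transcript.append(chunk)
                yield f"data: {chunk}\n\n"
        finally:
            # Runs after the last chunk is sent; never blocks the client.
            task = asyncio.create_task(_cleanup(transcript))
            background_tasks.add(task)
            task.add_done_callback(background_tasks.discard)

    return StreamingResponse(event_stream(), media_type="text/event-stream")
```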
Type of change
Tools used to create PR
Identify any AI code assistants used in this PR (for transparency and review context)
Related Tickets & Documents
Checklist before requesting a review
Testing
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests
Other
✏️ Tip: You can customize this high-level summary in your review settings.