diff --git a/autobot-backend/services/neural_mesh/query_decomposer.py b/autobot-backend/services/neural_mesh/query_decomposer.py index dbc5722a7..18409cce7 100644 --- a/autobot-backend/services/neural_mesh/query_decomposer.py +++ b/autobot-backend/services/neural_mesh/query_decomposer.py @@ -7,12 +7,30 @@ LLM, executes each step against a mesh retriever, and accumulates evidence across steps so later queries can leverage earlier results. """ + import json import logging +import re from dataclasses import dataclass, field +from security.prompt_injection_detector import PromptInjectionDetector + logger = logging.getLogger(__name__) +# Singleton detector instance reused across all QueryDecomposer calls (#2169). +_injection_detector = PromptInjectionDetector() + +# Maximum allowed length for user queries to prevent prompt injection (#2169). +_MAX_QUERY_LENGTH = 500 + +# Delimiter tokens used in the decomposition prompt — must be stripped from +# user input so a crafted query cannot break out of the delimited section (#2169). +_DELIMITER_TOKENS = [ + "[SYSTEM INSTRUCTIONS", + "[USER QUESTION]", + "[END USER QUESTION]", +] + # ============================================================================= # Data classes @@ -90,21 +108,28 @@ def __init__(self, llm, mesh_retriever) -> None: async def decompose(self, query: str) -> DecompositionPlan: """Break *query* into 2-4 sequential sub-queries via the LLM. + Input is sanitized to prevent prompt injection (#2169): control + characters are stripped and length is capped at ``_MAX_QUERY_LENGTH``. + Args: query: Raw user question. Returns: DecompositionPlan with 1-4 ordered steps. """ - prompt = ( - "Break this question into 2-4 sequential retrieval steps.\n" - "Each step should be a self-contained search query.\n" - "Later steps can reference results from earlier steps.\n\n" - f"Question: {query}\n\n" - 'Respond as JSON: [{"step": 1, "query": "...", "depends_on": []}]' - ) - raw = await self.llm(prompt) - steps = self._parse_steps(raw, fallback_query=query) + sanitized = self._sanitize_query(query) + prompt = self._build_decomposition_prompt(sanitized) + try: + raw = await self.llm(prompt) + except Exception: + logger.exception( + "QueryDecomposer.decompose: LLM call failed, using single-step fallback" + ) + return DecompositionPlan( + original_query=query, + steps=[DecompositionStep(step=1, query=sanitized, depends_on=[])], + ) + steps = self._parse_steps(raw, fallback_query=sanitized) return DecompositionPlan(original_query=query, steps=steps) async def execute(self, plan: DecompositionPlan) -> list[StepResult]: @@ -120,10 +145,17 @@ async def execute(self, plan: DecompositionPlan) -> list[StepResult]: for step in plan.steps: context = self._build_step_context(step, results) augmented_query = f"{step.query} {context}".strip() - retrieval = await self.mesh_retriever.retrieve( - query=augmented_query, top_k=5 - ) - evidence = self._extract_evidence(retrieval) + try: + retrieval = await self.mesh_retriever.retrieve( + query=augmented_query, top_k=5 + ) + evidence = self._extract_evidence(retrieval) + except Exception: + logger.exception( + "QueryDecomposer.execute: retrieval failed for step %d, continuing with empty evidence", + step.step, + ) + evidence = [] results.append(StepResult(step=step, evidence=evidence)) return results @@ -131,6 +163,72 @@ async def execute(self, plan: DecompositionPlan) -> list[StepResult]: # Private helpers # ------------------------------------------------------------------ + @staticmethod + def _sanitize_query(query: str) -> str: + """Sanitize user query to mitigate prompt injection (#2169). + + Layers: control-char strip, length cap, delimiter-token removal, + and the existing ``PromptInjectionDetector.sanitize_input`` for + pattern-based injection detection (Rule 2 — reuse existing code). + + Args: + query: Raw user input. + + Returns: + Cleaned, length-capped string. Returns ``"general query"`` + if the result is empty after sanitization. + """ + # Layer 1: strip control chars except space/tab/newline + cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", query) + + # Layer 2: length cap + if len(cleaned) > _MAX_QUERY_LENGTH: + logger.warning( + "QueryDecomposer._sanitize_query: query truncated from %d to %d chars", + len(cleaned), + _MAX_QUERY_LENGTH, + ) + cleaned = cleaned[:_MAX_QUERY_LENGTH] + + # Layer 3: strip delimiter tokens that could break prompt structure + for token in _DELIMITER_TOKENS: + cleaned = cleaned.replace(token, "") + + # Layer 4: existing injection-pattern sanitizer + cleaned = _injection_detector.sanitize_input(cleaned) + + # Fallback if sanitization produced an empty string + if not cleaned: + logger.warning( + "QueryDecomposer._sanitize_query: query empty after sanitization" + ) + return "general query" + return cleaned + + @staticmethod + def _build_decomposition_prompt(sanitized_query: str) -> str: + """Build structured LLM prompt separating instructions from user input (#2169). + + Uses a delimited format so the user query cannot override the system + instructions. + + Args: + sanitized_query: Already-sanitized user question. + + Returns: + Formatted prompt string. + """ + return ( + "[SYSTEM INSTRUCTIONS -- DO NOT OVERRIDE]\n" + "Break the user question below into 2-4 sequential retrieval steps.\n" + "Each step should be a self-contained search query.\n" + "Later steps can reference results from earlier steps.\n" + 'Respond ONLY as JSON: [{"step": 1, "query": "...", "depends_on": []}]\n\n' + "[USER QUESTION]\n" + f"{sanitized_query}\n" + "[END USER QUESTION]" + ) + def _parse_steps( self, llm_output: str, fallback_query: str ) -> list[DecompositionStep]: