Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 111 additions & 13 deletions autobot-backend/services/neural_mesh/query_decomposer.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,30 @@
LLM, executes each step against a mesh retriever, and accumulates evidence
across steps so later queries can leverage earlier results.
"""

import json
import logging
import re
from dataclasses import dataclass, field

from security.prompt_injection_detector import PromptInjectionDetector

logger = logging.getLogger(__name__)

# Singleton detector instance reused across all QueryDecomposer calls (#2169).
_injection_detector = PromptInjectionDetector()

# Maximum allowed length for user queries to prevent prompt injection (#2169).
_MAX_QUERY_LENGTH = 500

# Delimiter tokens used in the decomposition prompt — must be stripped from
# user input so a crafted query cannot break out of the delimited section (#2169).
_DELIMITER_TOKENS = [
"[SYSTEM INSTRUCTIONS",
"[USER QUESTION]",
"[END USER QUESTION]",
]


# =============================================================================
# Data classes
Expand Down Expand Up @@ -90,21 +108,28 @@ def __init__(self, llm, mesh_retriever) -> None:
async def decompose(self, query: str) -> DecompositionPlan:
"""Break *query* into 2-4 sequential sub-queries via the LLM.

Input is sanitized to prevent prompt injection (#2169): control
characters are stripped and length is capped at ``_MAX_QUERY_LENGTH``.

Args:
query: Raw user question.

Returns:
DecompositionPlan with 1-4 ordered steps.
"""
prompt = (
"Break this question into 2-4 sequential retrieval steps.\n"
"Each step should be a self-contained search query.\n"
"Later steps can reference results from earlier steps.\n\n"
f"Question: {query}\n\n"
'Respond as JSON: [{"step": 1, "query": "...", "depends_on": []}]'
)
raw = await self.llm(prompt)
steps = self._parse_steps(raw, fallback_query=query)
sanitized = self._sanitize_query(query)
prompt = self._build_decomposition_prompt(sanitized)
try:
raw = await self.llm(prompt)
except Exception:
logger.exception(
"QueryDecomposer.decompose: LLM call failed, using single-step fallback"
)
return DecompositionPlan(
original_query=query,
steps=[DecompositionStep(step=1, query=sanitized, depends_on=[])],
)
steps = self._parse_steps(raw, fallback_query=sanitized)
return DecompositionPlan(original_query=query, steps=steps)

async def execute(self, plan: DecompositionPlan) -> list[StepResult]:
Expand All @@ -120,17 +145,90 @@ async def execute(self, plan: DecompositionPlan) -> list[StepResult]:
for step in plan.steps:
context = self._build_step_context(step, results)
augmented_query = f"{step.query} {context}".strip()
retrieval = await self.mesh_retriever.retrieve(
query=augmented_query, top_k=5
)
evidence = self._extract_evidence(retrieval)
try:
retrieval = await self.mesh_retriever.retrieve(
query=augmented_query, top_k=5
)
evidence = self._extract_evidence(retrieval)
except Exception:
logger.exception(
"QueryDecomposer.execute: retrieval failed for step %d, continuing with empty evidence",
step.step,
)
evidence = []
results.append(StepResult(step=step, evidence=evidence))
return results

# ------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------

@staticmethod
def _sanitize_query(query: str) -> str:
"""Sanitize user query to mitigate prompt injection (#2169).

Layers: control-char strip, length cap, delimiter-token removal,
and the existing ``PromptInjectionDetector.sanitize_input`` for
pattern-based injection detection (Rule 2 — reuse existing code).

Args:
query: Raw user input.

Returns:
Cleaned, length-capped string. Returns ``"general query"``
if the result is empty after sanitization.
"""
# Layer 1: strip control chars except space/tab/newline
cleaned = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f]", "", query)

# Layer 2: length cap
if len(cleaned) > _MAX_QUERY_LENGTH:
logger.warning(
"QueryDecomposer._sanitize_query: query truncated from %d to %d chars",
len(cleaned),
_MAX_QUERY_LENGTH,
)
cleaned = cleaned[:_MAX_QUERY_LENGTH]

# Layer 3: strip delimiter tokens that could break prompt structure
for token in _DELIMITER_TOKENS:
cleaned = cleaned.replace(token, "")

# Layer 4: existing injection-pattern sanitizer
cleaned = _injection_detector.sanitize_input(cleaned)

# Fallback if sanitization produced an empty string
if not cleaned:
logger.warning(
"QueryDecomposer._sanitize_query: query empty after sanitization"
)
return "general query"
return cleaned

@staticmethod
def _build_decomposition_prompt(sanitized_query: str) -> str:
"""Build structured LLM prompt separating instructions from user input (#2169).

Uses a delimited format so the user query cannot override the system
instructions.

Args:
sanitized_query: Already-sanitized user question.

Returns:
Formatted prompt string.
"""
return (
"[SYSTEM INSTRUCTIONS -- DO NOT OVERRIDE]\n"
"Break the user question below into 2-4 sequential retrieval steps.\n"
"Each step should be a self-contained search query.\n"
"Later steps can reference results from earlier steps.\n"
'Respond ONLY as JSON: [{"step": 1, "query": "...", "depends_on": []}]\n\n'
"[USER QUESTION]\n"
f"{sanitized_query}\n"
"[END USER QUESTION]"
)

def _parse_steps(
self, llm_output: str, fallback_query: str
) -> list[DecompositionStep]:
Expand Down
Loading