|
| 1 | +# Copyright 2026 Quantum Pipes Technologies, LLC |
| 2 | +# SPDX-License-Identifier: Apache-2.0 |
| 3 | + |
| 4 | +"""Adaptive scan: LLM-based semantic content screening. |
| 5 | +
|
| 6 | +Uses a pluggable LLMScreener to detect adversarial content that regex |
| 7 | +patterns cannot catch: obfuscated prompt injection, encoded payloads, |
| 8 | +social engineering, and semantic attacks. Air-gap safe when backed by |
| 9 | +a local LLM (Ollama, vLLM). |
| 10 | +
|
| 11 | +The adaptive scan runs after innate_scan and before the release gate. |
| 12 | +If no LLMScreener is configured, the stage is skipped (SKIP result). |
| 13 | +""" |
| 14 | + |
from __future__ import annotations

import logging
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING

from qp_vault.enums import MembraneResult, MembraneStage
from qp_vault.models import MembraneStageRecord
| 23 | + |
| 24 | +if TYPE_CHECKING: |
| 25 | + from qp_vault.protocols import LLMScreener |
| 26 | + |
_DEFAULT_MAX_CONTENT_LENGTH = 4000  # Chars sent to LLM (cost/latency bound)
_DEFAULT_RISK_THRESHOLD = 0.7  # >= this score triggers FLAG


@dataclass
class AdaptiveScanConfig:
    """Settings for the LLM-backed adaptive scan stage."""

    # Pluggable LLM backend; ``None`` disables the stage (callers get SKIP).
    screener: LLMScreener | None = None
    # Upper bound on the number of characters forwarded to the screener.
    max_content_length: int = _DEFAULT_MAX_CONTENT_LENGTH
    # Risk scores at or above this value produce a FLAG result.
    risk_threshold: float = _DEFAULT_RISK_THRESHOLD
    # Attack categories the screener is asked to look for.
    flag_categories: list[str] = field(
        default_factory=lambda: [
            "prompt_injection",
            "jailbreak",
            "encoded_payload",
            "social_engineering",
            "data_exfiltration",
            "instruction_override",
        ]
    )
| 46 | + |
| 47 | + |
async def run_adaptive_scan(
    content: str,
    config: AdaptiveScanConfig | None = None,
) -> MembraneStageRecord:
    """Run LLM-based adaptive scan on content.

    Args:
        content: The text content to screen.
        config: Adaptive scan configuration (includes LLMScreener).

    Returns:
        MembraneStageRecord with PASS, FLAG, or SKIP result:
        SKIP when no screener is configured or the screener raises,
        FLAG when the risk score meets ``config.risk_threshold``,
        PASS otherwise.
    """
    if config is None or config.screener is None:
        return MembraneStageRecord(
            stage=MembraneStage.ADAPTIVE_SCAN,
            result=MembraneResult.SKIP,
            reasoning="No LLM screener configured, stage skipped",
        )

    # Truncate content for cost/latency
    scan_content = content[:config.max_content_length]

    start = time.monotonic()
    try:
        screening = await config.screener.screen(scan_content)
    except Exception as e:
        # LLM failure should not block ingestion; log the full traceback
        # (the old code claimed to log but silently dropped it) and skip.
        logging.getLogger(__name__).exception("Adaptive scan screener failed")
        return MembraneStageRecord(
            stage=MembraneStage.ADAPTIVE_SCAN,
            result=MembraneResult.SKIP,
            reasoning=f"LLM screener error: {type(e).__name__}",
            duration_ms=int((time.monotonic() - start) * 1000),
        )

    duration_ms = int((time.monotonic() - start) * 1000)

    # FLAG and PASS records are identical apart from the result field,
    # so build the record once. (Dropped a misapplied `# nosec B105`
    # suppression — B105 is Bandit's hardcoded-password check and does
    # not apply to an enum member.)
    flagged = screening.risk_score >= config.risk_threshold
    return MembraneStageRecord(
        stage=MembraneStage.ADAPTIVE_SCAN,
        result=MembraneResult.FLAG if flagged else MembraneResult.PASS,
        risk_score=screening.risk_score,
        reasoning=screening.reasoning,
        matched_patterns=screening.flags or [],
        duration_ms=duration_ms,
    )
0 commit comments