Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.9.0] - 2026-04-06

### Added
- **Content Immune System (CIS)**: Multi-stage content screening pipeline
  - Innate scan: pattern-based detection (prompt injection, jailbreak, XSS blocklists)
  - Release gate: risk-proportionate gating (pass/quarantine/reject)
  - Wired into `vault.add()`: content screened before indexing
  - Quarantined resources get `ResourceStatus.QUARANTINED`
- **New CLI commands**: `vault health`, `vault list`, `vault delete`, `vault transition`, `vault expiring`
- `vault.add_batch(sources)` for bulk import
- PostgreSQL schema parity: `adversarial_status`, `tenant_id`, `provenance` table, missing indexes

## [0.8.0] - 2026-04-06

### Added
Expand Down
131 changes: 0 additions & 131 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,131 +0,0 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"

[project]
name = "qp-vault"
version = "0.8.0"
description = "Governed knowledge store for autonomous organizations. Trust tiers, cryptographic audit trails, content-addressed storage, air-gap native."
readme = "README.md"
license = "Apache-2.0"
requires-python = ">=3.12"
authors = [
{ name = "Quantum Pipes", email = "team@quantumpipes.io" },
]
keywords = [
"knowledge-store",
"vector-search",
"trust-tiers",
"audit-trail",
"content-addressed",
"air-gap",
"post-quantum",
"rag",
"enterprise",
"governance",
]
classifiers = [
"Development Status :: 3 - Alpha",
"Intended Audience :: Developers",
"Intended Audience :: Science/Research",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Programming Language :: Python :: 3.14",
"Topic :: Database",
"Topic :: Scientific/Engineering :: Artificial Intelligence",
"Topic :: Security :: Cryptography",
"Typing :: Typed",
]
dependencies = [
"pydantic>=2.12",
]

[project.optional-dependencies]
sqlite = [
"aiosqlite>=0.22",
]
postgres = [
"sqlalchemy>=2.0.48",
"asyncpg>=0.31",
"pgvector>=0.4.2",
]
docling = [
"docling>=2.73",
]
local = [
"sentence-transformers>=3.0",
]
openai = [
"openai>=1.0",
]
capsule = [
"qp-capsule>=1.5",
]
encryption = [
"cryptography>=42",
"pynacl>=1.6.2",
]
integrity = [
"numpy>=2.0",
]
fastapi = [
"fastapi>=0.135",
]
atlas = [
"gitpython>=3.1",
]
cli = [
"typer>=0.21",
"rich>=14.3",
]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.25",
"pytest-cov>=6.0",
"httpx>=0.28",
"mypy>=1.14",
"ruff>=0.9",
]
# Convenience extra that bundles the optional feature sets named in the list below.
# NOTE: the `local`, `openai`, and `dev` extras are not part of `all` and must be
# requested explicitly.
all = [
"qp-vault[sqlite,postgres,docling,capsule,encryption,integrity,fastapi,atlas,cli]",
]

[project.scripts]
vault = "qp_vault.cli.main:app"

[project.urls]
Homepage = "https://github.com/quantumpipes/vault"
Documentation = "https://docs.quantumpipes.io/vault"
Repository = "https://github.com/quantumpipes/vault"
Changelog = "https://github.com/quantumpipes/vault/blob/main/CHANGELOG.md"

[tool.hatch.build.targets.wheel]
packages = ["src/qp_vault"]

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
markers = [
"unit: Unit tests (no external deps)",
"integration: Integration tests (may need database)",
"e2e: End-to-end tests (full pipeline)",
"postgres: Requires PostgreSQL",
"slow: Tests that take > 5 seconds",
]

[tool.mypy]
python_version = "3.12"
strict = true
warn_return_any = true
warn_unused_configs = true

[tool.ruff]
target-version = "py312"
line-length = 100

[tool.ruff.lint]
select = ["E", "F", "I", "N", "UP", "B", "SIM", "TCH"]
ignore = ["E501"] # Long SQL strings and complex comprehensions
8 changes: 8 additions & 0 deletions src/qp_vault/cis/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# Copyright 2026 Quantum Pipes Technologies, LLC
# SPDX-License-Identifier: Apache-2.0

"""Content Immune System (CIS): multi-stage content screening pipeline."""

from qp_vault.cis.pipeline import CISPipeline

__all__ = ["CISPipeline"]
89 changes: 89 additions & 0 deletions src/qp_vault/cis/innate_scan.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2026 Quantum Pipes Technologies, LLC
# SPDX-License-Identifier: Apache-2.0

"""Innate scan: pattern-based content screening.

Fast, deterministic checks using regex patterns, blocklists,
and heuristics. No LLM required. Analogous to the innate immune
system: broad, fast, non-adaptive detection.
"""

from __future__ import annotations

import re
from dataclasses import dataclass, field

from qp_vault.enums import CISResult, CISStage
from qp_vault.models import CISStageRecord

# Default blocklist patterns, grouped by threat class. Each entry is a regex
# source string; run_innate_scan() hands them to re.search() one at a time
# (case-insensitively unless InnateScanConfig.case_sensitive is set).
DEFAULT_BLOCKLIST: list[str] = [
    # Prompt-injection and jailbreak phrasing
    r"ignore\s+(all\s+)?previous\s+instructions",
    r"ignore\s+the\s+above",
    r"disregard\s+your\s+(system\s+)?prompt",
    r"you\s+are\s+now\s+(?:DAN|jailbroken|unfiltered)",
    r"pretend\s+you\s+are\s+(?:not\s+)?an?\s+AI",
    r"bypass\s+(?:your\s+)?(?:safety|content)\s+(?:filter|policy)",
    # Script / markup injection: opening <script tag and script-bearing URL schemes
    r"<script[\s>]",
    r"javascript:",
    r"data:text/html",
    # Dynamic code execution and process/shell access (Python-style call sites)
    r"\bexec\s*\(",
    r"\beval\s*\(",
    r"__import__\s*\(",
    r"subprocess\.",
    r"os\.system\s*\(",
]


@dataclass
class InnateScanConfig:
    """Tunable settings for the innate (regex blocklist) scan stage."""

    # Regex source strings to search for. The factory hands every instance its
    # own copy of DEFAULT_BLOCKLIST, so editing one config never leaks into another.
    blocklist_patterns: list[str] = field(default_factory=lambda: [*DEFAULT_BLOCKLIST])
    # 0 = any match triggers flag
    max_pattern_matches: int = 0
    # False (the default) means the scan matches patterns case-insensitively.
    case_sensitive: bool = False


async def run_innate_scan(
    content: str,
    config: InnateScanConfig | None = None,
) -> CISStageRecord:
    """Run innate scan on content.

    Searches ``content`` for each regex in ``config.blocklist_patterns`` and
    counts how many distinct patterns match. The stage is flagged when that
    count exceeds ``config.max_pattern_matches`` (default 0, i.e. any single
    match flags the content).

    Args:
        content: The text content to scan.
        config: Optional scan configuration. A default ``InnateScanConfig``
            is used when omitted.

    Returns:
        CISStageRecord for the INNATE_SCAN stage with a PASS or FLAG result.
        This stage never produces FAIL. FLAG details carry the number of
        matched patterns and up to the first five pattern strings; PASS
        details carry the number of patterns checked (plus the match count
        when some patterns matched but stayed within the allowed threshold).
    """
    import logging  # function-scope import: the module's import block is left untouched

    if config is None:
        config = InnateScanConfig()

    flags = 0 if config.case_sensitive else re.IGNORECASE
    matches: list[str] = []

    for pattern in config.blocklist_patterns:
        try:
            hit = re.search(pattern, content, flags)
        except re.error as exc:
            # Best-effort: a malformed pattern must not abort screening, but a
            # silently ignored blocklist entry is a coverage gap, so report it.
            logging.getLogger(__name__).warning(
                "Skipping malformed CIS blocklist pattern %r: %s", pattern, exc
            )
            continue
        if hit:
            matches.append(pattern)

    # Negative thresholds are treated as 0 so clean content can never be flagged.
    allowed_matches = max(config.max_pattern_matches, 0)

    if len(matches) > allowed_matches:
        return CISStageRecord(
            stage=CISStage.INNATE_SCAN,
            result=CISResult.FLAG,
            details={
                "matched_patterns": len(matches),
                "patterns": matches[:5],  # Limit detail to first 5
            },
        )

    details = {"patterns_checked": len(config.blocklist_patterns)}
    if matches:
        # Tolerated matches are still recorded so reviewers can see them.
        details["matched_patterns"] = len(matches)

    return CISStageRecord(
        stage=CISStage.INNATE_SCAN,
        result=CISResult.PASS,  # nosec B105 — CIS stage result, not a password
        details=details,
    )
85 changes: 85 additions & 0 deletions src/qp_vault/cis/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# Copyright 2026 Quantum Pipes Technologies, LLC
# SPDX-License-Identifier: Apache-2.0

"""CIS Pipeline: orchestrates multi-stage content screening.

Runs content through the Content Immune System stages:
1. INNATE_SCAN — pattern-based detection (regex, blocklists)
2. RELEASE — risk-proportionate gating decision

Future stages (adaptive scan, correlate, surveil, present, remember)
will be added as the pipeline matures.
"""

from __future__ import annotations

from qp_vault.cis.innate_scan import InnateScanConfig, run_innate_scan
from qp_vault.cis.release_gate import evaluate_release
from qp_vault.enums import CISResult, CISStage, ResourceStatus
from qp_vault.models import CISPipelineStatus, CISStageRecord


class CISPipeline:
    """Orchestrator for the Content Immune System screening stages.

    Content is run through the innate scan and then the release gate. When
    the gate's verdict is FAIL or FLAG the recommended resource status is
    QUARANTINED; otherwise it is INDEXED.

    Args:
        innate_config: Configuration forwarded to the innate scan stage.
        enabled: Whether CIS screening is active. Default True.
    """

    def __init__(
        self,
        *,
        innate_config: InnateScanConfig | None = None,
        enabled: bool = True,
    ) -> None:
        self._enabled = enabled
        self._innate_config = innate_config

    async def screen(self, content: str) -> CISPipelineStatus:
        """Screen ``content`` and report the per-stage and overall outcome.

        Args:
            content: Text content to screen.

        Returns:
            CISPipelineStatus holding the stage records, the release gate's
            result as the overall result, and the recommended resource status.
            With screening disabled, a single synthetic RELEASE/PASS record is
            returned and the content is recommended for indexing.
        """
        if not self._enabled:
            # Screening is switched off: emit a release record that says so.
            bypass_record = CISStageRecord(
                stage=CISStage.RELEASE,
                result=CISResult.PASS,  # nosec B105
                details={"decision": "released", "reason": "CIS disabled"},
            )
            return CISPipelineStatus(
                stages=[bypass_record],
                overall_result=CISResult.PASS,  # nosec B105
                recommended_status=ResourceStatus.INDEXED,
            )

        # Stage 1: innate scan (regex blocklist).
        records: list[CISStageRecord] = [
            await run_innate_scan(content, self._innate_config),
        ]

        # Stage 2: release gate decides based on every record gathered so far,
        # and its own record is then appended to the same list.
        gate_record = await evaluate_release(records)
        records.append(gate_record)

        verdict = gate_record.result
        needs_quarantine = verdict in (CISResult.FAIL, CISResult.FLAG)

        return CISPipelineStatus(
            stages=records,
            overall_result=verdict,
            recommended_status=(
                ResourceStatus.QUARANTINED if needs_quarantine else ResourceStatus.INDEXED
            ),
        )
57 changes: 57 additions & 0 deletions src/qp_vault/cis/release_gate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# Copyright 2026 Quantum Pipes Technologies, LLC
# SPDX-License-Identifier: Apache-2.0

"""Release gate: risk-proportionate gating before indexing.

Evaluates CIS stage results and decides whether content should be:
- RELEASED (indexed normally)
- HELD (quarantined for human review)
- REJECTED (blocked from indexing)
"""

from __future__ import annotations

from qp_vault.enums import CISResult, CISStage
from qp_vault.models import CISStageRecord


async def evaluate_release(
    stage_records: list[CISStageRecord],
) -> CISStageRecord:
    """Decide whether screened content may be released for indexing.

    The most severe outcome among the earlier stages wins:
        - at least one FAIL  -> FAIL ("rejected")
        - else at least one FLAG -> FLAG ("quarantined")
        - else -> PASS ("released")

    Args:
        stage_records: Results from previous CIS stages.

    Returns:
        CISStageRecord for the RELEASE stage. Its details name the decision
        and either the offending stage values or the number of stages passed.
    """
    # Rejection takes priority over quarantine, so look for failures first.
    rejected_by = [rec.stage.value for rec in stage_records if rec.result == CISResult.FAIL]
    if rejected_by:
        return CISStageRecord(
            stage=CISStage.RELEASE,
            result=CISResult.FAIL,
            details={"decision": "rejected", "failed_stages": rejected_by},
        )

    held_by = [rec.stage.value for rec in stage_records if rec.result == CISResult.FLAG]
    if held_by:
        return CISStageRecord(
            stage=CISStage.RELEASE,
            result=CISResult.FLAG,
            details={"decision": "quarantined", "flagged_stages": held_by},
        )

    # Nothing failed or was flagged (this includes an empty record list).
    return CISStageRecord(
        stage=CISStage.RELEASE,
        result=CISResult.PASS,  # nosec B105
        details={"decision": "released", "stages_passed": len(stage_records)},
    )
Loading
Loading