Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

## [0.10.0] - 2026-04-06

### Added
- **Search intelligence**: deduplication (one result per resource), pagination offset, explain mode (scoring breakdown)
- **Knowledge self-healing**: semantic near-duplicate detection, contradiction detection (trust/lifecycle conflicts)
- **Real-time event streaming**: VaultEventStream for subscribing to vault mutations
- **Telemetry**: VaultTelemetry with operation counters, latency, error rates
- **Per-resource health**: vault.health(resource_id) for individual quality assessment
- **Import/export**: vault.export_vault(path) and vault.import_vault(path) for portable vaults

### Removed
- `[atlas]` extra (no implementation; removed to avoid confusion)

## [0.9.0] - 2026-04-06

### Added
Expand Down
7 changes: 2 additions & 5 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "hatchling.build"

[project]
name = "qp-vault"
version = "0.9.0"
version = "0.10.0"
description = "Governed knowledge store for autonomous organizations. Trust tiers, cryptographic audit trails, content-addressed storage, air-gap native."
readme = "README.md"
license = "Apache-2.0"
Expand Down Expand Up @@ -74,9 +74,6 @@ integrity = [
fastapi = [
"fastapi>=0.135",
]
atlas = [
"gitpython>=3.1",
]
cli = [
"typer>=0.21",
"rich>=14.3",
Expand All @@ -90,7 +87,7 @@ dev = [
"ruff>=0.9",
]
all = [
"qp-vault[sqlite,postgres,docling,capsule,encryption,integrity,fastapi,atlas,cli]",
"qp-vault[sqlite,postgres,docling,local,openai,capsule,encryption,integrity,fastapi,cli]",
]

[project.scripts]
Expand Down
2 changes: 1 addition & 1 deletion src/qp_vault/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
Docs: https://github.com/quantumpipes/vault
"""

__version__ = "0.9.0"
__version__ = "0.10.0"
__author__ = "Quantum Pipes Technologies, LLC"
__license__ = "Apache-2.0"

Expand Down
4 changes: 2 additions & 2 deletions src/qp_vault/adversarial.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
model where effective RAG weight = trust_tier_weight * adversarial_multiplier.

Status transitions:
UNVERIFIED -> VERIFIED (all CIS stages passed)
UNVERIFIED -> SUSPICIOUS (one or more CIS stages flagged)
UNVERIFIED -> VERIFIED (all Membrane stages passed)
UNVERIFIED -> SUSPICIOUS (one or more Membrane stages flagged)
SUSPICIOUS -> VERIFIED (human reviewer cleared after investigation)
VERIFIED -> SUSPICIOUS (re-assessment flagged new concerns)
"""
Expand Down
2 changes: 1 addition & 1 deletion src/qp_vault/core/search_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ def apply_trust_weighting(
tier = result.trust_tier.value if hasattr(result.trust_tier, "value") else str(result.trust_tier)
tw = compute_trust_weight(tier, config)

# CIS 2D trust: multiply by adversarial verification status
# Membrane 2D trust: multiply by adversarial verification status
adv_status = getattr(result, "adversarial_status", None)
adv_str = adv_status.value if hasattr(adv_status, "value") else str(adv_status or "unverified")
adv_mult = compute_adversarial_multiplier(adv_str)
Expand Down
16 changes: 8 additions & 8 deletions src/qp_vault/enums.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class ResourceStatus(StrEnum):
"""Registered, not yet processed."""

QUARANTINED = "quarantined"
"""CIS screening in progress. Excluded from search and RAG retrieval."""
"""Membrane screening in progress. Excluded from search and RAG retrieval."""

PROCESSING = "processing"
"""Being chunked and embedded."""
Expand All @@ -75,24 +75,24 @@ class ResourceStatus(StrEnum):


class AdversarialStatus(StrEnum):
"""CIS adversarial verification status (second dimension of trust).
"""Membrane adversarial verification status (second dimension of trust).

Orthogonal to TrustTier (organizational confidence).
Effective RAG weight = trust_tier_weight * adversarial_multiplier.
"""

UNVERIFIED = "unverified"
"""Not yet screened by CIS. Default for legacy content. Multiplier: 0.7x."""
"""Not yet screened by Membrane. Default for legacy content. Multiplier: 0.7x."""

VERIFIED = "verified"
"""Passed all CIS stages. Multiplier: 1.0x."""
"""Passed all Membrane stages. Multiplier: 1.0x."""

SUSPICIOUS = "suspicious"
"""Flagged by one or more CIS stages. Multiplier: 0.3x."""
"""Flagged by one or more Membrane stages. Multiplier: 0.3x."""


class CISStage(StrEnum):
"""Content Immune System pipeline stages."""
"""Membrane pipeline stages."""

INGEST = "ingest"
"""Stage 1: Provenance recording, format validation."""
Expand Down Expand Up @@ -120,9 +120,9 @@ class CISStage(StrEnum):


class CISResult(StrEnum):
"""Result of a single CIS stage evaluation."""
"""Result of a single Membrane stage evaluation."""

PASS = "pass" # nosec B105 — CIS stage result, not a password
PASS = "pass" # nosec B105 — Membrane stage result, not a password
"""Content cleared this stage."""

FLAG = "flag"
Expand Down
108 changes: 108 additions & 0 deletions src/qp_vault/integrity/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import math
from collections import Counter
from datetime import UTC, datetime
from typing import Any

from qp_vault.models import HealthScore, Resource

Expand Down Expand Up @@ -106,6 +107,113 @@ def find_orphans(
return orphans


def find_near_duplicates(
resources: list[Resource],
chunks_by_resource: dict[str, list[Any]] | None = None,
*,
similarity_threshold: float = 0.85,
) -> list[tuple[Resource, Resource, float]]:
"""Find semantically similar resources using chunk embedding comparison.

Compares resources by their first chunk's embedding. Returns pairs
above the similarity threshold.

Args:
resources: Resources to compare.
chunks_by_resource: Dict mapping resource_id to list of Chunk objects.
similarity_threshold: Minimum cosine similarity to flag as near-duplicate.

Returns:
List of (resource_a, resource_b, similarity_score) tuples.
"""
if not chunks_by_resource:
return []

# Get first-chunk embeddings per resource
embeddings: dict[str, list[float]] = {}
for r in resources:
chunks = chunks_by_resource.get(r.id, [])
if chunks and hasattr(chunks[0], "embedding") and chunks[0].embedding:
embeddings[r.id] = chunks[0].embedding

# Pairwise comparison
resource_map = {r.id: r for r in resources}
pairs: list[tuple[Resource, Resource, float]] = []
ids = list(embeddings.keys())

for i in range(len(ids)):
for j in range(i + 1, len(ids)):
a, b = embeddings[ids[i]], embeddings[ids[j]]
if len(a) != len(b) or not a:
continue
dot = sum(x * y for x, y in zip(a, b, strict=False))
norm_a = sum(x * x for x in a) ** 0.5
norm_b = sum(x * x for x in b) ** 0.5
if norm_a == 0 or norm_b == 0:
continue
sim = dot / (norm_a * norm_b)
if sim >= similarity_threshold:
pairs.append((resource_map[ids[i]], resource_map[ids[j]], sim))

return sorted(pairs, key=lambda x: x[2], reverse=True)


def detect_contradictions(
    resources: list[Resource],
    chunks_by_resource: dict[str, list[Any]] | None = None,
) -> list[dict[str, Any]]:
    """Detect potential contradictions between resources.

    Flags pairs of resources whose content is semantically similar
    (embedding similarity at or above 0.75) but whose metadata disagrees:
    different trust tiers, or one active resource alongside a superseded
    one (a potential stale reference).

    This is a heuristic approach. For full NLI-based contradiction
    detection, an LLM provider is required (future enhancement).

    Args:
        resources: Resources to analyze.
        chunks_by_resource: Dict mapping resource_id to chunk lists.

    Returns:
        List of contradiction records with resource pairs and reasons.
    """

    def _as_str(value: Any) -> str:
        # Enum members expose .value; anything else is stringified.
        return value.value if hasattr(value, "value") else str(value)

    findings: list[dict[str, Any]] = []

    # Use a lower threshold than dedup (0.75) so topically-related pairs
    # are considered, not only near-copies.
    similar_pairs = find_near_duplicates(
        resources, chunks_by_resource, similarity_threshold=0.75
    )

    for left, right, score in similar_pairs:
        tier_left = _as_str(left.trust_tier)
        tier_right = _as_str(right.trust_tier)

        # Similar content claimed at different confidence levels.
        if tier_left != tier_right:
            findings.append({
                "type": "trust_conflict",
                "resource_a": {"id": left.id, "name": left.name, "trust_tier": tier_left},
                "resource_b": {"id": right.id, "name": right.name, "trust_tier": tier_right},
                "similarity": score,
                "reason": f"Similar content ({score:.0%}) with different trust tiers ({tier_left} vs {tier_right})",
            })

        lc_left = _as_str(left.lifecycle)
        lc_right = _as_str(right.lifecycle)

        # Exactly one of the pair is active and the other superseded.
        if {lc_left, lc_right} == {"active", "superseded"}:
            findings.append({
                "type": "lifecycle_conflict",
                "resource_a": {"id": left.id, "name": left.name, "lifecycle": lc_left},
                "resource_b": {"id": right.id, "name": right.name, "lifecycle": lc_right},
                "similarity": score,
                "reason": f"Similar content ({score:.0%}) but conflicting lifecycle ({lc_left} vs {lc_right})",
            })

    return findings


def compute_health_score(
resources: list[Resource],
) -> HealthScore:
Expand Down
6 changes: 3 additions & 3 deletions src/qp_vault/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class HealthScore(BaseModel):
resource_count: int = 0


# --- Content Immune System models ---
# --- Membrane models ---


class ContentProvenance(BaseModel):
Expand All @@ -214,7 +214,7 @@ class ContentProvenance(BaseModel):


class CISStageRecord(BaseModel):
"""Result of a single CIS pipeline stage evaluation.
"""Result of a single Membrane pipeline stage evaluation.

Every stage (INGEST through REMEMBER) creates one record per document.
Records are immutable once created.
Expand All @@ -233,7 +233,7 @@ class CISStageRecord(BaseModel):


class CISPipelineStatus(BaseModel):
"""Aggregate status of the CIS pipeline for a single document."""
"""Aggregate status of the Membrane pipeline for a single document."""

resource_id: str = ""
stages: list[CISStageRecord] = Field(default_factory=list)
Expand Down
89 changes: 89 additions & 0 deletions src/qp_vault/streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
# Copyright 2026 Quantum Pipes Technologies, LLC
# SPDX-License-Identifier: Apache-2.0

"""Real-time event streaming for vault mutations.

Allows agents and consumers to subscribe to vault events
and react in real-time to knowledge changes.
"""

from __future__ import annotations

import asyncio
import contextlib
import uuid
from collections import deque
from collections.abc import AsyncIterator
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from qp_vault.models import VaultEvent


class VaultEventStream:
    """Real-time event stream for vault mutations.

    Consumers subscribe and receive VaultEvents as they occur.
    Supports multiple concurrent subscribers.

    Usage:
        stream = VaultEventStream()
        vault = AsyncVault("./knowledge", auditor=stream)

        # Subscribe in an async context
        async for event in stream.subscribe():
            print(f"{event.event_type}: {event.resource_name}")
    """

    def __init__(self, *, buffer_size: int = 1000) -> None:
        """Create an empty stream.

        Args:
            buffer_size: Maximum number of past events retained for replay;
                oldest events are evicted automatically.
        """
        # One live fan-out queue per active subscriber.
        self._subscribers: list[asyncio.Queue[VaultEvent]] = []
        # Bounded ring buffer of recent events (oldest first).
        self._history: deque[VaultEvent] = deque(maxlen=buffer_size)

    async def record(self, event: VaultEvent) -> str:
        """Record an event and broadcast to all subscribers.

        Implements AuditProvider protocol so it can be used as auditor.

        Args:
            event: The vault event to retain and fan out.

        Returns:
            A freshly generated UUID string identifying this record.
        """
        event_id = str(uuid.uuid4())
        self._history.append(event)

        # Broadcast to all subscribers. A subscriber whose queue is full
        # has the event dropped rather than blocking the producer.
        for queue in self._subscribers:
            with contextlib.suppress(asyncio.QueueFull):
                queue.put_nowait(event)

        return event_id

    async def subscribe(self, *, replay: bool = False) -> AsyncIterator[VaultEvent]:
        """Subscribe to the event stream.

        Args:
            replay: If True, replay recent history before live events.

        Yields:
            VaultEvent objects as they occur.
        """
        queue: asyncio.Queue[VaultEvent] = asyncio.Queue(maxsize=100)
        self._subscribers.append(queue)

        try:
            # Replay history if requested. Snapshot the deque first:
            # each `yield` suspends this coroutine, and a concurrent
            # record() appending to the live deque would otherwise raise
            # "RuntimeError: deque mutated during iteration".
            if replay:
                for event in list(self._history):
                    yield event

            # Stream live events until the consumer stops iterating.
            while True:
                event = await queue.get()
                yield event
        finally:
            # Always detach, even if the consumer aborts mid-iteration.
            self._subscribers.remove(queue)

    @property
    def history(self) -> list[VaultEvent]:
        """Snapshot of the retained event history (oldest first)."""
        return list(self._history)

    @property
    def subscriber_count(self) -> int:
        """Number of active subscribers."""
        return len(self._subscribers)
Loading
Loading