Complete Python SDK for qp-vault v1.6.0.
Vault(
path: str | Path,
*,
storage: StorageBackend | None = None, # Default: SQLite
embedder: EmbeddingProvider | None = None, # Default: None
auditor: AuditProvider | None = None, # Default: LogAuditor (auto-detects CapsuleAuditor)
parsers: list[ParserProvider] | None = None,
policies: list[PolicyProvider] | None = None,
config: VaultConfig | None = None,
plugins_dir: str | Path | None = None, # Air-gap plugin directory
tenant_id: str | None = None, # Lock vault to single tenant
role: str | None = None, # RBAC: "reader", "writer", "admin", or None
)
When tenant_id is set, the vault enforces tenant isolation: operations auto-inject the locked tenant, and operations with a mismatched tenant_id raise VaultError.
When role is set, all operations are checked against the RBAC permission matrix. Operations exceeding the role's permissions raise VaultError with code VAULT_700.
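The tenant lock described above can be pictured as a small resolution step that runs before each operation. The sketch below is an illustration only, not qp-vault's code: `resolve_tenant` and the stand-in `VaultError` class are hypothetical names, and the exact behaviour is assumed from the description.

```python
from __future__ import annotations


class VaultError(Exception):
    """Stand-in for qp-vault's VaultError (hypothetical, for illustration)."""


def resolve_tenant(locked: str | None, requested: str | None) -> str | None:
    """Return the tenant an operation should run under.

    Assumed behaviour:
    - unlocked vault: use whatever the caller passed;
    - locked vault, no tenant passed: auto-inject the locked tenant;
    - locked vault, different tenant passed: reject the call.
    """
    if locked is None:
        return requested
    if requested is not None and requested != locked:
        raise VaultError(f"tenant mismatch: vault is locked to {locked!r}")
    return locked
```

With a vault locked to "acme", a call that omits tenant_id would run as "acme", while a call that passes "other" would be rejected.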
Vault.from_postgres(dsn: str, **kwargs) -> Vault
Vault.from_config(config_path: str | Path) -> Vault
vault.add(
source: str | Path | bytes,
*,
name: str | None = None,
trust_tier: TrustTier | str = "working",
classification: DataClassification | str = "internal",
layer: MemoryLayer | str | None = None,
collection: str | None = None,
tags: list[str] | None = None, # Max 50, 100 chars each
metadata: dict[str, Any] | None = None, # Max 100 keys, alphanumeric
lifecycle: Lifecycle | str = "active",
valid_from: date | None = None,
valid_until: date | None = None,
tenant_id: str | None = None,
) -> Resource
Content is screened by the Membrane pipeline before indexing. Flagged content is quarantined.
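The signature comments give hard limits for tags and metadata. A pre-flight check along the following lines could catch oversized input before calling vault.add. `check_limits` is a hypothetical helper, not part of the SDK, and reading "alphanumeric" as `str.isalnum()` is an assumption; the library may accept a wider character set.

```python
from __future__ import annotations

from typing import Any

MAX_TAGS = 50            # from the signature comment: "Max 50"
MAX_TAG_LENGTH = 100     # "100 chars each"
MAX_METADATA_KEYS = 100  # "Max 100 keys"


def check_limits(tags: list[str], metadata: dict[str, Any]) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems: list[str] = []
    if len(tags) > MAX_TAGS:
        problems.append(f"too many tags: {len(tags)} > {MAX_TAGS}")
    for tag in tags:
        if len(tag) > MAX_TAG_LENGTH:
            problems.append(f"tag too long: {tag[:20]!r}...")
    if len(metadata) > MAX_METADATA_KEYS:
        problems.append(f"too many metadata keys: {len(metadata)}")
    for key in metadata:
        # Assumption: "alphanumeric" means str.isalnum(); the library may
        # also allow characters such as "_" or "-".
        if not key.isalnum():
            problems.append(f"non-alphanumeric metadata key: {key!r}")
    return problems
```

Returning a list rather than raising on the first problem lets a caller report every violation at once.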
vault.add_batch(
sources: list[str | Path | bytes],
*,
trust_tier: TrustTier | str = "working",
tenant_id: str | None = None,
**kwargs,
) -> list[Resource]
vault.get(resource_id: str) -> Resource
vault.get_multiple(resource_ids: list[str]) -> list[Resource]
Batch retrieval in a single query. Missing IDs are silently omitted.
vault.get_content(resource_id: str) -> str
Reassembles chunks in order to return the full text content. Quarantined resources raise VaultError.
vault.reprocess(resource_id: str) -> Resource
Re-chunks and re-embeds an existing resource. Useful when the embedding model changes or chunking parameters are updated. The resource content is preserved; only chunks and embeddings are regenerated.
# After switching embedding models
updated = vault.reprocess(resource.id)
assert updated.status == "indexed"
Emits an UPDATE subscriber event with details={"reprocessed": True}.
vault.list(
*,
tenant_id: str | None = None,
trust_tier: TrustTier | str | None = None,
classification: DataClassification | str | None = None,
layer: MemoryLayer | str | None = None,
collection: str | None = None,
lifecycle: Lifecycle | str | None = None,
status: ResourceStatus | str | None = None,
tags: list[str] | None = None,
limit: int = 50,
offset: int = 0,
) -> list[Resource]
vault.find_by_name(
name: str,
*,
tenant_id: str | None = None,
collection_id: str | None = None,
) -> Resource | None
Case-insensitive name lookup. Returns the first matching non-deleted resource, or None.
resource = vault.find_by_name("STRATEGY.md")
# Also matches "strategy.md", "Strategy.MD"
vault.update(
resource_id: str,
*,
name: str | None = None,
trust_tier: TrustTier | str | None = None,
classification: DataClassification | str | None = None,
tags: list[str] | None = None,
metadata: dict[str, Any] | None = None,
) -> Resource
vault.delete(resource_id: str, *, hard: bool = False) -> None
vault.replace(
resource_id: str,
new_content: str,
*,
reason: str | None = None,
) -> tuple[Resource, Resource]
Creates a new resource with the new content and supersedes the old one. Returns (old, new).
vault.upsert(
source: str | Path | bytes,
*,
name: str | None = None,
trust_tier: TrustTier | str = "working",
tenant_id: str | None = None,
**kwargs,
) -> Resource
Atomic add-or-replace. If a resource with the same name and tenant already exists, it is superseded; otherwise a new resource is created.
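The add-or-replace rule can be sketched against an in-memory list. This is an illustration under stated assumptions, not the SDK's implementation: `Record` and the free function `upsert` are hypothetical stand-ins, names are compared exactly, and the sketch does not attempt to show atomicity.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class Record:
    """Hypothetical stand-in for a vault Resource."""
    name: str
    tenant_id: str | None
    content: str
    lifecycle: str = "active"


def upsert(store: list[Record], name: str, content: str,
           tenant_id: str | None = None) -> Record:
    """Add a record, superseding any active one with the same name and tenant."""
    for existing in store:
        if (existing.name == name and existing.tenant_id == tenant_id
                and existing.lifecycle == "active"):
            # Same name and tenant: the old version is kept but superseded.
            existing.lifecycle = "superseded"
    new = Record(name=name, tenant_id=tenant_id, content=content)
    store.append(new)
    return new
```

Because the match is on name and tenant together, two tenants can each hold an active "plan.md" without superseding one another.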
vault.search(
query: str,
*,
tenant_id: str | None = None,
top_k: int = 10,
offset: int = 0, # Pagination
threshold: float = 0.0,
min_trust_tier: TrustTier | str | None = None,
layer: MemoryLayer | str | None = None,
collection: str | None = None,
as_of: date | None = None, # Point-in-time
deduplicate: bool = True, # One result per resource
explain: bool = False, # Include scoring breakdown
graph_boost: bool = False, # Boost docs mentioning detected entities
) -> list[SearchResult]
When no embedder is configured, search automatically falls back to text-only mode (vector_weight=0.0, text_weight=1.0). This ensures search works on day one without requiring an embedding model.
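The fallback amounts to choosing a different pair of weights before blending scores. In the sketch below only the fallback values (0.0 and 1.0) come from the text; the 0.7 / 0.3 hybrid defaults, the function names, and the plain weighted sum are placeholders assumed for illustration.

```python
from __future__ import annotations


def search_weights(has_embedder: bool,
                   vector_weight: float = 0.7,
                   text_weight: float = 0.3) -> tuple[float, float]:
    """Return (vector_weight, text_weight) for a query.

    The 0.7 / 0.3 defaults are placeholders chosen for this sketch; only
    the fallback pair (0.0, 1.0) is documented.
    """
    if not has_embedder:
        return 0.0, 1.0
    return vector_weight, text_weight


def blend(vector_similarity: float, text_rank: float,
          weights: tuple[float, float]) -> float:
    """Weighted sum of the two signals (an assumed combination rule)."""
    vector_weight, text_weight = weights
    return vector_weight * vector_similarity + text_weight * text_rank
```

With no embedder, the vector term drops out entirely, so the blended score equals the text rank.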
When graph_boost=True and vault.graph is available, search detects entities in the query text, fetches their backlinks, and applies a 15% relevance boost to documents that mention those entities. Off by default. Best-effort: any failure falls back to standard search.
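A minimal sketch of the boost step follows, assuming the 15% is applied as a multiplier on each matching document's relevance. `boosted` and `fetch_backlinks` are hypothetical names, and the input shapes (a score per resource ID, a set of resource IDs reached via backlinks) are assumptions.

```python
from __future__ import annotations

from typing import Callable

GRAPH_BOOST = 1.15  # the 15% boost mentioned above


def boosted(scores: dict[str, float],
            fetch_backlinks: Callable[[], set[str]]) -> dict[str, float]:
    """Boost resources that mention a detected entity.

    scores maps resource_id -> relevance; fetch_backlinks returns the IDs
    of resources that mention entities found in the query.
    """
    try:
        mentioned_in = fetch_backlinks()
    except Exception:
        # Best-effort: any failure falls back to the unboosted scores.
        return dict(scores)
    return {
        rid: score * GRAPH_BOOST if rid in mentioned_in else score
        for rid, score in scores.items()
    }
```

Wrapping only the graph lookup in the try block keeps the fallback path identical to a search run with graph_boost=False.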
vault.search_with_facets(query: str, **kwargs) -> dict[str, Any]
Returns {"results": [...], "total": N, "facets": {"trust_tier": {...}, "resource_type": {...}}}.
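To make the return shape concrete, the sketch below builds the same structure from a list of plain dictionaries. It assumes each facet maps a value to the number of results carrying it and that "total" counts the results passed in; `with_facets` is a hypothetical helper, and the real method may compute these differently.

```python
from __future__ import annotations

from collections import Counter
from typing import Any


def with_facets(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Wrap results in the {"results", "total", "facets"} shape."""
    return {
        "results": results,
        "total": len(results),
        "facets": {
            # Assumption: each facet is a value -> count mapping.
            "trust_tier": dict(Counter(r["trust_tier"] for r in results)),
            "resource_type": dict(Counter(r["resource_type"] for r in results)),
        },
    }
```

Facet counts like these are typically used to render filter options next to a result list.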
vault.grep(
keywords: list[str],
*,
tenant_id: str | None = None,
top_k: int = 20,
max_keywords: int = 20,
) -> list[SearchResult]
Multi-keyword OR search with three-signal blended scoring. Executes a single FTS5 OR query (SQLite) or ILIKE+trigram query (PostgreSQL) regardless of keyword count.
Scoring formula: coverage * (0.7 * text_rank + 0.3 * proximity) where:
- Coverage (Lucene coord factor): matched_keywords / total_keywords, applied as a multiplier. 3/3 = full score, 1/3 = 33%.
- Text rank: native FTS5 bm25 or pg_trgm similarity (0.0-1.0).
- Proximity: how close matched keywords appear to each other within the chunk.
results = vault.grep(["revenue", "Q3", "forecast"])
# Results sorted by blended relevance: coverage * (0.7 * text_rank + 0.3 * proximity)
# explain_metadata includes: matched_keywords, hit_density, text_rank, proximity, snippet
print(results[0].explain_metadata["snippet"])
# "...discussed **Q3** **revenue** **forecast** projections..."
No embedder required. Single database query. Results deduplicated by resource and trust-weighted.
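The grep scoring formula is easy to check by hand. The sketch below evaluates coverage * (0.7 * text_rank + 0.3 * proximity) for given inputs. `grep_score` is a hypothetical helper, the proximity value is taken as an input because its computation is not specified here, and trust weighting is left out.

```python
from __future__ import annotations


def grep_score(matched_keywords: int, total_keywords: int,
               text_rank: float, proximity: float) -> float:
    """Blend the three signals: coverage * (0.7 * text_rank + 0.3 * proximity)."""
    if total_keywords <= 0:
        return 0.0
    # Coverage acts as a multiplier, so partial matches are scaled down.
    coverage = matched_keywords / total_keywords
    return coverage * (0.7 * text_rank + 0.3 * proximity)
```

For text_rank=0.8 and proximity=0.5, a chunk matching all three of three keywords scores 0.71, while a chunk matching only one of them scores a third of that.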
SearchResult fields:
| Field | Type | Description |
|---|---|---|
| chunk_id | str | Chunk identifier |
| resource_id | str | Parent resource |
| resource_name | str | Display name |
| content | str | Chunk text |
| vector_similarity | float | Cosine similarity (0-1) |
| text_rank | float | Full-text match score |
| trust_weight | float | Trust tier x adversarial multiplier |
| freshness | float | Decay factor |
| relevance | float | Composite score |
| updated_at | str | Resource timestamp (for freshness) |
| resource_type | str | Document type |
| data_classification | str | Sensitivity level |
| trust_tier | TrustTier | Resource trust tier |
| adversarial_status | AdversarialStatus | Membrane verification status |
| cid | str | Chunk content ID (SHA3-256) |
| lifecycle | Lifecycle | Resource lifecycle state |
vault.transition(resource_id: str, target: Lifecycle | str, *, reason: str | None = None) -> Resource
vault.supersede(old_id: str, new_id: str) -> tuple[Resource, Resource]
vault.expiring(*, days: int = 90) -> list[Resource]
vault.chain(resource_id: str) -> list[Resource]
Max chain length: 1000 (cycle protection).
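The 1000-entry cap on vault.chain guards against supersession links that loop back on themselves. One way such a walk might look is sketched below. The `superseded_by` mapping, the forward direction of the walk, and truncating (rather than raising) at the cap are all assumptions made for illustration.

```python
from __future__ import annotations

MAX_CHAIN_LENGTH = 1000  # limit stated above


def walk_chain(start_id: str, superseded_by: dict[str, str],
               max_length: int = MAX_CHAIN_LENGTH) -> list[str]:
    """Follow supersession links from start_id, stopping on a cycle or the cap."""
    chain: list[str] = []
    seen: set[str] = set()
    current: str | None = start_id
    while current is not None and current not in seen and len(chain) < max_length:
        chain.append(current)
        seen.add(current)
        # Missing key means this resource has no successor.
        current = superseded_by.get(current)
    return chain
```

The `seen` set stops a loop as soon as it revisits an ID, and the length cap bounds the work even when the links never repeat.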
vault.verify(resource_id: str | None = None) -> VerificationResult | VaultVerificationResult
vault.export_proof(resource_id: str) -> MerkleProof
vault.get_provenance(resource_id: str) -> list[dict[str, Any]]
vault.set_adversarial_status(resource_id: str, status: str) -> Resource
Status values: "unverified", "verified", "suspicious".
vault.create_collection(name: str, *, description: str = "", tenant_id: str | None = None) -> dict
vault.list_collections(*, tenant_id: str | None = None) -> list[dict]
vault.layer(name: MemoryLayer | str) -> LayerView
vault.health(resource_id: str | None = None) -> HealthScore
Pass resource_id for per-resource health, or None for vault-wide.
vault.export_vault(path: str | Path) -> dict[str, Any]
vault.import_vault(path: str | Path) -> list[Resource]
Access via vault.graph. Returns GraphEngine when the storage backend supports graphs, None otherwise.
vault.graph -> GraphEngine | None
Full documentation: Knowledge Graph Guide
Quick reference:
# Nodes
node = await vault.graph.create_node(name="Alice", entity_type="person")
node = await vault.graph.get_node(node_id)
nodes, total = await vault.graph.list_nodes(entity_type="person", limit=20)
results = await vault.graph.search_nodes("Alice")
updated = await vault.graph.update_node(node_id, name="Alice Smith")
await vault.graph.delete_node(node_id)
# Edges
edge = await vault.graph.create_edge(source_id=a.id, target_id=b.id, relation_type="knows")
edges = await vault.graph.get_edges(node_id, direction="outgoing")
await vault.graph.delete_edge(edge_id)
# Traversal + context
neighbors = await vault.graph.neighbors(node_id, depth=2)
context = await vault.graph.context_for([node_id])
# Mentions
await vault.graph.track_mention(node_id, resource_id, context_snippet="...")
backlinks = await vault.graph.get_backlinks(node_id)
# Cross-space + merge
await vault.graph.add_to_space(node_id, space_id)
merged = await vault.graph.merge_nodes(keep_id, merge_id)
# Scan
job = await vault.graph.scan(space_id)
vault.status() -> dict[str, Any]
vault.subscribe(callback: Callable[[VaultEvent], Any]) -> Callable[[], None]
Register a callback for vault mutation events. Returns an unsubscribe function. Callbacks can be sync or async; async callbacks are awaited directly. Errors in callbacks are logged and never propagated to the caller.
from qp_vault import Vault, VaultEvent
# Synchronous Vault, so add() below runs without await
vault = Vault("./knowledge")
# Sync callback
def on_change(event: VaultEvent) -> None:
print(f"{event.event_type}: {event.resource_name}")
unsub = vault.subscribe(on_change)
# Add a resource (callback fires with CREATE event)
vault.add("Content", name="doc.md")
# Stop receiving events
unsub()
Events emitted on:
| Operation | EventType |
|---|---|
| add() | CREATE |
| update() | UPDATE |
| delete() | DELETE |
| reprocess() | UPDATE (with details.reprocessed=True) |
| transition() | LIFECYCLE_TRANSITION |
Multiple subscribers are independent. Unsubscribing one does not affect others. Calling unsub() twice is safe.
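These guarantees (independent subscribers, idempotent unsubscribe, callback errors that never reach the caller) can be illustrated with a small registry. `EventHub` is a hypothetical stand-in written for this sketch, not the SDK's internals, and it handles sync callbacks only.

```python
from __future__ import annotations

import logging
from typing import Any, Callable

logger = logging.getLogger("subscriber_sketch")


class EventHub:
    """Hypothetical stand-in for the vault's subscriber registry (sync only)."""

    def __init__(self) -> None:
        self._callbacks: list[Callable[[Any], Any]] = []

    def subscribe(self, callback: Callable[[Any], Any]) -> Callable[[], None]:
        self._callbacks.append(callback)

        def unsubscribe() -> None:
            # Safe to call twice: the second call finds nothing to remove.
            if callback in self._callbacks:
                self._callbacks.remove(callback)

        return unsubscribe

    def emit(self, event: Any) -> None:
        # Iterate over a copy so callbacks may unsubscribe during dispatch.
        for callback in list(self._callbacks):
            try:
                callback(event)
            except Exception:
                # Errors are logged and never reach the caller.
                logger.exception("subscriber failed")
```

Each subscriber gets its own closure, so removing one leaves the others registered, and a failing callback does not stop later callbacks from running.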
vault.register_embedder(embedder: EmbeddingProvider) -> None
vault.register_parser(parser: ParserProvider) -> None
vault.register_policy(policy: PolicyProvider) -> None
| Enum | Values |
|---|---|
| TrustTier | canonical, working, ephemeral, archived |
| DataClassification | public, internal, confidential, restricted |
| ResourceType | document, image, audio, video, note, code, spreadsheet, transcript, other |
| ResourceStatus | pending, quarantined, processing, indexed, error, deleted |
| Lifecycle | draft, review, active, superseded, expired, archived |
| MemoryLayer | operational, strategic, compliance |
| AdversarialStatus | unverified, verified, suspicious |
| MembraneStage | ingest, innate_scan, adaptive_scan, correlate, release, surveil, present, remember |
| MembraneResult | pass, flag, fail, skip |
| EventType | create, update, delete, restore, trust_change, classification_change, lifecycle_transition, supersede, verify, search, membrane_scan, membrane_release, membrane_flag, adversarial_status_change |
| Role | reader, writer, admin |
| Code | Exception | When |
|---|---|---|
| VAULT_000 | VaultError | General error, resource not found |
| VAULT_100 | StorageError | Database operation failed |
| VAULT_200 | VerificationError | Integrity check failed |
| VAULT_300 | LifecycleError | Invalid state transition |
| VAULT_400 | PolicyError | Policy denied operation |
| VAULT_500 | ChunkingError | Text chunking failed |
| VAULT_600 | ParsingError | File parsing failed |
| VAULT_700 | PermissionError | RBAC permission denied |