-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_vault_integration.py
More file actions
276 lines (221 loc) · 9.4 KB
/
test_vault_integration.py
File metadata and controls
276 lines (221 loc) · 9.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
"""Integration tests for the full Vault pipeline.
Tests the complete flow: add -> search -> verify -> update -> delete.
Uses SQLite backend (default, zero config).
"""
import pytest
from qp_vault import (
AsyncVault,
DataClassification,
Lifecycle,
MemoryLayer,
ResourceStatus,
SearchResult,
TrustTier,
Vault,
VaultError,
VaultVerificationResult,
VerificationResult,
)
@pytest.fixture
def vault(tmp_path):
    """Build a new vault under the per-test temporary directory.

    Each test receives its own ``tmp_path``, so every test starts from a
    vault rooted at a separate ``test-vault`` location.
    """
    vault_root = tmp_path / "test-vault"
    return Vault(vault_root)
@pytest.fixture
def populated_vault(vault):
    """Seed the vault with three documents, one per trust tier.

    The documents are added in the order canonical, working, ephemeral and
    the same vault object is returned afterwards.
    """
    # (content, name, trust_tier) for every seeded document, in insert order.
    seed_documents = (
        (
            "Standard operating procedure for incident response. "
            "When an incident is detected, the on-call engineer must "
            "acknowledge within 15 minutes. Severity is classified as "
            "P0 (critical), P1 (high), P2 (medium), or P3 (low).",
            "sop-incident.md",
            "canonical",
        ),
        (
            "Draft proposal for new employee onboarding process. "
            "The current onboarding takes 3 weeks. We propose reducing "
            "to 2 weeks by parallelizing equipment setup and access provisioning.",
            "draft-onboarding.md",
            "working",
        ),
        (
            "Meeting notes from engineering standup 2026-03-15. "
            "Discussed migration to new auth service. Blocker: legacy "
            "API clients need updated tokens.",
            "standup-notes.md",
            "ephemeral",
        ),
    )
    for content, filename, tier in seed_documents:
        vault.add(content, name=filename, trust_tier=tier)
    return vault
class TestVaultAdd:
    """Checks on the resource object returned by ``vault.add``."""

    def test_add_text_resource(self, vault):
        """A text add with only a name is indexed at the WORKING tier."""
        added = vault.add("Hello world", name="hello.txt")

        assert added.name == "hello.txt"
        assert added.status == ResourceStatus.INDEXED
        assert added.trust_tier == TrustTier.WORKING
        assert added.chunk_count >= 1
        assert added.content_hash
        assert added.cid.startswith("vault://sha3-256/")

    def test_add_with_trust_tier(self, vault):
        """The ``trust_tier`` string is reflected on the returned resource."""
        added = vault.add(
            "Canonical document",
            trust_tier="canonical",
            name="canon.md",
        )
        assert added.trust_tier == TrustTier.CANONICAL

    def test_add_with_classification(self, vault):
        """The ``classification`` string maps onto ``data_classification``."""
        added = vault.add(
            "Secret stuff",
            classification="confidential",
            name="secret.md",
        )
        assert added.data_classification == DataClassification.CONFIDENTIAL

    def test_add_with_layer(self, vault):
        """The ``layer`` string is reflected on the returned resource."""
        added = vault.add(
            "Runbook content",
            layer="operational",
            name="runbook.md",
        )
        assert added.layer == MemoryLayer.OPERATIONAL

    def test_add_with_tags(self, vault):
        """Tags passed to ``add`` come back unchanged and in order."""
        added = vault.add(
            "Tagged doc",
            tags=["important", "reviewed"],
            name="tagged.md",
        )
        assert added.tags == ["important", "reviewed"]

    def test_add_with_metadata(self, vault):
        """Metadata passed to ``add`` comes back unchanged."""
        added = vault.add(
            "Meta doc",
            metadata={"author": "alice"},
            name="meta.md",
        )
        assert added.metadata == {"author": "alice"}

    def test_add_with_lifecycle(self, vault):
        """The ``lifecycle`` string is reflected on the returned resource."""
        added = vault.add("Draft doc", lifecycle="draft", name="draft.md")
        assert added.lifecycle == Lifecycle.DRAFT

    def test_add_generates_unique_ids(self, vault):
        """Two separate adds never share an id."""
        first = vault.add("Doc 1", name="a.md")
        second = vault.add("Doc 2", name="b.md")
        assert first.id != second.id

    def test_add_computes_content_hash(self, vault):
        """Identical content under different names hashes identically."""
        first = vault.add("Same content", name="a.md")
        second = vault.add("Same content", name="b.md")
        assert first.content_hash == second.content_hash

    def test_add_different_content_different_hash(self, vault):
        """Different content produces different content hashes."""
        first = vault.add("Content A", name="a.md")
        second = vault.add("Content B", name="b.md")
        assert first.content_hash != second.content_hash
class TestVaultGet:
    """Lookup of a single resource by id."""

    def test_get_existing(self, vault):
        """A resource fetched by id matches the one that was added."""
        stored = vault.add("Test doc", name="test.md")
        looked_up = vault.get(stored.id)

        assert looked_up.id == stored.id
        assert looked_up.name == stored.name

    def test_get_nonexistent_raises(self, vault):
        """Fetching an id that was never added raises ``VaultError``."""
        missing_id = "nonexistent-id"
        with pytest.raises(VaultError):
            vault.get(missing_id)
class TestVaultList:
    """Listing resources from the three-document populated vault."""

    def test_list_all(self, populated_vault):
        """An unfiltered listing returns all three seeded resources."""
        everything = populated_vault.list()
        assert len(everything) == 3

    def test_list_by_trust(self, populated_vault):
        """Filtering by CANONICAL returns exactly the one canonical resource."""
        matches = populated_vault.list(trust_tier=TrustTier.CANONICAL)

        assert len(matches) == 1
        assert matches[0].trust_tier == TrustTier.CANONICAL

    def test_list_with_limit(self, populated_vault):
        """``limit=2`` caps the listing at two resources."""
        capped = populated_vault.list(limit=2)
        assert len(capped) == 2

    def test_list_excludes_deleted(self, populated_vault):
        """After deleting one resource the default listing shrinks to two."""
        before = populated_vault.list()
        doomed_id = before[0].id
        populated_vault.delete(doomed_id)

        remaining = populated_vault.list()
        assert len(remaining) == 2
class TestVaultSearch:
    """Search behaviour against the three-document populated vault."""

    def test_search_finds_relevant(self, populated_vault):
        """The top hit for "incident response" mentions "incident"."""
        results = populated_vault.search("incident response")
        assert len(results) > 0
        assert "incident" in results[0].content.lower()

    def test_search_returns_search_results(self, populated_vault):
        """Hits are ``SearchResult`` instances."""
        results = populated_vault.search("onboarding")
        assert len(results) > 0
        assert isinstance(results[0], SearchResult)

    def test_search_trust_weighting(self, populated_vault):
        """CANONICAL should outweigh EPHEMERAL in 2D trust scoring."""
        results = populated_vault.search("incident response")

        # Without this guard the loop below would pass vacuously on an
        # empty result list and the weights would never be checked.
        assert results, "expected at least one hit for 'incident response'"

        checked = 0
        for hit in results:
            if hit.trust_tier == TrustTier.CANONICAL:
                # 2D: CANONICAL (1.5) * UNVERIFIED (0.7) = 1.05
                assert hit.trust_weight == pytest.approx(1.05)
                checked += 1
            elif hit.trust_tier == TrustTier.EPHEMERAL:
                # 2D: EPHEMERAL (0.7) * UNVERIFIED (0.7) = 0.49
                assert hit.trust_weight == pytest.approx(0.49)
                checked += 1

        # The only seeded document mentioning "incident" is the canonical
        # one, so at least one weight assertion must have run.
        assert checked > 0, "no CANONICAL or EPHEMERAL hit was weight-checked"

    def test_search_empty_query(self, populated_vault):
        """An empty query still returns a list."""
        results = populated_vault.search("")
        # Empty query may return no FTS results
        assert isinstance(results, list)

    def test_search_no_results(self, populated_vault):
        """A query matching nothing returns an empty list."""
        results = populated_vault.search("quantum_nonexistent_topic_xyz")
        assert results == []

    def test_search_top_k(self, populated_vault):
        """``top_k=1`` returns at most one hit."""
        results = populated_vault.search("process", top_k=1)
        assert len(results) <= 1
class TestVaultUpdate:
    """Updating fields on an existing resource."""

    def test_update_trust_tier(self, vault):
        """Updating ``trust_tier`` is reflected on the returned resource."""
        original = vault.add("Doc", name="doc.md", trust_tier="working")
        updated = vault.update(original.id, trust_tier="canonical")
        # Compare against the enum member, as the other trust-tier
        # assertions in this module do, rather than a raw string.
        assert updated.trust_tier == TrustTier.CANONICAL

    def test_update_tags(self, vault):
        """Updating ``tags`` replaces the tag list on the returned resource."""
        original = vault.add("Doc", name="doc.md")
        updated = vault.update(original.id, tags=["new-tag"])
        assert updated.tags == ["new-tag"]
class TestVaultDelete:
    """Soft and hard deletion of a resource."""

    def test_soft_delete(self, vault):
        """A default delete leaves the resource listable as DELETED."""
        doc = vault.add("Doc", name="doc.md")
        vault.delete(doc.id)

        # Should still exist but marked deleted
        deleted_only = vault.list(status=ResourceStatus.DELETED)
        assert len(deleted_only) == 1

    def test_hard_delete(self, vault):
        """After ``hard=True`` deletion, ``get`` raises ``VaultError``."""
        doc = vault.add("Doc", name="doc.md")
        doc_id = doc.id
        vault.delete(doc_id, hard=True)

        with pytest.raises(VaultError):
            vault.get(doc_id)
class TestVaultVerify:
    """Integrity verification for one resource and for the whole vault."""

    def test_verify_single_resource(self, vault):
        """Verifying by id passes and the stored and computed hashes agree."""
        doc = vault.add("Verify this content", name="verify.md")
        outcome = vault.verify(doc.id)

        assert isinstance(outcome, VerificationResult)
        assert outcome.passed is True
        assert outcome.stored_hash == outcome.computed_hash

    def test_verify_all(self, populated_vault):
        """Verifying without an id covers all three seeded resources."""
        outcome = populated_vault.verify()

        assert isinstance(outcome, VaultVerificationResult)
        assert outcome.passed is True
        assert outcome.resource_count == 3
        assert outcome.merkle_root

    def test_verify_empty_vault(self, vault):
        """An empty vault verifies successfully with a resource count of 0."""
        outcome = vault.verify()

        assert isinstance(outcome, VaultVerificationResult)
        assert outcome.passed is True
        assert outcome.resource_count == 0
class TestVaultStatus:
    """Summary counts reported by ``status()``."""

    def test_status(self, populated_vault):
        """The summary reports three resources, one in each seeded tier."""
        summary = populated_vault.status()

        assert summary["total_resources"] == 3
        for tier_key in ("canonical", "working", "ephemeral"):
            assert summary["by_trust_tier"][tier_key] == 1
class TestVaultAuditTrail:
    """Audit-log output when an auditor is passed to the vault explicitly."""

    def test_audit_log_created(self, tmp_path):
        """Audit trail is created when LogAuditor is used explicitly."""
        import json

        from qp_vault.audit.log_auditor import LogAuditor

        vault_dir = tmp_path / "audit-vault"
        log_path = vault_dir / "audit.jsonl"
        vault = Vault(
            vault_dir,
            auditor=LogAuditor(log_path),
        )
        vault.add("Audit test", name="audit.md")

        assert log_path.exists()

        # Decode explicitly so the test does not depend on the platform's
        # default locale encoding.
        with open(log_path, encoding="utf-8") as log_file:
            lines = log_file.readlines()
        assert len(lines) >= 1

        # The first line is parsed as a JSON event for the add above.
        event = json.loads(lines[0])
        assert event["event_type"] == "create"
        assert event["resource_name"] == "audit.md"
        assert event["resource_hash"]
class TestAsyncVault:
    """Smoke tests for the awaitable ``AsyncVault`` API."""

    @pytest.mark.asyncio
    async def test_async_add_and_search(self, tmp_path):
        """An awaited add is indexed and an awaited search then finds hits."""
        async_vault = AsyncVault(tmp_path / "async-vault")

        added = await async_vault.add(
            "Async test content for search",
            name="async.md",
        )
        assert added.status == ResourceStatus.INDEXED

        hits = await async_vault.search("async test")
        assert len(hits) > 0

    @pytest.mark.asyncio
    async def test_async_verify(self, tmp_path):
        """An awaited verify of a freshly added resource passes."""
        async_vault = AsyncVault(tmp_path / "async-vault")

        added = await async_vault.add("Verify async", name="verify.md")
        outcome = await async_vault.verify(added.id)
        assert outcome.passed