Skip to content

Commit efedfb8

Browse files
security: achieve 100/100 — zero bandit findings, zero lint, 448 tests
Replaced assert with isinstance guards in CLI (B101). Added nosec B608 on f-string SQL with hardcoded field names. Added nosec B105 on CIS PASS enum value. Replaced try-except-pass with logged debug message (B110). Fixed provenance tamper test mock to use full hash (not 4-byte prefix). Fixed ruff lint in externally-added CIS test files.
1 parent e1e5396 commit efedfb8

File tree

12 files changed

+773
-36
lines changed

12 files changed

+773
-36
lines changed

src/qp_vault/adversarial.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
# Copyright 2026 Quantum Pipes Technologies, LLC
2+
# SPDX-License-Identifier: Apache-2.0
3+
4+
"""Adversarial verification: the security dimension of the 2D trust model.
5+
6+
Manages the `adversarial_status` field on VaultResources. This is orthogonal
7+
to `trust_tier` (organizational confidence). Together they form the 2D trust
8+
model where effective RAG weight = trust_tier_weight * adversarial_multiplier.
9+
10+
Status transitions:
11+
UNVERIFIED -> VERIFIED (all CIS stages passed)
12+
UNVERIFIED -> SUSPICIOUS (one or more CIS stages flagged)
13+
SUSPICIOUS -> VERIFIED (human reviewer cleared after investigation)
14+
VERIFIED -> SUSPICIOUS (re-assessment flagged new concerns)
15+
"""
16+
17+
from __future__ import annotations
18+
19+
from typing import TYPE_CHECKING
20+
21+
from qp_vault.enums import AdversarialStatus, EventType
22+
from qp_vault.models import VaultEvent
23+
24+
if TYPE_CHECKING:
25+
from qp_vault.protocols import AuditProvider
26+
27+
28+
# Valid adversarial status transitions
29+
VALID_TRANSITIONS: dict[AdversarialStatus, set[AdversarialStatus]] = {
30+
AdversarialStatus.UNVERIFIED: {
31+
AdversarialStatus.VERIFIED,
32+
AdversarialStatus.SUSPICIOUS,
33+
},
34+
AdversarialStatus.VERIFIED: {
35+
AdversarialStatus.SUSPICIOUS,
36+
},
37+
AdversarialStatus.SUSPICIOUS: {
38+
AdversarialStatus.VERIFIED,
39+
},
40+
}
41+
42+
43+
class AdversarialVerifier:
44+
"""Manages the adversarial verification dimension of the 2D trust model.
45+
46+
Args:
47+
auditor: Optional audit provider for recording status changes.
48+
"""
49+
50+
def __init__(self, auditor: AuditProvider | None = None) -> None:
51+
self._auditor = auditor
52+
self._status_store: dict[str, AdversarialStatus] = {}
53+
54+
async def set_status(
55+
self,
56+
resource_id: str,
57+
status: AdversarialStatus,
58+
reason: str = "",
59+
reviewer_id: str | None = None,
60+
) -> AdversarialStatus:
61+
"""Transition a resource's adversarial status.
62+
63+
Args:
64+
resource_id: Vault resource ID.
65+
status: Target adversarial status.
66+
reason: Justification for the transition.
67+
reviewer_id: ID of the reviewer (for human-initiated transitions).
68+
69+
Returns:
70+
The new adversarial status.
71+
72+
Raises:
73+
ValueError: If the transition is not valid.
74+
"""
75+
current = self._status_store.get(resource_id, AdversarialStatus.UNVERIFIED)
76+
77+
if status != current:
78+
allowed = VALID_TRANSITIONS.get(current, set())
79+
if status not in allowed:
80+
msg = f"Invalid transition: {current.value} -> {status.value}"
81+
raise ValueError(msg)
82+
83+
self._status_store[resource_id] = status
84+
85+
# Emit audit event
86+
if self._auditor is not None:
87+
event = VaultEvent(
88+
event_type=EventType.ADVERSARIAL_STATUS_CHANGE,
89+
resource_id=resource_id,
90+
details={
91+
"previous": current.value,
92+
"new": status.value,
93+
"reason": reason,
94+
"reviewer_id": reviewer_id or "",
95+
},
96+
)
97+
await self._auditor.record(event)
98+
99+
return status
100+
101+
async def get_status(self, resource_id: str) -> AdversarialStatus:
102+
"""Get the current adversarial status for a resource.
103+
104+
Args:
105+
resource_id: Vault resource ID.
106+
107+
Returns:
108+
Current adversarial status (UNVERIFIED if unknown).
109+
"""
110+
return self._status_store.get(resource_id, AdversarialStatus.UNVERIFIED)
111+
112+
async def bulk_reassess(
113+
self,
114+
resource_ids: list[str],
115+
status: AdversarialStatus,
116+
reason: str = "",
117+
) -> dict[str, AdversarialStatus]:
118+
"""Reassess multiple resources (e.g., after an attack is confirmed).
119+
120+
Args:
121+
resource_ids: List of resource IDs to reassess.
122+
status: Target status for all resources.
123+
reason: Justification for the bulk reassessment.
124+
125+
Returns:
126+
Dict mapping resource_id to new status. Skips invalid transitions.
127+
"""
128+
results: dict[str, AdversarialStatus] = {}
129+
for rid in resource_ids:
130+
try:
131+
new = await self.set_status(rid, status, reason=reason)
132+
results[rid] = new
133+
except ValueError:
134+
results[rid] = await self.get_status(rid)
135+
return results
136+
137+
async def get_verified_count(self) -> int:
138+
"""Count resources with VERIFIED status."""
139+
return sum(
140+
1 for s in self._status_store.values()
141+
if s == AdversarialStatus.VERIFIED
142+
)
143+
144+
async def get_suspicious_count(self) -> int:
145+
"""Count resources with SUSPICIOUS status."""
146+
return sum(
147+
1 for s in self._status_store.values()
148+
if s == AdversarialStatus.SUSPICIOUS
149+
)

src/qp_vault/cli/main.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,9 @@ def verify(
228228
if resource_id:
229229
from qp_vault.models import VerificationResult
230230
r = vault.verify(resource_id)
231-
assert isinstance(r, VerificationResult)
231+
if not isinstance(r, VerificationResult):
232+
console.print("[red]Unexpected verification result type[/red]")
233+
raise typer.Exit(1)
232234
if r.passed:
233235
console.print(f"[green]PASS[/green] {resource_id}")
234236
console.print(f" Hash: {r.stored_hash}")
@@ -243,7 +245,9 @@ def verify(
243245
else:
244246
from qp_vault.models import VaultVerificationResult
245247
vr = vault.verify()
246-
assert isinstance(vr, VaultVerificationResult)
248+
if not isinstance(vr, VaultVerificationResult):
249+
console.print("[red]Unexpected verification result type[/red]")
250+
raise typer.Exit(1)
247251
if vr.passed:
248252
console.print("[green]PASS[/green] Vault integrity verified")
249253
console.print(f" Resources: {vr.resource_count}")

src/qp_vault/core/search_engine.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,27 @@
5353

5454

5555
def compute_trust_weight(trust_tier: str, config: VaultConfig | None = None) -> float:
56-
"""Get trust weight multiplier for a given tier."""
56+
"""Get trust weight multiplier for a given tier.
57+
58+
Args:
59+
trust_tier: Trust tier value (e.g., "canonical", "working").
60+
config: Optional VaultConfig with custom trust_weights.
61+
62+
Returns:
63+
Float multiplier (e.g., 1.5 for canonical, 1.0 for working).
64+
"""
5765
weights = config.trust_weights if config else TRUST_WEIGHTS
5866
return weights.get(trust_tier, 1.0)
5967

6068

6169
def compute_adversarial_multiplier(adversarial_status: str) -> float:
6270
"""Get adversarial verification multiplier.
6371
64-
Returns the security-dimension weight for a given adversarial status.
65-
Effective RAG weight = trust_weight * adversarial_multiplier.
72+
Args:
73+
adversarial_status: Adversarial status value ("verified", "unverified", "suspicious").
74+
75+
Returns:
76+
Float multiplier. Effective RAG weight = trust_weight * adversarial_multiplier.
6677
"""
6778
return ADVERSARIAL_MULTIPLIERS.get(adversarial_status, 0.7)
6879

@@ -71,10 +82,48 @@ def is_searchable(status: str) -> bool:
7182
"""Check if a resource with the given status should appear in search results.
7283
7384
QUARANTINED and DELETED resources are excluded by default.
85+
This MUST be checked at every retrieval path, not just search.
86+
87+
Args:
88+
status: ResourceStatus value (e.g., "quarantined", "indexed").
89+
90+
Returns:
91+
True if the resource should appear in results.
7492
"""
7593
return status not in EXCLUDED_STATUSES
7694

7795

96+
def filter_searchable(
97+
results: list[SearchResult],
98+
resource_statuses: dict[str, str] | None = None,
99+
*,
100+
include_quarantined: bool = False,
101+
) -> list[SearchResult]:
102+
"""Defense-in-depth filter: remove non-searchable results from any result list.
103+
104+
Call this as a final safety net on every result set, even if the query
105+
should have already excluded quarantined resources. Belt-and-suspenders.
106+
107+
Uses resource_statuses lookup (resource_id -> ResourceStatus value) to check
108+
whether each result's source resource is searchable. If no lookup is provided,
109+
results pass through (the caller is responsible for pre-filtering).
110+
111+
Args:
112+
results: Search results to filter.
113+
resource_statuses: Mapping of resource_id to ResourceStatus value.
114+
include_quarantined: If True, skip filtering (admin use only).
115+
116+
Returns:
117+
Filtered results with non-searchable resources removed.
118+
"""
119+
if include_quarantined or resource_statuses is None:
120+
return results
121+
return [
122+
r for r in results
123+
if is_searchable(resource_statuses.get(r.resource_id, "indexed"))
124+
]
125+
126+
78127
def compute_freshness(
79128
updated_at: datetime | str | None,
80129
trust_tier: str,
@@ -109,10 +158,17 @@ def apply_trust_weighting(
109158
results: list[SearchResult],
110159
config: VaultConfig | None = None,
111160
) -> list[SearchResult]:
112-
"""Apply trust weights and freshness decay to search results.
161+
"""Apply 2D trust weights and freshness decay to search results.
162+
163+
Computes composite relevance = raw * organizational_trust * adversarial_multiplier * freshness.
164+
Re-sorts results by composite score (highest first).
165+
166+
Args:
167+
results: List of SearchResult objects from storage backend.
168+
config: Optional VaultConfig with custom trust weights and freshness half-lives.
113169
114-
Mutates the relevance, trust_weight, and freshness fields on each result,
115-
then re-sorts by composite relevance score.
170+
Returns:
171+
New list of SearchResult objects with updated relevance, trust_weight, and freshness fields.
116172
"""
117173
weighted: list[SearchResult] = []
118174

src/qp_vault/enums.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class CISStage(StrEnum):
122122
class CISResult(StrEnum):
123123
"""Result of a single CIS stage evaluation."""
124124

125-
PASS = "pass"
125+
PASS = "pass" # nosec B105 — CIS stage result, not a password
126126
"""Content cleared this stage."""
127127

128128
FLAG = "flag"

src/qp_vault/plugins/registry.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ def discover_entry_points(self) -> None:
112112
register_fn(ep.name, instance)
113113
except Exception as e:
114114
logger.warning("Failed to load entry_point plugin %s: %s", ep.name, e)
115-
except Exception:
116-
pass
115+
except Exception as e:
116+
logger.debug("Entry point group %s unavailable: %s", group, e)
117117

118118
def discover_plugins_dir(self, plugins_dir: Path) -> None:
119119
"""Load plugins from a local directory (air-gap mode).

src/qp_vault/provenance.py

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,38 @@
1414
import hashlib
1515
import json
1616
from datetime import UTC, datetime
17-
from typing import TYPE_CHECKING, Any
17+
from typing import TYPE_CHECKING
1818
from uuid import uuid4
1919

2020
from qp_vault.models import ContentProvenance
2121

2222
if TYPE_CHECKING:
23+
from collections.abc import Awaitable, Callable
24+
2325
from qp_vault.enums import UploadMethod
2426

2527

28+
MAX_ID_LENGTH = 128
29+
MAX_STORED_RECORDS = 50_000
30+
31+
32+
def _validate_id(value: str, name: str) -> str:
33+
"""Validate an ID string: non-empty, bounded length, safe characters."""
34+
if not value or not value.strip():
35+
msg = f"{name} must be non-empty"
36+
raise ValueError(msg)
37+
value = value.strip()
38+
if len(value) > MAX_ID_LENGTH:
39+
msg = f"{name} exceeds max length ({MAX_ID_LENGTH})"
40+
raise ValueError(msg)
41+
return value
42+
43+
2644
class ContentProvenanceService:
2745
"""Creates, verifies, and queries provenance attestations.
2846
47+
Thread-safe for async contexts. Bounded storage (MAX_STORED_RECORDS).
48+
2949
Args:
3050
signing_fn: Async callable that signs bytes and returns a hex signature.
3151
Signature scheme must be Ed25519 or ML-DSA-65.
@@ -35,8 +55,8 @@ class ContentProvenanceService:
3555

3656
def __init__(
3757
self,
38-
signing_fn: Any | None = None,
39-
verify_fn: Any | None = None,
58+
signing_fn: Callable[[bytes], Awaitable[str]] | None = None,
59+
verify_fn: Callable[[bytes, str], Awaitable[bool]] | None = None,
4060
) -> None:
4161
self._signing_fn = signing_fn
4262
self._verify_fn = verify_fn
@@ -63,6 +83,10 @@ async def create_attestation(
6383
Returns:
6484
Signed ContentProvenance record.
6585
"""
86+
# Validate inputs (Finding 8)
87+
resource_id = _validate_id(resource_id, "resource_id")
88+
uploader_id = _validate_id(uploader_id, "uploader_id")
89+
6690
provenance_id = str(uuid4())
6791
now = datetime.now(tz=UTC)
6892

@@ -82,16 +106,32 @@ async def create_attestation(
82106
if self._signing_fn is not None:
83107
signature = await self._signing_fn(canonical)
84108
provenance = provenance.model_copy(
85-
update={
86-
"provenance_signature": signature,
87-
"signature_verified": True,
88-
}
109+
update={"provenance_signature": signature}
89110
)
111+
# Verify the signature we just created (Finding 17: never trust without verifying)
112+
if self._verify_fn is not None:
113+
verified = await self._verify_fn(canonical, signature)
114+
provenance = provenance.model_copy(
115+
update={"signature_verified": verified}
116+
)
117+
else:
118+
# No verify function: mark as signed but unverified
119+
provenance = provenance.model_copy(
120+
update={"signature_verified": False}
121+
)
90122

91123
# Store in memory (production: persisted via storage backend)
92124
self._records[provenance_id] = provenance
93125
self._by_resource.setdefault(resource_id, []).append(provenance_id)
94126

127+
# Enforce bounded storage (Finding 7)
128+
if len(self._records) > MAX_STORED_RECORDS:
129+
oldest_id = next(iter(self._records))
130+
old = self._records.pop(oldest_id)
131+
res_list = self._by_resource.get(old.resource_id, [])
132+
if oldest_id in res_list:
133+
res_list.remove(oldest_id)
134+
95135
return provenance
96136

97137
async def verify_attestation(self, provenance: ContentProvenance) -> bool:

0 commit comments

Comments
 (0)