-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathprotocol_validators.py
More file actions
261 lines (209 loc) · 9.57 KB
/
protocol_validators.py
File metadata and controls
261 lines (209 loc) · 9.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
"""Protocol Validator Pipeline — deterministic pre-settlement checks.
Extends the dispute engine with configurable validators that skills
define in their metadata. Each validator is a pure function: given the
output and a config, it returns pass/fail with no ambiguity, no LLM,
and no cost.
Validators run automatically before settlement. If any fails, the
escrow is auto-refunded with reason VALIDATOR_FAILED and the specific
validator name logged.
Supported validator types:
- schema: JSON Schema Draft-07 (already in dispute_engine)
- length: word/char count bounds on a field
- language: detected language matches expected
- contains: required substrings present
- not_contains: forbidden patterns absent
- non_empty: specified fields are not blank
- regex: field matches a regex pattern
- json_path: value at a JSON path meets a condition
Adding a new type: write a function with signature
``(output: dict, config: dict) -> (bool, str | None)`` and register
it in the ``VALIDATOR_TYPES`` dict.
**Design rationale:** Deterministic validators (as opposed to LLM-based
evaluation) follow the finding of Bolton, Katok & Ockenfels (2004) that
objective, verifiable quality signals produce higher market efficiency
than subjective review. The verify operation in the Agentic Economy
Interface Specification (agenticeconomy.dev, Section 5, Operation 7)
requires validators to be pure functions with identical results on
identical inputs — no LLM calls, no network requests (except webhook
type). See whitepaper Section 10.8 (Quality Markets) for the 4-layer
quality assurance architecture.
"""
import json
import re
import logging
from typing import Optional
# Module-level logger; used by the language validator and the pipeline runner
# in this file to report skipped or failing validators.
logger = logging.getLogger("botnode.validators.protocol")
# ---------------------------------------------------------------------------
# Individual validator functions
# Each returns (passed: bool, error_message: str | None)
# ---------------------------------------------------------------------------
def _validate_length(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Check the word and/or character count of a field against bounds.

    Args:
        output: Task output to inspect.
        config: Validator config. Reads ``field`` (empty or missing selects
            whatever ``_extract_field`` returns for an empty field name) and
            the optional bounds ``min_words``, ``max_words``, ``min_chars``
            and ``max_chars``.

    Returns:
        ``(True, None)`` when every configured bound holds, otherwise
        ``(False, message)`` describing the first violated bound. Bounds are
        checked in the order min_words, max_words, min_chars, max_chars.
    """
    field = config.get("field", "")
    value = _extract_field(output, field)
    if value is None:
        return (False, f"Field '{field}' not found in output")

    text = str(value)
    words = len(text.split())  # whitespace-delimited tokens
    chars = len(text)

    min_words = config.get("min_words")
    max_words = config.get("max_words")
    min_chars = config.get("min_chars")
    max_chars = config.get("max_chars")

    # Compare against None rather than relying on truthiness so that an
    # explicit bound of 0 (e.g. ``max_words: 0``) is enforced instead of
    # being silently treated as "no bound configured".
    if min_words is not None and words < min_words:
        return (False, f"Field '{field}' has {words} words, minimum is {min_words}")
    if max_words is not None and words > max_words:
        return (False, f"Field '{field}' has {words} words, maximum is {max_words}")
    if min_chars is not None and chars < min_chars:
        return (False, f"Field '{field}' has {chars} chars, minimum is {min_chars}")
    if max_chars is not None and chars > max_chars:
        return (False, f"Field '{field}' has {chars} chars, maximum is {max_chars}")
    return (True, None)
def _validate_language(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Detect the language of a text field and compare it to the expected code.

    Args:
        output: Task output to inspect.
        config: Validator config. Reads ``field`` and ``expected`` (the
            language code that ``langdetect.detect`` should return).

    Returns:
        ``(False, message)`` when the field is missing or the detected
        language differs from ``expected``; ``(True, None)`` otherwise.
        The check is fail-open: if ``langdetect`` is not installed or the
        detection itself raises, the validator passes and a warning is logged.
    """
    field = config.get("field", "")
    expected = config.get("expected", "")
    value = _extract_field(output, field)
    if value is None:
        return (False, f"Field '{field}' not found in output")

    try:
        from langdetect import DetectorFactory, detect
    except ImportError:
        logger.warning("langdetect not installed — skipping language validator")
        return (True, None)  # fail-open if library missing

    # langdetect's algorithm is randomized unless a seed is pinned; fixing
    # the seed makes identical inputs yield identical results.
    DetectorFactory.seed = 0

    try:
        detected = detect(str(value))
    except Exception as exc:
        # Deliberately fail-open on detection errors, but leave a trace
        # instead of swallowing the error silently.
        logger.warning(
            "Language detection failed for field '%s': %s — skipping", field, exc
        )
        return (True, None)

    if detected != expected:
        return (False, f"Field '{field}' detected as '{detected}', expected '{expected}'")
    return (True, None)
def _validate_contains(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Check that every required substring is present.

    Args:
        output: Task output to inspect.
        config: Validator config. Reads ``patterns`` (substrings that must
            all appear, matched case-sensitively) and optional ``field``.
            Without ``field`` the JSON serialization of the whole output is
            searched.

    Returns:
        ``(True, None)`` when all patterns are found; ``(False, message)``
        naming the first missing pattern, or reporting that the requested
        field does not exist.
    """
    field = config.get("field")
    patterns = config.get("patterns", [])

    if field:
        value = _extract_field(output, field)
        if value is None:
            # Stringifying a missing field would yield the literal "None",
            # which could accidentally satisfy patterns such as "on".
            return (False, f"Field '{field}' not found in output")
        text = str(value)
        location = "field " + field
    else:
        text = json.dumps(output)
        location = "output"

    for pattern in patterns:
        if pattern not in text:
            return (False, f"Required pattern '{pattern}' not found in {location}")
    return (True, None)
def _validate_not_contains(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Check that no forbidden substring is present (case-insensitive).

    Args:
        output: Task output to inspect.
        config: Validator config. Reads ``patterns`` (substrings that must
            not appear) and optional ``field``. Without ``field`` the JSON
            serialization of the whole output is searched.

    Returns:
        ``(True, None)`` when none of the patterns occur; ``(False, message)``
        naming the first forbidden pattern found.
    """
    field = config.get("field")
    patterns = config.get("patterns", [])

    if field:
        value = _extract_field(output, field)
        # A missing field cannot contain anything forbidden. Treat it as
        # empty text instead of the literal "None", which would falsely
        # trigger patterns such as "none" or "no".
        text = "" if value is None else str(value)
    else:
        text = json.dumps(output)

    haystack = text.lower()  # lower-case once, not once per pattern
    for pattern in patterns:
        if pattern.lower() in haystack:
            return (False, f"Forbidden pattern '{pattern}' found in {'field ' + field if field else 'output'}")
    return (True, None)
def _validate_non_empty(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Verify that each field listed in ``config["fields"]`` carries content.

    A field fails when it is missing (``None``), a string containing only
    whitespace, or an empty list/dict. The first offending field is reported.

    Returns:
        ``(True, None)`` if every listed field has content, otherwise
        ``(False, message)``.
    """
    for name in config.get("fields", []):
        candidate = _extract_field(output, name)

        blank_text = isinstance(candidate, str) and candidate.strip() == ""
        if candidate is None or blank_text:
            return (False, f"Field '{name}' is empty or missing")

        empty_container = isinstance(candidate, (list, dict)) and len(candidate) == 0
        if empty_container:
            return (False, f"Field '{name}' is empty collection")

    return (True, None)
def _validate_regex(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Check that a field matches a regular expression.

    Args:
        output: Task output to inspect.
        config: Validator config. Reads ``field`` and ``pattern``; the
            pattern is applied with ``re.search`` semantics.

    Returns:
        ``(True, None)`` when the pattern is found in the field's text;
        ``(False, message)`` when the pattern is not a valid regex string,
        the field is missing, or the text does not match.
    """
    field = config.get("field", "")
    pattern = config.get("pattern", "")

    # Validate the configured pattern before touching the data. A non-string
    # pattern would make re.compile raise TypeError rather than re.error, so
    # it is rejected explicitly and reported like any other invalid pattern.
    if not isinstance(pattern, str):
        return (False, "Regex pattern invalid")
    try:
        compiled = re.compile(pattern)
    except re.error:
        return (False, "Regex pattern invalid")

    value = _extract_field(output, field)
    if value is None:
        return (False, f"Field '{field}' not found")

    # Only the first 10000 characters are searched (presumably to bound the
    # cost of matching against very large outputs).
    if not compiled.search(str(value)[:10000]):
        return (False, f"Field '{field}' does not match pattern '{pattern}'")
    return (True, None)
def _validate_json_path(output: dict, config: dict) -> tuple[bool, Optional[str]]:
    """Check that the value at a dotted path meets the configured conditions.

    Args:
        output: Task output to inspect.
        config: Validator config. Reads ``path`` (dot notation, resolved by
            ``_extract_nested``) and the optional constraints ``enum``
            (allowed values), ``min`` and ``max`` (inclusive bounds).

    Returns:
        ``(True, None)`` when all configured constraints hold;
        ``(False, message)`` when the path is missing, the value is not in
        ``enum``, the value is out of range, or the value's type cannot be
        compared with the constraints.
    """
    path = config.get("path", "")
    value = _extract_nested(output, path)
    if value is None:
        return (False, f"Path '{path}' not found in output")

    enum = config.get("enum")
    min_val = config.get("min")
    max_val = config.get("max")

    try:
        # Check enum
        if enum and value not in enum:
            return (False, f"Value at '{path}' is '{value}', expected one of {enum}")
        # Check range
        if min_val is not None and value < min_val:
            return (False, f"Value at '{path}' is {value}, minimum is {min_val}")
        if max_val is not None and value > max_val:
            return (False, f"Value at '{path}' is {value}, maximum is {max_val}")
    except TypeError:
        # E.g. a string compared with a numeric bound. Report it as a failed
        # check rather than letting the exception escape to the caller.
        return (
            False,
            f"Value at '{path}' has type {type(value).__name__}, "
            "which cannot be checked against the configured constraints",
        )
    return (True, None)
# ---------------------------------------------------------------------------
# Registry
# ---------------------------------------------------------------------------
# Dispatch table: the ``type`` value of a validator config selects the handler
# here; run_protocol_validators looks the handler up and calls it as
# handler(output, config). New validator types are added by inserting an entry.
VALIDATOR_TYPES = {
"length": _validate_length,
"language": _validate_language,
"contains": _validate_contains,
"not_contains": _validate_not_contains,
"non_empty": _validate_non_empty,
"regex": _validate_regex,
"json_path": _validate_json_path,
}
"""Map of validator type names to their implementation functions.
``schema`` is handled separately by the dispute engine."""
# ---------------------------------------------------------------------------
# Pipeline runner
# ---------------------------------------------------------------------------
def run_protocol_validators(
    output: dict,
    validators: list[dict],
) -> tuple[bool, Optional[str], Optional[dict]]:
    """Run a sequence of protocol validators against task output.

    Validators run in order and the first failure short-circuits the run.
    The pipeline is fail-open: malformed entries, unknown types, and handlers
    that raise are logged and skipped rather than treated as failures.

    Args:
        output: The task's output_data.
        validators: List of validator configs from skill metadata.
            Each has at minimum a ``type`` key. ``schema`` entries are
            skipped here.

    Returns:
        ``(all_passed, failed_validator_type, error_details)``. On success
        this is ``(True, None, None)``. On failure the second element is
        ``"VALIDATOR_FAILED:<type>"`` and the third is a dict with keys
        ``validator_type``, ``config`` and ``error``.
    """
    if not validators:
        return (True, None, None)

    for v in validators:
        # A non-dict entry has no ``.get``; skip it in line with the
        # fail-open policy instead of crashing the whole pipeline.
        if not isinstance(v, dict):
            logger.warning("Malformed validator entry %r — skipping", v)
            continue

        vtype = v.get("type", "")
        # schema is handled by the dispute engine, skip here
        if vtype == "schema":
            continue

        # Guard the lookup: an unhashable ``type`` value would raise here.
        handler = VALIDATOR_TYPES.get(vtype) if isinstance(vtype, str) else None
        if not handler:
            logger.warning("Unknown validator type: %s — skipping", vtype)
            continue

        try:
            passed, error = handler(output, v)
        except Exception as exc:
            # fail-open on validator errors to avoid blocking settlement;
            # logger.exception keeps the traceback for diagnosis.
            logger.exception("Validator %s raised exception: %s", vtype, exc)
            continue

        if not passed:
            return (
                False,
                f"VALIDATOR_FAILED:{vtype}",
                {"validator_type": vtype, "config": v, "error": error},
            )

    return (True, None, None)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _extract_field(output: dict, field: str):
    """Return one top-level field of ``output``, or all of it as text.

    With a non-empty ``field`` the result is ``output.get(field)`` when
    ``output`` is a dict and ``None`` otherwise. With an empty ``field`` the
    whole output is returned as a string: JSON-serialized for a dict,
    ``str(output)`` for anything else.
    """
    is_mapping = isinstance(output, dict)

    if field:
        return output.get(field) if is_mapping else None

    # No field requested: fall back to a textual form of the entire output.
    if is_mapping:
        return json.dumps(output)
    return str(output)
def _extract_nested(data: dict, path: str):
    """Walk ``data`` along a dot-separated ``path`` and return what is found.

    Each path segment is used as a dict key. The walk stops and returns
    ``None`` as soon as the current node is not a dict or lacks the key.
    """
    node = data
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node