Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 92 additions & 32 deletions cli/trellis
Original file line number Diff line number Diff line change
Expand Up @@ -267,28 +267,89 @@ def yaml_read_nested(text, parent, field):
return None


def parse_numeric_scalar(value):
    """Parse a YAML scalar that should contain a numeric value.

    Args:
        value: Raw scalar as read from the spec text (a string, a number, or
            ``None``). Surrounding whitespace and surrounding single or
            double quote characters are tolerated.

    Returns:
        The value as a finite ``float``, or ``None`` when the scalar is
        missing, is one of the empty placeholders (``""``, ``"null"``,
        ``"{}"``), cannot be parsed as a number, or is non-finite.
    """
    import math

    if value in (None, "", "null", "{}"):
        return None
    try:
        number = float(str(value).strip().strip('"').strip("'"))
    except (TypeError, ValueError):
        return None
    # float() happily parses "nan", "inf" and "Infinity"; none of those is a
    # usable numeric value, so treat them like any other unparseable scalar.
    if not math.isfinite(number):
        return None
    return number


def extract_self_eval_score(text):
    """Return the first usable self-eval score found in the spec text.

    Lookup order, stopping at the first value that parses as a number:
    for ``self_eval`` and then ``perf_eval``, the nested ``total`` field,
    the nested ``score`` field, then the parent key read as a plain field;
    finally the top-level ``score`` field. Returns ``None`` when none of
    these yields a number.
    """

    def _raw_candidates():
        # Generator so each lookup only runs if the earlier ones produced
        # no usable number.
        for section in ("self_eval", "perf_eval"):
            for key in ("total", "score"):
                yield yaml_read_nested(text, section, key)
            yield yaml_read_field(text, section)
        yield yaml_read_field(text, "score")

    for raw in _raw_candidates():
        parsed = parse_numeric_scalar(raw)
        if parsed is not None:
            return parsed
    return None


def parse_phase_statuses(text):
    """Parse phase statuses from the phases block without counting unrelated statuses.

    Scans the lines after a top-level ``phases:`` key. Every indented list
    item that starts with ``- id: phaseN`` contributes one entry to the
    result, in document order.

    Args:
        text: Full spec text.

    Returns:
        A list with one status string per phase item. A phase without a
        recognised ``status:`` key of its own is reported as ``"pending"``.
        The list is empty when there is no ``phases:`` block.
    """
    phase_item_re = re.compile(r'^(\s+-\s+)id:\s*"?phase\d+"?')
    status_re = re.compile(
        r'^(\s+)status:\s*"?(pending|in_progress|completed|failed|skipped)"?'
    )

    lines = text.splitlines()
    statuses = []
    in_phases = False
    i = 0

    while i < len(lines):
        line = lines[i]
        stripped = line.strip()
        indent = len(line) - len(line.lstrip())

        if not in_phases:
            if re.match(r'^phases:\s*$', line):
                in_phases = True
            i += 1
            continue

        # Blank and comment-only lines carry no structure; in particular a
        # column-0 comment must not be mistaken for the next top-level key.
        if not stripped or stripped.startswith("#"):
            i += 1
            continue

        # Any other content at column 0 is the next top-level key, which
        # ends the phases block.
        if indent == 0:
            break

        item = phase_item_re.match(line)
        if not item:
            i += 1
            continue

        item_indent = indent
        # Column of the "id" key, where the item's own sibling keys sit.
        key_indent = len(item.group(1))
        status = "pending"
        i += 1

        while i < len(lines):
            field_line = lines[i]
            field_stripped = field_line.strip()

            if field_stripped and not field_stripped.startswith("#"):
                field_indent = len(field_line) - len(field_line.lstrip())
                # Content at or left of the dash belongs to the next item
                # or to an enclosing key, so this item is finished.
                if field_indent <= item_indent:
                    break

                own_status = status_re.match(field_line)
                # Only a status at the item's own key column counts;
                # deeper ones belong to nested entries of the phase.
                if own_status and len(own_status.group(1)) == key_indent:
                    status = own_status.group(2)

            i += 1

        statuses.append(status)

    return statuses


def count_phases(text):
    """Count phases and their statuses.

    The counts come only from the per-phase statuses returned by
    ``parse_phase_statuses``, so ``status:`` keys elsewhere in the spec are
    not counted and no top-level-status correction is needed.

    Args:
        text: Full spec text.

    Returns:
        A ``(total, completed, failed, in_progress)`` tuple of ints.
    """
    phase_statuses = parse_phase_statuses(text)
    total = len(phase_statuses)
    completed = phase_statuses.count("completed")
    failed = phase_statuses.count("failed")
    in_progress = phase_statuses.count("in_progress")
    return total, completed, failed, in_progress


Expand Down Expand Up @@ -1073,18 +1134,16 @@ def cmd_start(args):

def check_self_eval(text, task_id):
"""Warn if self-eval looks like a rubber stamp."""
# Look for self_eval or perf_eval score
score_match = re.search(r'(?:self_eval|perf_eval|score):\s*(\d+)', text)
if not score_match:
score = extract_self_eval_score(text)
if score is None:
print(f" {c(C_YELLOW, 'warn')}: no self-eval score found in spec")
return

score = int(score_match.group(1))
has_deviations = bool(re.search(r'deviations:', text, re.MULTILINE))
has_improvements = bool(re.search(r'improvements:', text, re.MULTILINE))
score_display = int(score) if float(score).is_integer() else score

if score >= 9 and not has_deviations and not has_improvements:
print(f" {c(C_YELLOW, 'warn')}: self-eval {score}/10 with no deviations or improvements noted")
print(f" {c(C_YELLOW, 'warn')}: self-eval {score_display}/10 with no deviations or improvements noted")
print(f" scores above 8 should document at least one deviation or improvement")
elif score == 10:
print(f" {c(C_YELLOW, 'note')}: perfect 10/10 - are you sure? 10 means flawless with improvements beyond spec")
Expand Down Expand Up @@ -1112,7 +1171,7 @@ def cmd_complete(args):
text = spec.read_text()

# Check for exec results - warn if no criteria were executed
has_results = bool(re.search(r'result:\s*"?(pass|fail)"?', text, re.MULTILINE))
has_results = any(ac.get("result") in ("pass", "fail") for ac in parse_acceptance_criteria(text))
if not has_results:
print(f" {c(C_YELLOW, 'warn')}: no exec results recorded. Run '{c(C_BOLD, f'trellis exec {args.task_id}')}' first")

Expand Down Expand Up @@ -2333,13 +2392,14 @@ def cmd_report(args):
risks[risk] = risks.get(risk, 0) + 1

# Self-eval scores
score_match = re.search(r'total:\s*(\d+)', text)
if score_match:
self_eval_scores.append(int(score_match.group(1)))
score = extract_self_eval_score(text)
if score is not None:
self_eval_scores.append(score)

# Exec results
passes = len(re.findall(r'result:\s*"?pass"?', text))
fails = len(re.findall(r'result:\s*"?fail"?', text))
criteria = parse_acceptance_criteria(text)
passes = sum(1 for ac in criteria if ac.get("result") == "pass")
fails = sum(1 for ac in criteria if ac.get("result") == "fail")
if passes or fails:
exec_pass += passes
exec_fail += fails
Expand Down
Loading
Loading