Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions docs/devlog/2026-02-11_0900_legacy_policy_loop_strategy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# 2026-02-11 09:00 — Legacy policy loop strategy

## Summary
- Added `LegacyPolicyLoopStrategy` to run legacy-mapped specs through the unified attempt pipeline while supporting either structured legacy patch plans or policy-driven patches.
- Reused shared legacy patch parsing utilities so netlist-style outputs are converted to param-only patches before evaluation, keeping guard and topology invariants intact.
- Kept legacy-compatible history/report outputs via the legacy report operator, including consistent stop reasons and sim run accounting.

## Why
- Legacy workflows can supply either a sequence of patch steps or policy-generated updates; this strategy keeps both paths on the same guard/attempt pipeline.
- Prevents full-netlist rewrites by mapping netlist outputs into patch-only updates, preserving safety rules.

## How to use
- Provide a mapped `CircuitSpec` (legacy targets may be included in `spec.notes`) and, optionally, `legacy_patch_plan` steps in `spec.notes` or `cfg.notes`.
- If no `legacy_patch_plan` is supplied, configure the strategy with a policy (LLM or heuristic) to propose patches.
- Netlist-style steps are converted into param-space `Patch` ops before execution.

## Files
- `src/eesizer_core/strategies/legacy_policy_loop.py`
- `src/eesizer_core/strategies/legacy_patch_utils.py`
- `src/eesizer_core/strategies/legacy_patch_plan.py`
- `src/eesizer_core/strategies/__init__.py`
2 changes: 2 additions & 0 deletions src/eesizer_core/strategies/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from .baseline_noopt import NoOptBaselineStrategy
from .corner_search import CornerSearchStrategy
from .grid_search import GridSearchStrategy
from .legacy_policy_loop import LegacyPolicyLoopStrategy
from .legacy_patch_plan import LegacyPatchPlanStrategy
from .multi_agent_orchestrator import MultiAgentOrchestratorStrategy
from .interactive_session import InteractiveSessionStrategy
Expand All @@ -13,6 +14,7 @@
"GridSearchStrategy",
"InteractiveSessionStrategy",
"LegacyPatchPlanStrategy",
"LegacyPolicyLoopStrategy",
"MultiAgentOrchestratorStrategy",
"NoOptBaselineStrategy",
"PatchLoopStrategy",
Expand Down
144 changes: 15 additions & 129 deletions src/eesizer_core/strategies/legacy_patch_plan.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Mapping, Sequence

from ..contracts import (
CircuitSource,
CircuitSpec,
MetricsBundle,
Patch,
PatchOp,
RunResult,
SimPlan,
StrategyConfig,
)
from ..contracts.enums import PatchOpType, StopReason
from typing import Any

from ..contracts import CircuitSource, CircuitSpec, MetricsBundle, Patch, RunResult, SimPlan, StrategyConfig
from ..contracts.enums import StopReason
from ..contracts.errors import ValidationError
from ..contracts.guards import GuardCheck, GuardReport
from ..contracts.guards import GuardReport
from ..contracts.provenance import stable_hash_json, stable_hash_str
from ..contracts.strategy import Strategy
from ..domain.spice.params import ParamInferenceRules, infer_param_space_from_ir
from ..domain.spice.patching import extract_param_values
from ..metrics import ComputeMetricsOperator, MetricRegistry, DEFAULT_REGISTRY
from ..operators.guards import (
BehaviorGuardOperator,
Expand All @@ -45,121 +35,17 @@
)
from ..sim import DeckBuildOperator, NgspiceRunOperator
from .attempt_pipeline import AttemptOperators, run_attempt
from .legacy_patch_utils import (
extract_patch_plan,
guard_failure_report,
netlist_to_patch,
parse_patch_step,
)
from .patch_loop.evaluate import MeasureFn, evaluate_metrics, run_baseline
from .patch_loop.planning import group_metric_names_by_kind, extract_sim_plan
from .patch_loop.state import LoopResult, PatchLoopState


def _extract_patch_plan(notes: Mapping[str, Any]) -> list[Mapping[str, Any]]:
raw = notes.get("legacy_patch_plan")
if isinstance(raw, Mapping):
steps = raw.get("steps")
else:
steps = raw
if not isinstance(steps, Sequence) or isinstance(steps, (str, bytes, bytearray)):
return []
return [step for step in steps if isinstance(step, Mapping)]


def _parse_patch_ops(raw_ops: Sequence[Any]) -> list[PatchOp]:
ops: list[PatchOp] = []
for op in raw_ops:
if not isinstance(op, Mapping):
raise ValidationError("patch ops must be objects")
param = op.get("param")
op_type = op.get("op")
value = op.get("value")
why = op.get("why", "")
if not isinstance(param, str) or not param:
raise ValidationError("patch op missing param")
if isinstance(op_type, PatchOpType):
patch_op = op_type
elif isinstance(op_type, str):
try:
patch_op = PatchOpType(op_type)
except ValueError as exc:
raise ValidationError(f"unknown patch op '{op_type}'") from exc
else:
raise ValidationError("patch op missing op")
ops.append(PatchOp(param=param, op=patch_op, value=value, why=str(why)))
return ops


def _parse_patch_step(step: Mapping[str, Any]) -> Patch:
if "patch" in step and isinstance(step.get("patch"), Mapping):
payload = step.get("patch")
else:
payload = step
if not isinstance(payload, Mapping):
raise ValidationError("patch step must be a mapping")
raw_ops = payload.get("ops")
if raw_ops is None:
raw_ops = payload.get("patch")
if raw_ops is None:
raise ValidationError("patch step missing ops")
if not isinstance(raw_ops, Sequence) or isinstance(raw_ops, (str, bytes, bytearray)):
raise ValidationError("patch ops must be a list")
ops = _parse_patch_ops(raw_ops)
stop = bool(payload.get("stop", False))
notes = str(payload.get("notes", ""))
return Patch(ops=tuple(ops), stop=stop, notes=notes)


def _netlist_to_patch(
netlist_text: str,
*,
signature_op: Any,
current_signature: str,
current_ir: Any,
param_ids: Sequence[str],
include_paths: bool,
max_lines: int,
step_notes: str,
) -> tuple[Patch, list[str]]:
sig_res = signature_op.run(
{"netlist_text": netlist_text, "include_paths": include_paths, "max_lines": max_lines},
ctx=None,
)
new_signature = sig_res.outputs["signature"]
new_ir = sig_res.outputs["circuit_ir"]
warnings = list(sig_res.warnings)

if new_signature != current_signature:
raise ValidationError("netlist topology mismatch; use patch/structured plan")

current_vals, current_errors = extract_param_values(current_ir, param_ids=param_ids)
new_vals, new_errors = extract_param_values(new_ir, param_ids=param_ids)
warnings.extend(current_errors)
warnings.extend(new_errors)

ops: list[PatchOp] = []
for param_id in param_ids:
pid = param_id.lower()
if pid not in current_vals or pid not in new_vals:
continue
before = current_vals[pid]
after = new_vals[pid]
if abs(after - before) <= max(1e-12, abs(before) * 1e-9):
continue
ops.append(PatchOp(param=pid, op=PatchOpType.set, value=after, why=step_notes))

if not ops:
return Patch(stop=True, notes="netlist_no_param_updates"), warnings

return Patch(ops=tuple(ops), stop=False, notes=step_notes), warnings


def _guard_failure_report(
guard_chain_op: Any,
reason: str,
recorder: RunRecorder | None,
) -> GuardReport:
check = GuardCheck(name="topology_guard", ok=False, severity="hard", reasons=(reason,))
report_res = guard_chain_op.run({"checks": [check]}, ctx=None)
record_operator_result(recorder, report_res)
return report_res.outputs["report"]


@dataclass
class LegacyPatchPlanStrategy(Strategy):
"""Execute a structured legacy patch plan using the unified attempt pipeline."""
Expand Down Expand Up @@ -398,7 +284,7 @@ def run(self, spec: CircuitSpec, source: CircuitSource, ctx: Any, cfg: StrategyC
if stop_reason is StopReason.reached_target:
return self._finalize_run(history, state, stop_reason, recorder, manifest)

plan_steps = _extract_patch_plan(spec.notes) or _extract_patch_plan(cfg.notes)
plan_steps = extract_patch_plan(spec.notes) or extract_patch_plan(cfg.notes)
max_iters = cfg.budget.max_iterations
attempt_ops = AttemptOperators(
patch_guard_op=self.patch_guard_op,
Expand Down Expand Up @@ -432,7 +318,7 @@ def run(self, spec: CircuitSpec, source: CircuitSource, ctx: Any, cfg: StrategyC
netlist_text = step.get("netlist_text") or step.get("netlist")
if not isinstance(netlist_text, str):
raise ValidationError("netlist_text must be a string")
patch, warnings_i = _netlist_to_patch(
patch, warnings_i = netlist_to_patch(
netlist_text,
signature_op=self.signature_op,
current_signature=state.current_signature,
Expand All @@ -443,9 +329,9 @@ def run(self, spec: CircuitSpec, source: CircuitSource, ctx: Any, cfg: StrategyC
step_notes=str(step.get("notes", "netlist_plan")),
)
else:
patch = _parse_patch_step(step)
patch = parse_patch_step(step)
except ValidationError as exc:
guard_report = _guard_failure_report(self.guard_chain_op, str(exc), recorder)
guard_report = guard_failure_report(self.guard_chain_op, str(exc), recorder)
errors = guard_failures(guard_report)
history.append(
{
Expand Down
122 changes: 122 additions & 0 deletions src/eesizer_core/strategies/legacy_patch_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from __future__ import annotations

from typing import Any, Mapping, Sequence

from ..contracts import Patch, PatchOp
from ..contracts.enums import PatchOpType
from ..contracts.errors import ValidationError
from ..contracts.guards import GuardCheck, GuardReport
from ..domain.spice.patching import extract_param_values
from ..operators.netlist import TopologySignatureOperator
from ..runtime.recorder import RunRecorder
from ..runtime.recording_utils import record_operator_result


def extract_patch_plan(notes: Mapping[str, Any]) -> list[Mapping[str, Any]]:
raw = notes.get("legacy_patch_plan")
if isinstance(raw, Mapping):
steps = raw.get("steps")
else:
steps = raw
if not isinstance(steps, Sequence) or isinstance(steps, (str, bytes, bytearray)):
return []
return [step for step in steps if isinstance(step, Mapping)]


def parse_patch_ops(raw_ops: Sequence[Any]) -> list[PatchOp]:
ops: list[PatchOp] = []
for op in raw_ops:
if not isinstance(op, Mapping):
raise ValidationError("patch ops must be objects")
param = op.get("param")
op_type = op.get("op")
value = op.get("value")
why = op.get("why", "")
if not isinstance(param, str) or not param:
raise ValidationError("patch op missing param")
if isinstance(op_type, PatchOpType):
patch_op = op_type
elif isinstance(op_type, str):
try:
patch_op = PatchOpType(op_type)
except ValueError as exc:
raise ValidationError(f"unknown patch op '{op_type}'") from exc
else:
raise ValidationError("patch op missing op")
ops.append(PatchOp(param=param, op=patch_op, value=value, why=str(why)))
return ops


def parse_patch_step(step: Mapping[str, Any]) -> Patch:
if "patch" in step and isinstance(step.get("patch"), Mapping):
payload = step.get("patch")
else:
payload = step
if not isinstance(payload, Mapping):
raise ValidationError("patch step must be a mapping")
raw_ops = payload.get("ops")
if raw_ops is None:
raw_ops = payload.get("patch")
if raw_ops is None:
raise ValidationError("patch step missing ops")
if not isinstance(raw_ops, Sequence) or isinstance(raw_ops, (str, bytes, bytearray)):
raise ValidationError("patch ops must be a list")
ops = parse_patch_ops(raw_ops)
stop = bool(payload.get("stop", False))
notes = str(payload.get("notes", ""))
return Patch(ops=tuple(ops), stop=stop, notes=notes)


def netlist_to_patch(
netlist_text: str,
*,
signature_op: TopologySignatureOperator,
current_signature: str,
current_ir: Any,
param_ids: Sequence[str],
include_paths: bool,
max_lines: int,
step_notes: str,
) -> tuple[Patch, list[str]]:
sig_res = signature_op.run(
{"netlist_text": netlist_text, "include_paths": include_paths, "max_lines": max_lines},
ctx=None,
)
new_signature = sig_res.outputs["signature"]
new_ir = sig_res.outputs["circuit_ir"]
warnings = list(sig_res.warnings)

if new_signature != current_signature:
raise ValidationError("netlist topology mismatch; use patch/structured plan")

current_vals, current_errors = extract_param_values(current_ir, param_ids=param_ids)
new_vals, new_errors = extract_param_values(new_ir, param_ids=param_ids)
warnings.extend(current_errors)
warnings.extend(new_errors)

ops: list[PatchOp] = []
for param_id in param_ids:
pid = param_id.lower()
if pid not in current_vals or pid not in new_vals:
continue
before = current_vals[pid]
after = new_vals[pid]
if abs(after - before) <= max(1e-12, abs(before) * 1e-9):
continue
ops.append(PatchOp(param=pid, op=PatchOpType.set, value=after, why=step_notes))

if not ops:
return Patch(stop=True, notes="netlist_no_param_updates"), warnings

return Patch(ops=tuple(ops), stop=False, notes=step_notes), warnings


def guard_failure_report(
guard_chain_op: Any,
reason: str,
recorder: RunRecorder | None,
) -> GuardReport:
check = GuardCheck(name="topology_guard", ok=False, severity="hard", reasons=(reason,))
report_res = guard_chain_op.run({"checks": [check]}, ctx=None)
record_operator_result(recorder, report_res)
return report_res.outputs["report"]
Loading
Loading