diff --git a/.gitignore b/.gitignore
index 2b94a6f..3543b3c 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,9 @@ _site/
*.pyc
__pycache__/
+# Generated local policy bundle artifacts
+implementations/acr-control-plane/var/policy_bundles/
+
# Optional: uncomment if using a doc generator
# node_modules/
# .venv/
diff --git a/implementations/acr-control-plane/.env.production.example b/implementations/acr-control-plane/.env.production.example
index 997ef4b..0dc4cd5 100644
--- a/implementations/acr-control-plane/.env.production.example
+++ b/implementations/acr-control-plane/.env.production.example
@@ -19,6 +19,7 @@ JWT_ALGORITHM=HS256
JWT_TOKEN_EXPIRE_MINUTES=30
KILLSWITCH_SECRET=CHANGE_ME
+AUDIT_SIGNING_SECRET=CHANGE_ME
OPERATOR_SESSION_SECRET=CHANGE_ME
WEBHOOK_HMAC_SECRET=CHANGE_ME
EXECUTOR_HMAC_SECRET=CHANGE_ME
diff --git a/implementations/acr-control-plane/pyproject.toml b/implementations/acr-control-plane/pyproject.toml
index 0888270..b2e3fa3 100644
--- a/implementations/acr-control-plane/pyproject.toml
+++ b/implementations/acr-control-plane/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project]
name = "acr-control-plane"
-version = "1.0.0"
+version = "1.0.1"
description = "ACR Reference Control Plane — governance gateway for autonomous AI systems"
readme = "README.md"
requires-python = ">=3.11"
diff --git a/implementations/acr-control-plane/scripts/generate_secrets.py b/implementations/acr-control-plane/scripts/generate_secrets.py
index 7f018f3..2f59a21 100644
--- a/implementations/acr-control-plane/scripts/generate_secrets.py
+++ b/implementations/acr-control-plane/scripts/generate_secrets.py
@@ -16,6 +16,7 @@ def build_secret_bundle() -> dict[str, str]:
return {
"JWT_SECRET_KEY": _token_hex(),
"KILLSWITCH_SECRET": _token_hex(),
+ "AUDIT_SIGNING_SECRET": _token_hex(),
"OPERATOR_SESSION_SECRET": _token_hex(),
"WEBHOOK_HMAC_SECRET": _token_hex(),
"EXECUTOR_HMAC_SECRET": _token_hex(),
diff --git a/implementations/acr-control-plane/src/acr/__init__.py b/implementations/acr-control-plane/src/acr/__init__.py
index a848e09..f7d7e45 100644
--- a/implementations/acr-control-plane/src/acr/__init__.py
+++ b/implementations/acr-control-plane/src/acr/__init__.py
@@ -1,3 +1,3 @@
# ACR Control Plane — Reference Implementation
# Apache 2.0 License
-__version__ = "1.0.0"
+__version__ = "1.0.1"
diff --git a/implementations/acr-control-plane/src/acr/common/errors.py b/implementations/acr-control-plane/src/acr/common/errors.py
index 5122e28..4da89a2 100644
--- a/implementations/acr-control-plane/src/acr/common/errors.py
+++ b/implementations/acr-control-plane/src/acr/common/errors.py
@@ -84,6 +84,16 @@ class PolicyEngineError(ACRError):
error_code = "POLICY_ENGINE_ERROR"
+class RuntimeControlDependencyError(ACRError):
+ status_code = 503
+ error_code = "RUNTIME_CONTROL_DEPENDENCY_ERROR"
+
+
+class AuthoritativeSpendControlError(ACRError):
+ status_code = 503
+ error_code = "AUTHORITATIVE_SPEND_CONTROL_ERROR"
+
+
# ── Approval ──────────────────────────────────────────────────────────────────
class ApprovalPendingError(ACRError):
diff --git a/implementations/acr-control-plane/src/acr/config.py b/implementations/acr-control-plane/src/acr/config.py
index 38f5726..fb57e21 100644
--- a/implementations/acr-control-plane/src/acr/config.py
+++ b/implementations/acr-control-plane/src/acr/config.py
@@ -31,7 +31,7 @@ class Settings(BaseSettings):
# ── Runtime ───────────────────────────────────────────────────────────────
acr_env: str = "development"
log_level: str = "INFO"
- acr_version: str = "1.0"
+ acr_version: str = "1.0.1"
schema_bootstrap_mode: str = "auto"
strict_dependency_startup: bool = False
@@ -47,6 +47,7 @@ class Settings(BaseSettings):
# HMAC-SHA256 key for signing outbound webhook payloads.
# Receivers verify the X-ACR-Signature header to confirm authenticity.
webhook_hmac_secret: str = ""
+ audit_signing_secret: str = "dev_audit_signing_secret_change_me"
# ── OpenTelemetry ─────────────────────────────────────────────────────────
otel_exporter_otlp_endpoint: str = ""
@@ -109,6 +110,7 @@ class Settings(BaseSettings):
"changeme",
"password",
"jwt_secret",
+ "dev_audit_signing_secret_change_me",
}
_MIN_KEY_BYTES = 32 # 256 bits — minimum for HS256
@@ -148,6 +150,12 @@ def assert_production_secrets() -> None:
f"'{settings.acr_env}'. Set a strong random secret before deploying."
)
+ if settings.audit_signing_secret in _WEAK_KEYS or len(settings.audit_signing_secret.encode()) < _MIN_KEY_BYTES:
+ raise RuntimeError(
+ "AUDIT_SIGNING_SECRET must be set to a strong value before deploying "
+ "outside development/test."
+ )
+
if settings.execute_allowed_actions and len(settings.executor_hmac_secret.encode()) < _MIN_KEY_BYTES:
raise RuntimeError(
"EXECUTOR_HMAC_SECRET must be set to a strong value before enabling "
@@ -165,6 +173,12 @@ def assert_production_secrets() -> None:
"OPERATOR_API_KEYS_JSON must be set in non-development environments."
)
+ if not settings.service_operator_api_key:
+ raise RuntimeError(
+ "SERVICE_OPERATOR_API_KEY must be set in non-development environments "
+ "so the gateway can authenticate to the independent kill-switch service."
+ )
+
if settings.oidc_enabled:
if settings.operator_session_secret in _WEAK_KEYS or len(settings.operator_session_secret.encode()) < _MIN_KEY_BYTES:
raise RuntimeError(
@@ -199,6 +213,11 @@ def effective_schema_bootstrap_mode() -> str:
return "create" if settings.acr_env in ("development", "test") else "validate"
+def runtime_dependencies_fail_closed() -> bool:
+ """Enterprise posture: fail closed outside development/test."""
+ return settings.strict_dependency_startup or settings.acr_env not in ("development", "test")
+
+
@lru_cache(maxsize=1)
def operator_api_keys() -> dict[str, dict]:
if not settings.operator_api_keys_json:
diff --git a/implementations/acr-control-plane/src/acr/db/migrations/versions/0011_monthly_partitioning.py b/implementations/acr-control-plane/src/acr/db/migrations/versions/0011_monthly_partitioning.py
index 9fc79f7..70d9f17 100644
--- a/implementations/acr-control-plane/src/acr/db/migrations/versions/0011_monthly_partitioning.py
+++ b/implementations/acr-control-plane/src/acr/db/migrations/versions/0011_monthly_partitioning.py
@@ -6,7 +6,10 @@
"""
from __future__ import annotations
+from datetime import date
+
from alembic import op
+import sqlalchemy as sa
revision = "0011"
down_revision = "0010"
@@ -14,13 +17,84 @@
depends_on = None
+def _first_of_month(value: date) -> date:
+ return value.replace(day=1)
+
+
+def _add_months(value: date, months: int) -> date:
+ month_index = (value.year * 12 + value.month - 1) + months
+ year = month_index // 12
+ month = month_index % 12 + 1
+ return date(year, month, 1)
+
+
+def _partition_bounds(conn, table: str) -> tuple[date, date]:
+ result = conn.execute(
+ sa.text(
+ f"SELECT MIN(created_at)::date AS min_created, "
+ f"MAX(created_at)::date AS max_created FROM {table}"
+ )
+ )
+ row = result.fetchone()
+ today = date.today()
+ min_created = row.min_created if row and row.min_created else today
+ max_created = row.max_created if row and row.max_created else today
+
+ start = _first_of_month(min_created)
+ # Keep a forward partition horizon even for mostly historical data.
+ end = _add_months(_first_of_month(max_created), 4)
+ return start, end
+
+
+def _create_partitioned_copy(conn, table: str, tmp: str) -> None:
+ # Do not copy constraints or indexes onto the partitioned parent; Postgres
+ # rejects inherited PK/UNIQUE constraints that do not include created_at.
+ op.execute(
+ f"CREATE TABLE {tmp} ("
+ f"LIKE {table} INCLUDING DEFAULTS INCLUDING GENERATED "
+ f"INCLUDING IDENTITY INCLUDING STORAGE INCLUDING COMMENTS"
+ f") PARTITION BY RANGE (created_at)"
+ )
+
+ start, end = _partition_bounds(conn, table)
+ cursor = start
+ while cursor < end:
+ next_month = _add_months(cursor, 1)
+ suffix = f"y{cursor.year:04d}m{cursor.month:02d}"
+ part_name = f"{table}_{suffix}"
+ op.execute(
+ f"CREATE TABLE {part_name} PARTITION OF {tmp} "
+ f"FOR VALUES FROM ('{cursor.isoformat()}') TO ('{next_month.isoformat()}')"
+ )
+ cursor = next_month
+
+ op.execute(f"CREATE TABLE {table}_default PARTITION OF {tmp} DEFAULT")
+
+
+def _create_indexes(table: str) -> None:
+ if table == "telemetry_events":
+ op.create_index("ix_telemetry_events_event_id", table, ["event_id"])
+ op.create_index("ix_telemetry_events_correlation_id", table, ["correlation_id"])
+ op.create_index("ix_telemetry_events_agent_id", table, ["agent_id"])
+ op.create_index("ix_telemetry_events_created_at", table, ["created_at"])
+ return
+
+ if table == "drift_metrics":
+ op.create_index("ix_drift_metrics_agent_id", table, ["agent_id"])
+ op.create_index("ix_drift_metrics_created_at", table, ["created_at"])
+ op.create_index("ix_drift_metrics_agent_created", table, ["agent_id", "created_at"])
+ return
+
+ raise ValueError(f"unsupported partitioned table: {table}")
+
+
def upgrade() -> None:
conn = op.get_bind()
for table in ("telemetry_events", "drift_metrics"):
# Check if table is already partitioned (Postgres-specific)
result = conn.execute(
- __import__("sqlalchemy").text(
+ sa.text(
"SELECT c.relkind FROM pg_class c "
"JOIN pg_namespace n ON n.oid = c.relnamespace "
"WHERE c.relname = :tbl AND n.nspname = 'public'"
@@ -34,35 +108,16 @@ def upgrade() -> None:
tmp = f"{table}_new"
- # 1. Create partitioned table with same schema
- op.execute(
- f"CREATE TABLE {tmp} (LIKE {table} INCLUDING ALL) "
- f"PARTITION BY RANGE (created_at)"
- )
+ # 1. Create a partitioned copy without invalid inherited constraints.
+ _create_partitioned_copy(conn, table, tmp)
- # 2. Create initial partitions: current month + next 3 months
- partitions = [
- ("2026-04-01", "2026-05-01", "y2026m04"),
- ("2026-05-01", "2026-06-01", "y2026m05"),
- ("2026-06-01", "2026-07-01", "y2026m06"),
- ("2026-07-01", "2026-08-01", "y2026m07"),
- ]
- for start, end, suffix in partitions:
- part_name = f"{table}_{suffix}"
- op.execute(
- f"CREATE TABLE {part_name} PARTITION OF {tmp} "
- f"FOR VALUES FROM ('{start}') TO ('{end}')"
- )
- op.execute(
- f"CREATE INDEX ix_{part_name}_created_at ON {part_name} (created_at)"
- )
-
- # 3. Migrate data
+ # 2. Migrate data
op.execute(f"INSERT INTO {tmp} SELECT * FROM {table}")
- # 4. Swap tables
+ # 3. Swap tables and recreate parent indexes.
op.execute(f"DROP TABLE {table}")
op.execute(f"ALTER TABLE {tmp} RENAME TO {table}")
+ _create_indexes(table)
def downgrade() -> None:
@@ -72,7 +127,7 @@ def downgrade() -> None:
for table in ("telemetry_events", "drift_metrics"):
result = conn.execute(
- __import__("sqlalchemy").text(
+ sa.text(
"SELECT c.relkind FROM pg_class c "
"JOIN pg_namespace n ON n.oid = c.relnamespace "
"WHERE c.relname = :tbl AND n.nspname = 'public'"
diff --git a/implementations/acr-control-plane/src/acr/db/migrations/versions/0013_policy_decision_modify.py b/implementations/acr-control-plane/src/acr/db/migrations/versions/0013_policy_decision_modify.py
new file mode 100644
index 0000000..3e08c37
--- /dev/null
+++ b/implementations/acr-control-plane/src/acr/db/migrations/versions/0013_policy_decision_modify.py
@@ -0,0 +1,33 @@
+"""Allow MODIFY decisions in policy_decisions.
+
+Revision ID: 0013
+Revises: 0012
+Create Date: 2026-04-08
+"""
+from __future__ import annotations
+
+from alembic import op
+
+
+revision = "0013"
+down_revision = "0012"
+branch_labels = None
+depends_on = None
+
+
+def upgrade() -> None:
+ op.drop_constraint("ck_policy_decisions_decision", "policy_decisions", type_="check")
+ op.create_check_constraint(
+ "ck_policy_decisions_decision",
+ "policy_decisions",
+ "decision IN ('allow', 'deny', 'modify', 'escalate')",
+ )
+
+
+def downgrade() -> None:
+ op.drop_constraint("ck_policy_decisions_decision", "policy_decisions", type_="check")
+ op.create_check_constraint(
+ "ck_policy_decisions_decision",
+ "policy_decisions",
+ "decision IN ('allow', 'deny', 'escalate')",
+ )
diff --git a/implementations/acr-control-plane/src/acr/db/models.py b/implementations/acr-control-plane/src/acr/db/models.py
index b874558..1a51274 100644
--- a/implementations/acr-control-plane/src/acr/db/models.py
+++ b/implementations/acr-control-plane/src/acr/db/models.py
@@ -97,7 +97,7 @@ class PolicyDecisionRecord(Base):
__tablename__ = "policy_decisions"
__table_args__ = (
CheckConstraint(
- "decision IN ('allow', 'deny', 'escalate')",
+ "decision IN ('allow', 'deny', 'modify', 'escalate')",
name="ck_policy_decisions_decision",
),
)
@@ -108,7 +108,7 @@ class PolicyDecisionRecord(Base):
String(128), ForeignKey("agents.agent_id", ondelete="CASCADE"), nullable=False, index=True
)
policy_id: Mapped[str] = mapped_column(String(128), nullable=False)
- decision: Mapped[str] = mapped_column(String(16), nullable=False) # allow | deny | escalate
+ decision: Mapped[str] = mapped_column(String(16), nullable=False) # allow | deny | modify | escalate
reason: Mapped[str | None] = mapped_column(Text, nullable=True)
tool_name: Mapped[str | None] = mapped_column(String(128), nullable=True)
latency_ms: Mapped[int | None] = mapped_column(Integer, nullable=True)
diff --git a/implementations/acr-control-plane/src/acr/gateway/router.py b/implementations/acr-control-plane/src/acr/gateway/router.py
index 4f094c8..957e5bd 100644
--- a/implementations/acr-control-plane/src/acr/gateway/router.py
+++ b/implementations/acr-control-plane/src/acr/gateway/router.py
@@ -31,18 +31,27 @@
ACRError,
AgentKilledError,
AgentNotRegisteredError,
+ AuthoritativeSpendControlError,
PolicyEngineError,
+ RuntimeControlDependencyError,
)
from acr.common.redis_client import get_redis_or_none
from acr.common.time import iso_utcnow
-from acr.config import settings
+from acr.config import runtime_dependencies_fail_closed, settings
from acr.db.database import BackgroundSessionLocal, async_session_factory, get_db
+from acr.db.models import PolicyDecisionRecord
from acr.gateway.auth import require_agent_token
from acr.gateway.executor import execute_action
+from acr.gateway.spend_control import (
+ adjust_authoritative_spend,
+ get_authoritative_projected_spend,
+ resolve_action_cost_usd,
+)
from acr.pillar1_identity.registry import get_manifest
from acr.pillar1_identity.validator import validate_agent_identity
from acr.pillar2_policy.engine import evaluate_policy
-from acr.pillar2_policy.output_filter import filter_parameters
+from acr.pillar2_policy.models import PolicyDecision
+from acr.pillar2_policy.output_filter import ParameterFilterResult, filter_parameters
from acr.pillar3_drift.baseline import record_metric_sample
from acr.pillar4_observability.otel import acr_span, get_meter
from acr.pillar4_observability.schema import LatencyBreakdown, PolicyResult
@@ -146,12 +155,43 @@ async def _persist_telemetry(event_dict: dict) -> None:
logger.error("Background telemetry persist failed", error=str(exc), exc_info=True)
+async def _persist_policy_decisions(
+ *,
+ agent_id: str,
+ correlation_id: str,
+ tool_name: str,
+ decisions: list[dict],
+) -> None:
+ async with BackgroundSessionLocal() as db:
+ try:
+ for item in decisions:
+ db.add(
+ PolicyDecisionRecord(
+ correlation_id=correlation_id,
+ agent_id=agent_id,
+ policy_id=str(item["policy_id"]),
+ decision=str(item["decision"]),
+ reason=item.get("reason"),
+ tool_name=tool_name,
+ latency_ms=item.get("latency_ms"),
+ )
+ )
+ await db.commit()
+ except Exception as exc:
+ await db.rollback()
+ logger.error("Background policy decision persist failed", error=str(exc), exc_info=True)
+
+
# ── Helpers ───────────────────────────────────────────────────────────────────
async def _get_rate_count(agent_id: str) -> int:
"""Increment and return the server-side action counter for this agent/minute."""
redis = get_redis_or_none()
if redis is None:
+ if runtime_dependencies_fail_closed():
+ raise RuntimeControlDependencyError(
+ "Authoritative rate-limit counter unavailable: Redis is not initialized"
+ )
return 0
now = datetime.now(timezone.utc)
minute_bucket = now.strftime("%Y%m%d%H%M")
@@ -160,10 +200,33 @@ async def _get_rate_count(agent_id: str) -> int:
count = await redis.incr(key)
await redis.expire(key, 120) # expire after 2 minutes
return count
- except Exception:
+ except Exception as exc:
+ if runtime_dependencies_fail_closed():
+ raise RuntimeControlDependencyError(
+ f"Authoritative rate-limit counter unavailable: {exc}"
+ ) from exc
return 0
+def _resolve_action_cost_usd(manifest, tool_name: str) -> float:
+ return resolve_action_cost_usd(manifest, tool_name)
+
+
+async def _get_authoritative_projected_spend(agent_id: str, estimated_action_cost_usd: float) -> float:
+ return await get_authoritative_projected_spend(agent_id, estimated_action_cost_usd)
+
+
+async def _adjust_authoritative_spend(agent_id: str, delta_usd: float) -> None:
+ await adjust_authoritative_spend(agent_id, delta_usd)
+
+
+def _filter_reason(filter_result: ParameterFilterResult) -> str | None:
+ if not filter_result.was_modified:
+ return None
+ redacted = ", ".join(filter_result.redacted_types)
+ return f"Sensitive fields were redacted by output controls ({redacted})"
+
+
async def _get_cached_drift_score(agent_id: str) -> float | None:
redis = get_redis_or_none()
if redis is None:
@@ -287,7 +350,17 @@ async def evaluate(
effective_count = int(server_count * (100 / enriched_context_throttle_pct))
else:
effective_count = server_count
- enriched_context = {**req.context, "actions_this_minute": effective_count}
+
+ estimated_action_cost_usd = _resolve_action_cost_usd(manifest, req.action.tool_name)
+ projected_hourly_spend_usd = await _get_authoritative_projected_spend(
+ req.agent_id,
+ estimated_action_cost_usd,
+ )
+ enriched_context = {
+ **req.context,
+ "actions_this_minute": effective_count,
+ "hourly_spend_usd": projected_hourly_spend_usd,
+ }
# ── [4] Policy evaluation (OPA) ───────────────────────────────────────
with acr_span("policy_evaluation", {"agent.id": req.agent_id, "action.tool": req.action.tool_name}):
@@ -300,11 +373,48 @@ async def evaluate(
policy_ms = int((time.monotonic() - t0) * 1000)
# ── [5] Output filter — PII redaction ─────────────────────────────────
- filtered_params = filter_parameters(
- req.action.tool_name,
- req.action.parameters,
+ effective_action = req.action.model_dump()
+ if policy_result.final_decision == "modify":
+ if policy_result.modified_action is not None:
+ candidate_tool_name = policy_result.modified_action.get("tool_name", req.action.tool_name)
+ if candidate_tool_name != req.action.tool_name:
+ return JSONResponse(
+ status_code=403,
+ content={
+ "decision": "deny",
+ "correlation_id": correlation_id,
+ "reason": "Runtime policy attempted to modify the action tool_name",
+ "error_code": "INVALID_MODIFY_DECISION",
+ },
+ )
+ effective_action.update(policy_result.modified_action)
+ effective_action.setdefault("description", req.action.description)
+ elif policy_result.modified_parameters is not None:
+ effective_action["parameters"] = policy_result.modified_parameters
+
+ filter_result = filter_parameters(
+ effective_action["tool_name"],
+ effective_action.get("parameters") or {},
correlation_id,
)
+ effective_action["parameters"] = filter_result.parameters
+
+ response_decision = policy_result.final_decision
+ response_reason = policy_result.reason
+ response_decisions = list(policy_result.decisions)
+ if response_decision == "allow" and filter_result.was_modified:
+ response_decision = "modify"
+ response_reason = _filter_reason(filter_result)
+ response_decisions.append(
+ PolicyDecision(
+ policy_id="acr-output-modify",
+ decision="modify",
+ reason=response_reason,
+ latency_ms=0,
+ )
+ )
+ elif response_decision == "modify" and not response_reason:
+ response_reason = _filter_reason(filter_result) or "Action transformed by runtime policy"
total_ms = int((time.monotonic() - t_start) * 1000)
end_time = iso_utcnow()
@@ -316,27 +426,30 @@ async def evaluate(
reason=d.reason,
latency_ms=d.latency_ms,
)
- for d in policy_result.decisions
+ for d in response_decisions
]
# Fetch cached drift score for inclusion in response
drift_score = await _get_cached_drift_score(req.agent_id)
# Record OTEL metrics for all decisions
- _record_evaluate_metrics(req.agent_id, policy_result.final_decision, total_ms)
+ _record_evaluate_metrics(req.agent_id, response_decision, total_ms)
# ── [6] Escalate → approval queue ─────────────────────────────────────
- if policy_result.final_decision == "escalate":
+ if response_decision == "escalate":
approval = await create_approval_request(
db,
correlation_id=correlation_id,
agent_id=req.agent_id,
- tool_name=req.action.tool_name,
- parameters=filtered_params,
- description=req.action.description,
+ tool_name=effective_action["tool_name"],
+ parameters=effective_action["parameters"],
+ description=effective_action.get("description"),
approval_queue=policy_result.approval_queue or "default",
sla_minutes=policy_result.sla_minutes or 240,
)
+            # Commit now so the approval row is durable and queryable by approvers before its request_id is returned to the caller.
+ await db.commit()
+ await db.refresh(approval)
_queue_background_tasks(
background_tasks,
agent_id=req.agent_id,
@@ -349,7 +462,7 @@ async def evaluate(
manifest=manifest,
correlation_id=correlation_id,
req=req,
- filtered_params=filtered_params,
+ filtered_params=effective_action["parameters"],
start_time=start_time,
end_time=end_time,
total_ms=total_ms,
@@ -357,29 +470,46 @@ async def evaluate(
policy_ms=policy_ms,
telemetry_policies=telemetry_policies,
output_decision="escalate",
- output_reason=policy_result.reason,
+ output_reason=response_reason,
approval_request_id=approval.request_id,
drift_score=drift_score,
+ output_filtered=filter_result.was_modified,
+ filter_reason=_filter_reason(filter_result),
+ modified_parameters=effective_action["parameters"] if response_decision == "modify" else None,
+ tool_name=effective_action["tool_name"],
+ description=effective_action.get("description"),
+ custom={
+ "estimated_cost_usd": estimated_action_cost_usd,
+ "authoritative_hourly_spend_usd": projected_hourly_spend_usd,
+ "cost_basis": "authorization_estimate",
+ },
),
)
+ background_tasks.add_task(
+ _persist_policy_decisions,
+ agent_id=req.agent_id,
+ correlation_id=correlation_id,
+ tool_name=effective_action["tool_name"],
+ decisions=[d.model_dump(mode="json") for d in response_decisions],
+ )
return JSONResponse(
status_code=202,
content={
"decision": "escalate",
"correlation_id": correlation_id,
"approval_request_id": approval.request_id,
- "reason": policy_result.reason,
+ "reason": response_reason,
"approval_queue": approval.approval_queue,
"sla_minutes": approval.sla_minutes,
},
)
# ── [7] Deny ──────────────────────────────────────────────────────────
- if policy_result.final_decision == "deny":
+ if response_decision == "deny":
_queue_background_tasks(
background_tasks,
agent_id=req.agent_id,
- tool_name=req.action.tool_name,
+ tool_name=effective_action["tool_name"],
policy_denied=True,
total_ms=total_ms,
correlation_id=correlation_id,
@@ -388,7 +518,7 @@ async def evaluate(
manifest=manifest,
correlation_id=correlation_id,
req=req,
- filtered_params=filtered_params,
+ filtered_params=effective_action["parameters"],
start_time=start_time,
end_time=end_time,
total_ms=total_ms,
@@ -396,38 +526,62 @@ async def evaluate(
policy_ms=policy_ms,
telemetry_policies=telemetry_policies,
output_decision="deny",
- output_reason=policy_result.reason,
+ output_reason=response_reason,
approval_request_id=None,
drift_score=drift_score,
+ output_filtered=filter_result.was_modified,
+ filter_reason=_filter_reason(filter_result),
+ modified_parameters=effective_action["parameters"] if response_decision == "modify" else None,
+ tool_name=effective_action["tool_name"],
+ description=effective_action.get("description"),
+ custom={
+ "estimated_cost_usd": estimated_action_cost_usd,
+ "authoritative_hourly_spend_usd": projected_hourly_spend_usd,
+ "cost_basis": "authorization_estimate",
+ },
),
)
+ background_tasks.add_task(
+ _persist_policy_decisions,
+ agent_id=req.agent_id,
+ correlation_id=correlation_id,
+ tool_name=effective_action["tool_name"],
+ decisions=[d.model_dump(mode="json") for d in response_decisions],
+ )
return JSONResponse(
status_code=403,
content={
"decision": "deny",
"correlation_id": correlation_id,
- "reason": policy_result.reason,
+ "reason": response_reason,
"policy_decisions": [
{"policy_id": d.policy_id, "decision": d.decision, "reason": d.reason}
- for d in policy_result.decisions
+ for d in response_decisions
],
},
)
- # ── [8] Allow ─────────────────────────────────────────────────────────
+ # ── [8] Allow / Modify ────────────────────────────────────────────────
execution_result = None
if settings.execute_allowed_actions:
- execution_result = await execute_action(
- agent_id=req.agent_id,
- tool_name=req.action.tool_name,
- parameters=req.action.parameters,
- description=req.action.description,
- correlation_id=correlation_id,
- )
+ await _adjust_authoritative_spend(req.agent_id, estimated_action_cost_usd)
+ try:
+ execution_result = await execute_action(
+ agent_id=req.agent_id,
+ tool_name=effective_action["tool_name"],
+ parameters=effective_action["parameters"],
+ description=effective_action.get("description"),
+ correlation_id=correlation_id,
+ )
+ except Exception:
+ await _adjust_authoritative_spend(req.agent_id, -estimated_action_cost_usd)
+ raise
+ else:
+ await _adjust_authoritative_spend(req.agent_id, estimated_action_cost_usd)
_queue_background_tasks(
background_tasks,
agent_id=req.agent_id,
- tool_name=req.action.tool_name,
+ tool_name=effective_action["tool_name"],
policy_denied=False,
total_ms=total_ms,
correlation_id=correlation_id,
@@ -436,29 +590,56 @@ async def evaluate(
manifest=manifest,
correlation_id=correlation_id,
req=req,
- filtered_params=filtered_params,
+ filtered_params=effective_action["parameters"],
start_time=start_time,
end_time=end_time,
total_ms=total_ms,
identity_ms=identity_ms,
policy_ms=policy_ms,
telemetry_policies=telemetry_policies,
- output_decision="allow",
- output_reason=None,
+ output_decision=response_decision,
+ output_reason=response_reason,
approval_request_id=None,
drift_score=drift_score,
+ output_filtered=filter_result.was_modified,
+ filter_reason=_filter_reason(filter_result),
+ modified_parameters=effective_action["parameters"] if response_decision == "modify" else None,
+ tool_name=effective_action["tool_name"],
+ description=effective_action.get("description"),
+ custom={
+ "estimated_cost_usd": estimated_action_cost_usd,
+ "authoritative_hourly_spend_usd": projected_hourly_spend_usd,
+ "cost_basis": "authorization_estimate",
+ },
),
)
+ background_tasks.add_task(
+ _persist_policy_decisions,
+ agent_id=req.agent_id,
+ correlation_id=correlation_id,
+ tool_name=effective_action["tool_name"],
+ decisions=[d.model_dump(mode="json") for d in response_decisions],
+ )
content = {
- "decision": "allow",
+ "decision": response_decision,
"correlation_id": correlation_id,
"policy_decisions": [
- {"policy_id": d.policy_id, "decision": d.decision}
- for d in policy_result.decisions
+ {"policy_id": d.policy_id, "decision": d.decision, "reason": d.reason}
+ for d in response_decisions
],
"drift_score": drift_score,
"latency_ms": total_ms,
+ "estimated_cost_usd": estimated_action_cost_usd,
+ "authoritative_hourly_spend_usd": projected_hourly_spend_usd,
}
+ if response_reason:
+ content["reason"] = response_reason
+ if response_decision == "modify":
+ content["modified_action"] = {
+ "tool_name": effective_action["tool_name"],
+ "parameters": effective_action["parameters"],
+ "description": effective_action.get("description"),
+ }
if execution_result is not None:
content["execution_result"] = execution_result
return JSONResponse(status_code=200, content=content)
@@ -541,6 +722,12 @@ async def _emit_telemetry_event(
output_reason,
approval_request_id,
drift_score,
+ output_filtered=False,
+ filter_reason=None,
+ modified_parameters=None,
+ custom=None,
+ tool_name=None,
+ description=None,
) -> None:
event = build_event(
event_type=event_type,
@@ -549,9 +736,9 @@ async def _emit_telemetry_event(
agent_capabilities=manifest.allowed_tools,
correlation_id=correlation_id,
session_id=req.context.get("session_id"),
- tool_name=req.action.tool_name,
+ tool_name=tool_name or req.action.tool_name,
parameters=filtered_params,
- description=req.action.description,
+ description=description if description is not None else req.action.description,
context=req.context,
intent=req.intent.model_dump(exclude_none=True) if req.intent else {},
start_time=start_time,
@@ -563,6 +750,10 @@ async def _emit_telemetry_event(
output_reason=output_reason,
approval_request_id=approval_request_id,
drift_score=drift_score,
+ output_filtered=output_filtered,
+ filter_reason=filter_reason,
+ modified_parameters=modified_parameters,
+ custom=custom,
)
log_event(event)
await _persist_telemetry(event.model_dump(mode="json"))
diff --git a/implementations/acr-control-plane/src/acr/gateway/spend_control.py b/implementations/acr-control-plane/src/acr/gateway/spend_control.py
new file mode 100644
index 0000000..5806577
--- /dev/null
+++ b/implementations/acr-control-plane/src/acr/gateway/spend_control.py
@@ -0,0 +1,96 @@
+"""Shared authoritative spend-control helpers for gateway and approval execution."""
+from __future__ import annotations
+
+from datetime import datetime, timezone
+from typing import Any
+
+from acr.common.errors import AuthoritativeSpendControlError, RuntimeControlDependencyError
+from acr.common.redis_client import get_redis_or_none
+from acr.config import runtime_dependencies_fail_closed, settings
+
+_SPEND_KEY_PREFIX = "acr:spend:"
+
+
+def _hour_bucket() -> str:
+ now = datetime.now(timezone.utc)
+ return now.strftime("%Y%m%d%H")
+
+
+def _extract_boundaries(manifest: Any) -> dict[str, Any]:
+ if hasattr(manifest, "boundaries"):
+ boundaries = getattr(manifest, "boundaries")
+ if hasattr(boundaries, "model_dump"):
+ return boundaries.model_dump()
+ if isinstance(boundaries, dict):
+ return boundaries
+ if isinstance(manifest, dict):
+ boundaries = manifest.get("boundaries")
+ if isinstance(boundaries, dict):
+ return boundaries
+ return {}
+
+
+def resolve_action_cost_usd(manifest: Any, tool_name: str) -> float:
+ boundaries = _extract_boundaries(manifest)
+ tool_costs = boundaries.get("tool_costs_usd") or {}
+ if tool_name in tool_costs:
+ return float(tool_costs[tool_name])
+
+ default_cost = boundaries.get("default_action_cost_usd")
+ if default_cost is not None:
+ return float(default_cost)
+
+ if settings.acr_env in ("development", "test"):
+ return 0.0
+
+ raise AuthoritativeSpendControlError(
+ f"No authoritative spend estimate configured for tool '{tool_name}'"
+ )
+
+
+async def get_authoritative_projected_spend(
+ agent_id: str,
+ estimated_action_cost_usd: float,
+) -> float:
+ redis = get_redis_or_none()
+ if redis is None:
+ if runtime_dependencies_fail_closed():
+ raise RuntimeControlDependencyError(
+ "Authoritative spend ledger unavailable: Redis is not initialized"
+ )
+ return round(float(estimated_action_cost_usd), 4)
+
+ key = f"{_SPEND_KEY_PREFIX}{agent_id}:{_hour_bucket()}"
+ try:
+ current = await redis.get(key)
+ current_spend = float(current) if current else 0.0
+ return round(current_spend + float(estimated_action_cost_usd), 4)
+ except Exception as exc:
+ if runtime_dependencies_fail_closed():
+ raise RuntimeControlDependencyError(
+ f"Authoritative spend ledger unavailable: {exc}"
+ ) from exc
+ return round(float(estimated_action_cost_usd), 4)
+
+
+async def adjust_authoritative_spend(agent_id: str, delta_usd: float) -> None:
+ if delta_usd == 0:
+ return
+
+ redis = get_redis_or_none()
+ if redis is None:
+ if runtime_dependencies_fail_closed():
+ raise RuntimeControlDependencyError(
+ "Authoritative spend ledger unavailable during commit: Redis is not initialized"
+ )
+ return
+
+ key = f"{_SPEND_KEY_PREFIX}{agent_id}:{_hour_bucket()}"
+ try:
+ await redis.incrbyfloat(key, float(delta_usd))
+ await redis.expire(key, 7200)
+ except Exception as exc:
+ if runtime_dependencies_fail_closed():
+ raise RuntimeControlDependencyError(
+ f"Authoritative spend ledger unavailable during commit: {exc}"
+ ) from exc
diff --git a/implementations/acr-control-plane/src/acr/main.py b/implementations/acr-control-plane/src/acr/main.py
index d767aaf..161d5ba 100644
--- a/implementations/acr-control-plane/src/acr/main.py
+++ b/implementations/acr-control-plane/src/acr/main.py
@@ -193,7 +193,7 @@ def _missing_tables(sync_conn):
app = FastAPI(
title="ACR Control Plane",
description="Agentic Control at Runtime",
- version="1.0.0",
+ version="1.0.1",
lifespan=lifespan,
docs_url=None,
redoc_url=None,
diff --git a/implementations/acr-control-plane/src/acr/operator_console/static/app.js b/implementations/acr-control-plane/src/acr/operator_console/static/app.js
index 2547d1b..ac1616d 100644
--- a/implementations/acr-control-plane/src/acr/operator_console/static/app.js
+++ b/implementations/acr-control-plane/src/acr/operator_console/static/app.js
@@ -1,6 +1,6 @@
const state = {
- operatorApiKey: localStorage.getItem("acr.operatorApiKey") || "",
- killswitchSecret: localStorage.getItem("acr.killswitchSecret") || "",
+ operatorApiKey: "",
+ killswitchSecret: "",
currentPolicyDraftId: null,
sessionPrincipal: null,
};
@@ -124,6 +124,8 @@ function generatePolicyStarter(formData) {
const approvalQueue = (formData.approval_queue || template.approvalQueue || "default").trim();
const spendLimit = Number(formData.max_cost_per_hour_usd || 5);
const actionLimit = Number(formData.max_actions_per_minute || 30);
+ const defaultActionCost = formData.default_action_cost_usd ? Number(formData.default_action_cost_usd) : null;
+ const toolCosts = parseJsonField(formData.tool_costs_usd_json, {});
const escalationThreshold = formData.escalate_over_amount ? Number(formData.escalate_over_amount) : null;
const manifest = {
@@ -136,6 +138,8 @@ function generatePolicyStarter(formData) {
boundaries: {
max_actions_per_minute: actionLimit,
max_cost_per_hour_usd: spendLimit,
+ default_action_cost_usd: defaultActionCost,
+ tool_costs_usd: toolCosts,
credential_rotation_days: 90,
allowed_regions: [],
},
@@ -447,16 +451,14 @@ function installSessionControls() {
document.getElementById("save-session").addEventListener("click", () => {
state.operatorApiKey = document.getElementById("operator-api-key").value.trim();
state.killswitchSecret = document.getElementById("killswitch-secret").value.trim();
- localStorage.setItem("acr.operatorApiKey", state.operatorApiKey);
- localStorage.setItem("acr.killswitchSecret", state.killswitchSecret);
updateSessionUI();
- setMessage("Operator session saved.");
+ setMessage("Operator session loaded for this tab only.");
});
document.getElementById("clear-session").addEventListener("click", () => {
state.operatorApiKey = "";
state.killswitchSecret = "";
- localStorage.removeItem("acr.operatorApiKey");
- localStorage.removeItem("acr.killswitchSecret");
+ document.getElementById("operator-api-key").value = "";
+ document.getElementById("killswitch-secret").value = "";
updateSessionUI();
setMessage("Operator session cleared.");
});
@@ -476,6 +478,8 @@ function installForms() {
boundaries: {
max_actions_per_minute: Number(data.max_actions_per_minute || 30),
max_cost_per_hour_usd: Number(data.max_cost_per_hour_usd || 5),
+ default_action_cost_usd: data.default_action_cost_usd ? Number(data.default_action_cost_usd) : null,
+ tool_costs_usd: parseJsonField(data.tool_costs_usd_json, {}),
credential_rotation_days: 90,
allowed_regions: [],
},
diff --git a/implementations/acr-control-plane/src/acr/operator_console/static/index.html b/implementations/acr-control-plane/src/acr/operator_console/static/index.html
index 24bc179..ca129a3 100644
--- a/implementations/acr-control-plane/src/acr/operator_console/static/index.html
+++ b/implementations/acr-control-plane/src/acr/operator_console/static/index.html
@@ -29,7 +29,7 @@
Operator Session
- Use SSO when configured. API keys are still supported for bootstrap and break-glass access.
+ Use SSO when configured. API keys are supported for bootstrap and break-glass access and are kept in memory only for this tab.
Session not configured
@@ -115,6 +115,10 @@ Register Agent
+
+
+
+
@@ -210,6 +214,10 @@ Guided Policy Setup
+
+
+
+
diff --git a/implementations/acr-control-plane/src/acr/pillar1_identity/models.py b/implementations/acr-control-plane/src/acr/pillar1_identity/models.py
index b083569..7e03d3c 100644
--- a/implementations/acr-control-plane/src/acr/pillar1_identity/models.py
+++ b/implementations/acr-control-plane/src/acr/pillar1_identity/models.py
@@ -4,7 +4,7 @@
from datetime import datetime
from typing import Literal
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, field_validator
# Lifecycle: agents move draft → active → deprecated → retired.
# - draft: registered but not yet allowed to evaluate (initial setup)
@@ -26,9 +26,35 @@ class DataAccessEntry(BaseModel):
class AgentBoundaries(BaseModel):
max_actions_per_minute: int = 30
max_cost_per_hour_usd: float = 5.0
+ default_action_cost_usd: float | None = None
+ tool_costs_usd: dict[str, float] = Field(default_factory=dict)
allowed_regions: list[str] = Field(default_factory=list)
credential_rotation_days: int = 90
+ @field_validator("max_cost_per_hour_usd")
+ @classmethod
+ def _validate_max_cost(cls, value: float) -> float:
+ if value < 0:
+ raise ValueError("max_cost_per_hour_usd cannot be negative")
+ return value
+
+ @field_validator("default_action_cost_usd")
+ @classmethod
+ def _validate_default_cost(cls, value: float | None) -> float | None:
+ if value is not None and value < 0:
+ raise ValueError("default_action_cost_usd cannot be negative")
+ return value
+
+ @field_validator("tool_costs_usd")
+ @classmethod
+ def _validate_tool_costs(cls, value: dict[str, float]) -> dict[str, float]:
+ normalized: dict[str, float] = {}
+ for tool_name, cost in value.items():
+ if cost < 0:
+ raise ValueError(f"tool_costs_usd[{tool_name!r}] cannot be negative")
+ normalized[str(tool_name)] = float(cost)
+ return normalized
+
class AgentManifest(BaseModel):
"""ACR agent manifest — matches the spec YAML schema."""
diff --git a/implementations/acr-control-plane/src/acr/pillar2_policy/engine.py b/implementations/acr-control-plane/src/acr/pillar2_policy/engine.py
index 8b98c27..d054f00 100644
--- a/implementations/acr-control-plane/src/acr/pillar2_policy/engine.py
+++ b/implementations/acr-control-plane/src/acr/pillar2_policy/engine.py
@@ -4,6 +4,7 @@
import asyncio
import time
from datetime import timedelta
+from typing import Any
import httpx
import structlog
@@ -28,20 +29,24 @@
# Module-level connection-pooled client (avoids creating a new TCP connection per call)
_opa_client: httpx.AsyncClient | None = None
+_opa_client_factory: object | None = None
def get_opa_client() -> httpx.AsyncClient:
- global _opa_client
- if _opa_client is None:
+ global _opa_client, _opa_client_factory
+ current_factory = httpx.AsyncClient
+ if _opa_client is None or _opa_client_factory is not current_factory:
_opa_client = httpx.AsyncClient(base_url=settings.opa_url, timeout=_TIMEOUT)
+ _opa_client_factory = current_factory
return _opa_client
async def close_opa_client() -> None:
- global _opa_client
+ global _opa_client, _opa_client_factory
if _opa_client is not None:
await _opa_client.aclose()
_opa_client = None
+ _opa_client_factory = None
# Circuit breaker: opens after 5 failures in 60 seconds, fails secure
@@ -146,9 +151,12 @@ async def evaluate_policy(
deny_reasons: list[str] = result.get("deny", [])
allow: bool = result.get("allow", False)
+ modify: bool = result.get("modify", False)
escalate: bool = result.get("escalate", False)
escalate_queue: str = result.get("escalate_queue", "default")
escalate_sla: int = result.get("escalate_sla_minutes", 240)
+ modified_action = _coerce_dict(result.get("modified_action"))
+ modified_parameters = _coerce_dict(result.get("modified_parameters"))
decisions: list[PolicyDecision] = []
@@ -171,6 +179,26 @@ async def evaluate_policy(
reason=reason,
latency_ms=elapsed_ms,
))
+ elif modify:
+ modified_payload = modified_action or modified_parameters
+ if modified_payload is None:
+ final = "deny"
+ reason = "Policy requested modify without supplying a transformed payload"
+ decisions.append(PolicyDecision(
+ policy_id="acr-invalid-modify",
+ decision="deny",
+ reason=reason,
+ latency_ms=elapsed_ms,
+ ))
+ else:
+ final = "modify"
+ reason = "Action transformed by runtime policy"
+ decisions.append(PolicyDecision(
+ policy_id="acr-modify",
+ decision="modify",
+ reason=reason,
+ latency_ms=elapsed_ms,
+ ))
elif allow:
final = "allow"
reason = None
@@ -204,5 +232,15 @@ async def evaluate_policy(
reason=reason,
approval_queue=escalate_queue if escalate else None,
sla_minutes=escalate_sla if escalate else None,
+ modified_action=modified_action if final == "modify" else None,
+ modified_parameters=modified_parameters if final == "modify" else None,
latency_ms=elapsed_ms,
)
+
+
+def _coerce_dict(value: Any) -> dict[str, Any] | None:
+ if value is None:
+ return None
+ if not isinstance(value, dict):
+ return None
+ return value
diff --git a/implementations/acr-control-plane/src/acr/pillar2_policy/models.py b/implementations/acr-control-plane/src/acr/pillar2_policy/models.py
index e39fbdd..7bb2ae2 100644
--- a/implementations/acr-control-plane/src/acr/pillar2_policy/models.py
+++ b/implementations/acr-control-plane/src/acr/pillar2_policy/models.py
@@ -8,7 +8,7 @@
class PolicyDecision(BaseModel):
policy_id: str
- decision: Literal["allow", "deny", "escalate"] = "allow"
+ decision: Literal["allow", "deny", "modify", "escalate"] = "allow"
reason: str | None = None
latency_ms: int | None = None
@@ -28,9 +28,11 @@ class OPAResponse(BaseModel):
class PolicyEvaluationResult(BaseModel):
"""Aggregated result from OPA evaluation."""
- final_decision: Literal["allow", "deny", "escalate"]
+ final_decision: Literal["allow", "deny", "modify", "escalate"]
decisions: list[PolicyDecision]
reason: str | None = None
approval_queue: str | None = None
sla_minutes: int | None = None
+ modified_action: dict | None = None
+ modified_parameters: dict | None = None
latency_ms: int = 0
diff --git a/implementations/acr-control-plane/src/acr/pillar2_policy/output_filter.py b/implementations/acr-control-plane/src/acr/pillar2_policy/output_filter.py
index e40d856..d95ec28 100644
--- a/implementations/acr-control-plane/src/acr/pillar2_policy/output_filter.py
+++ b/implementations/acr-control-plane/src/acr/pillar2_policy/output_filter.py
@@ -2,6 +2,8 @@
from __future__ import annotations
import re
+from dataclasses import dataclass
+from collections.abc import Mapping, Iterator
from typing import Any
import structlog
@@ -19,6 +21,28 @@
_REDACTION = "[REDACTED]"
+@dataclass(frozen=True)
+class ParameterFilterResult(Mapping[str, Any]):
+ parameters: dict[str, Any]
+ redacted_types: tuple[str, ...]
+
+ @property
+ def was_modified(self) -> bool:
+ return bool(self.redacted_types)
+
+ def __getitem__(self, key: str) -> Any:
+ return self.parameters[key]
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self.parameters)
+
+ def __len__(self) -> int:
+ return len(self.parameters)
+
+ def get(self, key: str, default: Any = None) -> Any:
+ return self.parameters.get(key, default)
+
+
def _redact_string(value: str) -> tuple[str, list[str]]:
"""Redact PII patterns from a string. Returns (cleaned_value, list_of_redacted_types)."""
found: list[str] = []
@@ -63,7 +87,7 @@ def filter_parameters(
tool_name: str,
parameters: dict[str, Any],
correlation_id: str,
-) -> dict[str, Any]:
+) -> ParameterFilterResult:
"""
Apply PII redaction to action parameters before forwarding.
Logs any redactions made.
@@ -76,4 +100,7 @@ def filter_parameters(
correlation_id=correlation_id,
redacted_types=redacted_types,
)
- return cleaned
+ return ParameterFilterResult(
+ parameters=cleaned,
+ redacted_types=tuple(sorted(redacted_types)),
+ )
diff --git a/implementations/acr-control-plane/src/acr/pillar4_observability/evidence.py b/implementations/acr-control-plane/src/acr/pillar4_observability/evidence.py
index 4b2a579..25d6f3d 100644
--- a/implementations/acr-control-plane/src/acr/pillar4_observability/evidence.py
+++ b/implementations/acr-control-plane/src/acr/pillar4_observability/evidence.py
@@ -1,6 +1,7 @@
from __future__ import annotations
import hashlib
+import hmac
import io
import json
import zipfile
@@ -8,6 +9,8 @@
from acr.common.errors import EvidenceBundleNotFoundError
from acr.common.time import iso_utcnow
+from acr.pillar4_observability.integrity import verify_event_chain
+from acr.config import settings
@dataclass(frozen=True)
@@ -22,6 +25,15 @@ def _sha256_bytes(data: bytes) -> str:
return hashlib.sha256(data).hexdigest()
+def _bundle_signature(manifest_bytes: bytes, events_jsonl: bytes) -> str:
+ signed = manifest_bytes + b"\n" + events_jsonl
+ return hmac.new(
+ settings.audit_signing_secret.encode(),
+ signed,
+ hashlib.sha256,
+ ).hexdigest()
+
+
def build_evidence_bundle(*, correlation_id: str, events: list[dict]) -> EvidenceBundleArtifact:
if not events:
raise EvidenceBundleNotFoundError(
@@ -42,6 +54,7 @@ def build_evidence_bundle(*, correlation_id: str, events: list[dict]) -> Evidenc
if event.get("output", {}).get("decision")
}
)
+ integrity_summary = verify_event_chain(events)
events_jsonl = "\n".join(json.dumps(event, sort_keys=True) for event in events).encode() + b"\n"
manifest = {
@@ -56,18 +69,21 @@ def build_evidence_bundle(*, correlation_id: str, events: list[dict]) -> Evidenc
"start": first.get("timestamp"),
"end": last.get("timestamp"),
},
+ "integrity": integrity_summary,
}
manifest_bytes = json.dumps(manifest, indent=2, sort_keys=True).encode() + b"\n"
checksums = (
f"{_sha256_bytes(manifest_bytes)} manifest.json\n"
f"{_sha256_bytes(events_jsonl)} events.jsonl\n"
).encode()
+ bundle_signature = _bundle_signature(manifest_bytes, events_jsonl).encode() + b"\n"
buffer = io.BytesIO()
with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive:
archive.writestr("manifest.json", manifest_bytes)
archive.writestr("events.jsonl", events_jsonl)
archive.writestr("checksums.sha256", checksums)
+ archive.writestr("bundle.signature", bundle_signature)
bytes_data = buffer.getvalue()
return EvidenceBundleArtifact(
diff --git a/implementations/acr-control-plane/src/acr/pillar4_observability/integrity.py b/implementations/acr-control-plane/src/acr/pillar4_observability/integrity.py
new file mode 100644
index 0000000..8221fa9
--- /dev/null
+++ b/implementations/acr-control-plane/src/acr/pillar4_observability/integrity.py
@@ -0,0 +1,88 @@
+from __future__ import annotations
+
+import copy
+import hashlib
+import hmac
+import json
+from typing import Any
+
+from acr.config import settings
+
+
+def canonical_json_bytes(payload: dict[str, Any]) -> bytes:
+ return json.dumps(payload, sort_keys=True, separators=(",", ":")).encode()
+
+
+def payload_sha256(payload: dict[str, Any]) -> str:
+ return hashlib.sha256(canonical_json_bytes(payload)).hexdigest()
+
+
+def remove_integrity_metadata(payload: dict[str, Any]) -> dict[str, Any]:
+ stripped = copy.deepcopy(payload)
+ metadata = stripped.get("metadata")
+ if isinstance(metadata, dict):
+ metadata.pop("integrity", None)
+ return stripped
+
+
+def sign_payload_hash(payload_hash: str, previous_event_sha256: str | None) -> str:
+ message = f"{payload_hash}:{previous_event_sha256 or ''}".encode()
+ return hmac.new(
+ settings.audit_signing_secret.encode(),
+ message,
+ hashlib.sha256,
+ ).hexdigest()
+
+
+def extract_payload_hash(payload: dict[str, Any]) -> str | None:
+ metadata = payload.get("metadata")
+ if not isinstance(metadata, dict):
+ return None
+ integrity = metadata.get("integrity")
+ if not isinstance(integrity, dict):
+ return None
+ payload_hash = integrity.get("payload_sha256")
+ return str(payload_hash) if payload_hash else None
+
+
+def verify_event_chain(events: list[dict[str, Any]]) -> dict[str, Any]:
+ previous_hash: str | None = None
+ verified = 0
+ signed = 0
+ invalid_reasons: list[str] = []
+
+ for index, event in enumerate(events):
+ metadata = event.get("metadata")
+ integrity = metadata.get("integrity") if isinstance(metadata, dict) else None
+ if not isinstance(integrity, dict):
+ invalid_reasons.append(f"event[{index}] missing integrity metadata")
+ break
+
+ expected_hash = payload_sha256(remove_integrity_metadata(event))
+ actual_hash = integrity.get("payload_sha256")
+ if actual_hash != expected_hash:
+ invalid_reasons.append(f"event[{index}] payload hash mismatch")
+ break
+
+ actual_previous = integrity.get("previous_event_sha256")
+ if actual_previous != previous_hash:
+ invalid_reasons.append(f"event[{index}] previous-event hash mismatch")
+ break
+
+ expected_signature = sign_payload_hash(expected_hash, previous_hash)
+ actual_signature = integrity.get("record_signature")
+ if not actual_signature or not hmac.compare_digest(str(actual_signature), expected_signature):
+ invalid_reasons.append(f"event[{index}] record signature mismatch")
+ break
+
+ verified += 1
+ signed += 1
+ previous_hash = expected_hash
+
+ return {
+ "chain_valid": not invalid_reasons,
+ "verified_events": verified,
+ "signed_events": signed,
+ "root_event_sha256": previous_hash,
+ "invalid_reasons": invalid_reasons,
+ }
diff --git a/implementations/acr-control-plane/src/acr/pillar4_observability/schema.py b/implementations/acr-control-plane/src/acr/pillar4_observability/schema.py
index 2bce635..6f41f8b 100644
--- a/implementations/acr-control-plane/src/acr/pillar4_observability/schema.py
+++ b/implementations/acr-control-plane/src/acr/pillar4_observability/schema.py
@@ -55,18 +55,19 @@ class TriggeredRule(BaseModel):
class PolicyResult(BaseModel):
policy_id: str
policy_name: str | None = None
- decision: Literal["allow", "deny", "escalate", "warn"] = "allow"
+ decision: Literal["allow", "deny", "modify", "escalate", "warn"] = "allow"
reason: str | None = None
latency_ms: int | None = None
triggered_rules: list[TriggeredRule] = Field(default_factory=list)
class OutputObject(BaseModel):
- decision: Literal["allow", "deny", "escalate"]
+ decision: Literal["allow", "deny", "modify", "escalate"]
reason: str | None = None
approval_request_id: str | None = None
filtered: bool = False
filter_reason: str | None = None
+ modified_parameters: dict[str, Any] | None = None
class AcrControlPlaneMetadata(BaseModel):
@@ -74,12 +75,20 @@ class AcrControlPlaneMetadata(BaseModel):
enforcement_point: str = "gateway"
+class IntegrityMetadata(BaseModel):
+ payload_sha256: str
+ previous_event_sha256: str | None = None
+ record_signature: str | None = None
+ signature_algorithm: Literal["hmac-sha256"] = "hmac-sha256"
+
+
class TelemetryMetadata(BaseModel):
environment: str
acr_control_plane: AcrControlPlaneMetadata
drift_score: float | None = None
anomaly_flags: list[str] = Field(default_factory=list)
compliance_tags: list[str] = Field(default_factory=list)
+ integrity: IntegrityMetadata | None = None
# ── Top-level event ───────────────────────────────────────────────────────────
diff --git a/implementations/acr-control-plane/src/acr/pillar4_observability/telemetry.py b/implementations/acr-control-plane/src/acr/pillar4_observability/telemetry.py
index e14f053..49e9430 100644
--- a/implementations/acr-control-plane/src/acr/pillar4_observability/telemetry.py
+++ b/implementations/acr-control-plane/src/acr/pillar4_observability/telemetry.py
@@ -2,10 +2,17 @@
from __future__ import annotations
import structlog
+from sqlalchemy import desc, select
from sqlalchemy.ext.asyncio import AsyncSession
from acr.config import settings
from acr.db.models import TelemetryEventRecord
+from acr.pillar4_observability.integrity import (
+ extract_payload_hash,
+ payload_sha256,
+ remove_integrity_metadata,
+ sign_payload_hash,
+)
from acr.pillar4_observability.schema import (
ACRTelemetryEvent,
AcrControlPlaneMetadata,
@@ -44,6 +51,9 @@ def build_event(
output_reason: str | None,
approval_request_id: str | None,
drift_score: float | None,
+ output_filtered: bool = False,
+ filter_reason: str | None = None,
+ modified_parameters: dict | None = None,
custom: dict | None = None,
) -> ACRTelemetryEvent:
return ACRTelemetryEvent(
@@ -73,6 +83,9 @@ def build_event(
decision=output_decision, # type: ignore[arg-type]
reason=output_reason,
approval_request_id=approval_request_id,
+ filtered=output_filtered,
+ filter_reason=filter_reason,
+ modified_parameters=modified_parameters,
),
metadata=TelemetryMetadata(
environment=settings.acr_env,
@@ -85,12 +98,22 @@ def build_event(
async def persist_event(db: AsyncSession, event: ACRTelemetryEvent) -> None:
"""Write the telemetry event to PostgreSQL (async, non-blocking in background task)."""
+ payload = event.model_dump(mode="json")
+ previous_hash = await _latest_correlation_event_hash(db, event.request.request_id)
+ payload_hash = payload_sha256(remove_integrity_metadata(payload))
+ payload.setdefault("metadata", {})["integrity"] = {
+ "payload_sha256": payload_hash,
+ "previous_event_sha256": previous_hash,
+ "record_signature": sign_payload_hash(payload_hash, previous_hash),
+ "signature_algorithm": "hmac-sha256",
+ }
+
record = TelemetryEventRecord(
event_id=event.event_id,
correlation_id=event.request.request_id,
event_type=event.event_type,
agent_id=event.agent.agent_id,
- payload=event.model_dump(),
+ payload=payload,
)
db.add(record)
await db.flush()
@@ -110,3 +133,16 @@ def log_event(event: ACRTelemetryEvent) -> None:
duration_ms=event.execution.duration_ms,
drift_score=event.metadata.drift_score,
)
+
+
+async def _latest_correlation_event_hash(db: AsyncSession, correlation_id: str) -> str | None:
+ result = await db.execute(
+ select(TelemetryEventRecord.payload)
+ .where(TelemetryEventRecord.correlation_id == correlation_id)
+ .order_by(desc(TelemetryEventRecord.created_at))
+ .limit(1)
+ )
+ payload = result.scalar_one_or_none()
+ if not isinstance(payload, dict):
+ return None
+ return extract_payload_hash(payload)
diff --git a/implementations/acr-control-plane/src/acr/pillar5_containment/service.py b/implementations/acr-control-plane/src/acr/pillar5_containment/service.py
index 42d1657..5776784 100644
--- a/implementations/acr-control-plane/src/acr/pillar5_containment/service.py
+++ b/implementations/acr-control-plane/src/acr/pillar5_containment/service.py
@@ -45,7 +45,7 @@ async def lifespan(app: FastAPI):
await _redis.aclose()
-app = FastAPI(title="ACR Kill Switch Service", version="1.0.0", lifespan=lifespan)
+app = FastAPI(title="ACR Kill Switch Service", version="1.0.1", lifespan=lifespan)
def _redis_client() -> aioredis.Redis:
diff --git a/implementations/acr-control-plane/src/acr/pillar6_authority/approval.py b/implementations/acr-control-plane/src/acr/pillar6_authority/approval.py
index a79b8cb..c52d9dc 100644
--- a/implementations/acr-control-plane/src/acr/pillar6_authority/approval.py
+++ b/implementations/acr-control-plane/src/acr/pillar6_authority/approval.py
@@ -17,6 +17,8 @@
from acr.config import settings
from acr.db.models import ApprovalRequestRecord
from acr.gateway.executor import execute_action
+from acr.gateway.spend_control import adjust_authoritative_spend, resolve_action_cost_usd
+from acr.pillar1_identity.registry import get_manifest
logger = structlog.get_logger(__name__)
@@ -192,22 +194,32 @@ async def override(db: AsyncSession, request_id: str, decided_by: str, reason: s
return record
-async def execute_approval(record: ApprovalRequestRecord) -> dict:
+async def execute_approval(db: AsyncSession, record: ApprovalRequestRecord) -> dict:
"""Execute an approved/overridden action when downstream execution is enabled."""
- result = await execute_action(
- agent_id=record.agent_id,
- tool_name=record.tool_name,
- parameters=record.parameters or {},
- description=record.description,
- correlation_id=record.correlation_id,
- approval_request_id=record.request_id,
- )
+ manifest = await get_manifest(db, record.agent_id)
+ estimated_cost_usd = resolve_action_cost_usd(manifest, record.tool_name)
+
+ await adjust_authoritative_spend(record.agent_id, estimated_cost_usd)
+ try:
+ result = await execute_action(
+ agent_id=record.agent_id,
+ tool_name=record.tool_name,
+ parameters=record.parameters or {},
+ description=record.description,
+ correlation_id=record.correlation_id,
+ approval_request_id=record.request_id,
+ )
+ except Exception:
+ await adjust_authoritative_spend(record.agent_id, -estimated_cost_usd)
+ raise
+
record.execution_result = result
logger.info(
"approval_executed",
request_id=record.request_id,
agent_id=record.agent_id,
tool_name=record.tool_name,
+ estimated_cost_usd=estimated_cost_usd,
)
return result
diff --git a/implementations/acr-control-plane/src/acr/pillar6_authority/router.py b/implementations/acr-control-plane/src/acr/pillar6_authority/router.py
index 6df37b6..fd485cc 100644
--- a/implementations/acr-control-plane/src/acr/pillar6_authority/router.py
+++ b/implementations/acr-control-plane/src/acr/pillar6_authority/router.py
@@ -63,7 +63,9 @@ async def approve(
) -> ApprovalResponse:
record = await ap.approve(db, request_id, principal.subject, body.reason)
if settings.execute_allowed_actions:
- await ap.execute_approval(record)
+ await ap.execute_approval(db, record)
+ await db.commit()
+ await db.refresh(record)
return _to_response(record)
@@ -75,6 +77,8 @@ async def deny(
principal: OperatorPrincipal = Depends(require_operator_roles("approver", "security_admin")),
) -> ApprovalResponse:
record = await ap.deny(db, request_id, principal.subject, body.reason)
+ await db.commit()
+ await db.refresh(record)
return _to_response(record)
@@ -91,5 +95,7 @@ async def override(
raise HTTPException(status_code=422, detail="Break-glass override requires a reason")
record = await ap.override(db, request_id, principal.subject, body.reason)
if settings.execute_allowed_actions:
- await ap.execute_approval(record)
+ await ap.execute_approval(db, record)
+ await db.commit()
+ await db.refresh(record)
return _to_response(record)
diff --git a/implementations/acr-control-plane/tests/conftest.py b/implementations/acr-control-plane/tests/conftest.py
index 3b93a2e..656c649 100644
--- a/implementations/acr-control-plane/tests/conftest.py
+++ b/implementations/acr-control-plane/tests/conftest.py
@@ -115,9 +115,14 @@ async def sample_agent(db: AsyncSession):
owner="support-engineering@example.com",
purpose="Handle customer support tickets",
risk_tier="medium",
- allowed_tools=["query_customer_db", "send_email", "create_ticket"],
+ allowed_tools=["query_customer_db", "send_email", "create_ticket", "issue_refund"],
forbidden_tools=["delete_customer"],
- boundaries=AgentBoundaries(max_actions_per_minute=30, max_cost_per_hour_usd=5.0),
+ boundaries=AgentBoundaries(
+ max_actions_per_minute=30,
+ max_cost_per_hour_usd=5.0,
+ default_action_cost_usd=0.05,
+ tool_costs_usd={"query_customer_db": 0.01, "issue_refund": 0.25},
+ ),
)
record = await register_agent(db, req)
await db.commit()
diff --git a/implementations/acr-control-plane/tests/integration/test_integration.py b/implementations/acr-control-plane/tests/integration/test_integration.py
index 2b2e688..e7e3ebc 100644
--- a/implementations/acr-control-plane/tests/integration/test_integration.py
+++ b/implementations/acr-control-plane/tests/integration/test_integration.py
@@ -158,7 +158,7 @@ async def test_approval_flow_real_stack(async_client, db: AsyncSession):
f"/acr/approvals/{approval_id}/approve",
json={"notes": "Integration test approval"},
)
- assert approve_resp.status_code in (200, 404)
+ assert approve_resp.status_code == 200
# If denied, that's also valid — the policy blocked a forbidden tool
elif data.get("decision") == "deny":
diff --git a/implementations/acr-control-plane/tests/test_authority.py b/implementations/acr-control-plane/tests/test_authority.py
index 70712d5..07718fd 100644
--- a/implementations/acr-control-plane/tests/test_authority.py
+++ b/implementations/acr-control-plane/tests/test_authority.py
@@ -128,7 +128,7 @@ async def test_approve_via_http(
)
assert resp.status_code == 200
assert resp.json()["status"] == "approved"
- assert resp.json()["decided_by"] == "ops@example.com"
+ assert resp.json()["decided_by"] == "test-operator"
async def test_approve_via_http_executes_when_enabled(
self, async_client: AsyncClient, db: AsyncSession, sample_agent
diff --git a/implementations/acr-control-plane/tests/test_enterprise_hardening.py b/implementations/acr-control-plane/tests/test_enterprise_hardening.py
new file mode 100644
index 0000000..b666219
--- /dev/null
+++ b/implementations/acr-control-plane/tests/test_enterprise_hardening.py
@@ -0,0 +1,228 @@
+from __future__ import annotations
+
+import hmac
+import io
+import json
+import zipfile
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from httpx import AsyncClient
+from sqlalchemy import select
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from acr.common.errors import AuthoritativeSpendControlError
+from acr.config import settings
+from acr.db.models import TelemetryEventRecord
+from acr.gateway.spend_control import resolve_action_cost_usd
+from acr.pillar2_policy.engine import evaluate_policy
+from acr.pillar2_policy.models import PolicyDecision, PolicyEvaluationResult
+from acr.pillar4_observability.evidence import build_evidence_bundle
+from acr.pillar4_observability.integrity import verify_event_chain
+from acr.pillar4_observability.schema import LatencyBreakdown
+from acr.pillar4_observability.telemetry import build_event, persist_event
+from acr.pillar6_authority.approval import create_approval_request, execute_approval
+
+
+def _mock_opa_response(result: dict) -> AsyncMock:
+ response = MagicMock()
+ response.status_code = 200
+ response.json.return_value = {"result": result}
+ response.raise_for_status = MagicMock()
+ response.request = MagicMock()
+
+ client = AsyncMock()
+ client.post = AsyncMock(return_value=response)
+ client.__aenter__ = AsyncMock(return_value=client)
+ client.__aexit__ = AsyncMock(return_value=False)
+ return client
+
+
+class TestModifyDecision:
+ async def test_policy_engine_returns_modify_with_transformed_parameters(self) -> None:
+ client = _mock_opa_response(
+ {
+ "allow": False,
+ "deny": [],
+ "modify": True,
+ "modified_parameters": {"customer_id": "C-001", "note": "[REDACTED]"},
+ }
+ )
+
+ with patch("acr.pillar2_policy.engine.httpx.AsyncClient", return_value=client):
+ result = await evaluate_policy(
+ {"agent_id": "agent-1", "allowed_tools": ["query_customer_db"]},
+ {"tool_name": "query_customer_db", "parameters": {"customer_id": "C-001", "note": "secret"}},
+ {},
+ )
+
+ assert result.final_decision == "modify"
+ assert result.modified_parameters == {"customer_id": "C-001", "note": "[REDACTED]"}
+ assert result.decisions[0].decision == "modify"
+
+ async def test_gateway_returns_modified_action_payload(
+ self,
+ async_client: AsyncClient,
+ sample_agent,
+ ) -> None:
+ modify_result = PolicyEvaluationResult(
+ final_decision="modify",
+ decisions=[PolicyDecision(policy_id="acr-modify", decision="modify", reason="Transform before release")],
+ reason="Transform before release",
+ modified_parameters={"customer_id": "C-001", "note": "[REDACTED]"},
+ latency_ms=7,
+ )
+
+ with patch("acr.gateway.router.evaluate_policy", new_callable=AsyncMock, return_value=modify_result):
+ response = await async_client.post(
+ "/acr/evaluate",
+ json={
+ "agent_id": sample_agent.agent_id,
+ "action": {
+ "tool_name": "query_customer_db",
+ "parameters": {"customer_id": "C-001", "note": "call me at 555-123-4567"},
+ },
+ "context": {},
+ },
+ )
+
+ assert response.status_code == 200
+ payload = response.json()
+ assert payload["decision"] == "modify"
+ assert payload["modified_action"]["tool_name"] == "query_customer_db"
+ assert payload["modified_action"]["parameters"]["note"] == "[REDACTED]"
+
+ async def test_gateway_denies_modify_that_changes_tool_name(
+ self,
+ async_client: AsyncClient,
+ sample_agent,
+ ) -> None:
+ modify_result = PolicyEvaluationResult(
+ final_decision="modify",
+ decisions=[PolicyDecision(policy_id="acr-modify", decision="modify", reason="Transform before release")],
+ reason="Transform before release",
+ modified_action={"tool_name": "send_email", "parameters": {}},
+ latency_ms=5,
+ )
+
+ with patch("acr.gateway.router.evaluate_policy", new_callable=AsyncMock, return_value=modify_result):
+ response = await async_client.post(
+ "/acr/evaluate",
+ json={
+ "agent_id": sample_agent.agent_id,
+ "action": {"tool_name": "query_customer_db", "parameters": {"customer_id": "C-001"}},
+ "context": {},
+ },
+ )
+
+ assert response.status_code == 403
+ assert response.json()["error_code"] == "INVALID_MODIFY_DECISION"
+
+
+class TestEvidenceIntegrity:
+ async def test_persisted_events_form_a_signed_chain_and_bundle(
+ self,
+ db: AsyncSession,
+ sample_agent,
+ ) -> None:
+ correlation_id = "corr-signed-bundle"
+
+ for idx, tool_name in enumerate(("query_customer_db", "send_email"), start=1):
+ event = build_event(
+ event_type="ai_inference",
+ agent_id=sample_agent.agent_id,
+ agent_purpose=sample_agent.purpose,
+ agent_capabilities=sample_agent.allowed_tools or [],
+ correlation_id=correlation_id,
+ session_id="sess-1",
+ tool_name=tool_name,
+ parameters={"step": idx},
+ description=f"step {idx}",
+ context={"session_id": "sess-1"},
+ intent={"goal": "test integrity"},
+ start_time="2026-03-17T00:00:00Z",
+ end_time="2026-03-17T00:00:01Z",
+ duration_ms=10,
+ latency_breakdown=LatencyBreakdown(identity_ms=1, policy_ms=2, total_ms=3),
+ policies=[],
+ output_decision="allow",
+ output_reason=None,
+ approval_request_id=None,
+ drift_score=0.1,
+ )
+ await persist_event(db, event)
+
+ await db.commit()
+ result = await db.execute(
+ select(TelemetryEventRecord.payload)
+ .where(TelemetryEventRecord.correlation_id == correlation_id)
+ .order_by(TelemetryEventRecord.created_at.asc())
+ )
+ events = list(result.scalars().all())
+
+ integrity = verify_event_chain(events)
+ assert integrity["chain_valid"] is True
+ assert integrity["verified_events"] == 2
+
+ bundle = build_evidence_bundle(correlation_id=correlation_id, events=events)
+ with zipfile.ZipFile(io.BytesIO(bundle.bytes_data)) as archive:
+ names = set(archive.namelist())
+ assert "bundle.signature" in names
+ manifest_bytes = archive.read("manifest.json")
+ events_bytes = archive.read("events.jsonl")
+ signature = archive.read("bundle.signature").decode().strip()
+
+ expected_signature = hmac.new(
+ settings.audit_signing_secret.encode(),
+ manifest_bytes + b"\n" + events_bytes,
+ "sha256",
+ ).hexdigest()
+ assert signature == expected_signature
+
+
+class TestSpendControls:
+ def test_resolve_action_cost_prefers_tool_specific_boundaries(self) -> None:
+ manifest = {
+ "boundaries": {
+ "default_action_cost_usd": 0.10,
+ "tool_costs_usd": {"send_email": 0.02},
+ }
+ }
+
+ assert resolve_action_cost_usd(manifest, "send_email") == 0.02
+ assert resolve_action_cost_usd(manifest, "query_customer_db") == 0.10
+
+ def test_resolve_action_cost_requires_authoritative_config_in_production(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "acr_env", "production")
+ with pytest.raises(AuthoritativeSpendControlError):
+ resolve_action_cost_usd({"boundaries": {}}, "send_email")
+
+ async def test_execute_approval_reserves_authoritative_spend(
+ self,
+ db: AsyncSession,
+ sample_agent,
+ ) -> None:
+ record = await create_approval_request(
+ db,
+ correlation_id="corr-approval-spend",
+ agent_id=sample_agent.agent_id,
+ tool_name="issue_refund",
+ parameters={"amount": 42},
+ description="Refund approved by finance",
+ approval_queue="finance",
+ sla_minutes=60,
+ )
+ await db.commit()
+
+ with (
+ patch("acr.pillar6_authority.approval.adjust_authoritative_spend", new_callable=AsyncMock) as adjust_spend,
+ patch(
+ "acr.pillar6_authority.approval.execute_action",
+ new_callable=AsyncMock,
+ return_value={"status": "executed"},
+ ),
+ ):
+ result = await execute_approval(db, record)
+
+ assert result["status"] == "executed"
+ adjust_spend.assert_awaited_once_with(sample_agent.agent_id, 0.25)
diff --git a/implementations/acr-control-plane/tests/test_executor.py b/implementations/acr-control-plane/tests/test_executor.py
index 1d26b04..a86f7ba 100644
--- a/implementations/acr-control-plane/tests/test_executor.py
+++ b/implementations/acr-control-plane/tests/test_executor.py
@@ -194,9 +194,11 @@ def test_production_requires_executor_secret_for_live_execution(monkeypatch) ->
monkeypatch.setattr(settings, "acr_env", "production")
monkeypatch.setattr(settings, "execute_allowed_actions", True)
monkeypatch.setattr(settings, "executor_hmac_secret", "")
+ monkeypatch.setattr(settings, "audit_signing_secret", "a" * 64)
monkeypatch.setattr(settings, "jwt_secret_key", "z" * 64)
monkeypatch.setattr(settings, "killswitch_secret", "k" * 64)
monkeypatch.setattr(settings, "operator_api_keys_json", '{"bootstrap":{"subject":"ops","roles":["security_admin"]}}')
+ monkeypatch.setattr(settings, "service_operator_api_key", "svc_" + ("x" * 40))
monkeypatch.setattr(settings, "oidc_enabled", False)
with pytest.raises(RuntimeError, match="EXECUTOR_HMAC_SECRET"):
@@ -207,9 +209,11 @@ def test_production_requires_strong_executor_credential_secret(monkeypatch) -> N
monkeypatch.setattr(settings, "acr_env", "production")
monkeypatch.setattr(settings, "execute_allowed_actions", False)
monkeypatch.setattr(settings, "executor_credential_secret", "short")
+ monkeypatch.setattr(settings, "audit_signing_secret", "a" * 64)
monkeypatch.setattr(settings, "jwt_secret_key", "z" * 64)
monkeypatch.setattr(settings, "killswitch_secret", "k" * 64)
monkeypatch.setattr(settings, "operator_api_keys_json", '{"bootstrap":{"subject":"ops","roles":["security_admin"]}}')
+ monkeypatch.setattr(settings, "service_operator_api_key", "svc_" + ("x" * 40))
monkeypatch.setattr(settings, "oidc_enabled", False)
with pytest.raises(RuntimeError, match="EXECUTOR_CREDENTIAL_SECRET"):
diff --git a/implementations/acr-control-plane/tests/test_gateway.py b/implementations/acr-control-plane/tests/test_gateway.py
index 8e947d0..0cd708c 100644
--- a/implementations/acr-control-plane/tests/test_gateway.py
+++ b/implementations/acr-control-plane/tests/test_gateway.py
@@ -112,6 +112,10 @@ async def test_escalate_decision(self, async_client: AsyncClient, sample_agent)
assert data["decision"] == "escalate"
assert "approval_request_id" in data
+ approval_resp = await async_client.get(f"/acr/approvals/{data['approval_request_id']}")
+ assert approval_resp.status_code == 200
+ assert approval_resp.json()["status"] == "pending"
+
async def test_allow_executes_downstream_when_enabled(self, async_client: AsyncClient, sample_agent) -> None:
with (
patch.object(__import__("acr.gateway.router", fromlist=["settings"]).settings, "execute_allowed_actions", True),
diff --git a/implementations/acr-control-plane/tests/test_observability.py b/implementations/acr-control-plane/tests/test_observability.py
index 78b0bc9..dfb0278 100644
--- a/implementations/acr-control-plane/tests/test_observability.py
+++ b/implementations/acr-control-plane/tests/test_observability.py
@@ -3,6 +3,7 @@
import pytest
+from acr.config import settings
from acr.pillar4_observability.schema import ACRTelemetryEvent
from acr.pillar4_observability.telemetry import build_event
from acr.pillar4_observability.schema import LatencyBreakdown
@@ -32,7 +33,7 @@ def test_build_allow_event(self) -> None:
approval_request_id=None,
drift_score=None,
)
- assert event.acr_version == "1.0"
+ assert event.acr_version == settings.acr_version
assert event.event_type == "ai_inference"
assert event.agent.agent_id == "test-agent"
assert event.output.decision == "allow"
diff --git a/implementations/acr-control-plane/tests/test_secret_hygiene.py b/implementations/acr-control-plane/tests/test_secret_hygiene.py
index 45f4400..55136c0 100644
--- a/implementations/acr-control-plane/tests/test_secret_hygiene.py
+++ b/implementations/acr-control-plane/tests/test_secret_hygiene.py
@@ -11,6 +11,7 @@ def test_generate_secrets_returns_expected_keys() -> None:
assert secret_bundle["JWT_SECRET_KEY"]
assert secret_bundle["KILLSWITCH_SECRET"]
+ assert secret_bundle["AUDIT_SIGNING_SECRET"]
assert secret_bundle["OPERATOR_SESSION_SECRET"]
assert secret_bundle["EXECUTOR_HMAC_SECRET"]
assert "platform-admin" in secret_bundle["OPERATOR_API_KEYS_JSON"]
diff --git a/implementations/acr-control-plane/tests/test_security_and_policy_services.py b/implementations/acr-control-plane/tests/test_security_and_policy_services.py
new file mode 100644
index 0000000..2272386
--- /dev/null
+++ b/implementations/acr-control-plane/tests/test_security_and_policy_services.py
@@ -0,0 +1,343 @@
+from __future__ import annotations
+
+from types import SimpleNamespace
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from acr.common.errors import PolicyDraftNotFoundError, PolicyReleaseNotFoundError, PolicyValidationError, UnauthorizedOperatorError
+from acr.common.oidc import (
+ _JWKS_CACHE,
+ _extract_claim_values,
+ _fetch_jwks,
+ _map_oidc_roles,
+ build_oidc_authorize_url,
+ create_signed_payload,
+ decode_signed_payload,
+ exchange_code_for_tokens,
+ new_oidc_state,
+ oidc_is_enabled,
+ validate_oidc_token,
+)
+from acr.config import oidc_role_mapping, settings
+from acr.db.models import PolicyReleaseRecord
+from acr.gateway.spend_control import adjust_authoritative_spend, get_authoritative_projected_spend
+from acr.policy_studio.models import PolicyDraftUpsertRequest
+from acr.policy_studio.releases import (
+ activate_policy_release,
+ get_policy_release,
+ list_active_policy_releases,
+ list_policy_releases,
+ publish_policy_draft,
+ rollback_policy_release,
+ validate_policy_draft_record,
+)
+from acr.policy_studio.service import create_policy_draft, get_policy_draft, list_policy_drafts, update_policy_draft
+
+
+@pytest.fixture(autouse=True)
+def _reset_oidc_cache() -> None:
+ _JWKS_CACHE["expires_at"] = 0.0
+ _JWKS_CACHE["keys"] = {}
+ oidc_role_mapping.cache_clear()
+
+
+class TestOIDCHelpers:
+ def test_oidc_is_enabled_reflects_settings(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "oidc_enabled", True)
+ assert oidc_is_enabled() is True
+ monkeypatch.setattr(settings, "oidc_enabled", False)
+ assert oidc_is_enabled() is False
+
+ def test_extract_claim_values_handles_scalar_list_and_missing(self) -> None:
+ assert _extract_claim_values({"roles": "admin"}, "roles") == ["admin"]
+ assert _extract_claim_values({"roles": ["admin", 2]}, "roles") == ["admin", "2"]
+ assert _extract_claim_values({}, "roles") == []
+
+ def test_map_oidc_roles_uses_mapping_or_falls_back(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "oidc_roles_claim", "groups")
+ monkeypatch.setattr(settings, "oidc_role_mapping_json", '{"grp-admin":["security_admin","auditor"]}')
+ oidc_role_mapping.cache_clear()
+ assert _map_oidc_roles({"groups": ["grp-admin"]}) == frozenset({"security_admin", "auditor"})
+
+ monkeypatch.setattr(settings, "oidc_role_mapping_json", "")
+ oidc_role_mapping.cache_clear()
+ assert _map_oidc_roles({"groups": ["plain-role"]}) == frozenset({"plain-role"})
+
+ def test_signed_payload_round_trip(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "operator_session_secret", "s" * 64)
+ token = create_signed_payload({"subject": "alice", "roles": ["auditor"]}, ttl_seconds=60)
+ claims = decode_signed_payload(token)
+ assert claims["subject"] == "alice"
+ assert claims["roles"] == ["auditor"]
+
+ def test_build_oidc_authorize_url_and_state(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "oidc_client_id", "client-123")
+ monkeypatch.setattr(settings, "oidc_redirect_uri", "https://example.com/callback")
+ monkeypatch.setattr(settings, "oidc_scopes", "openid email")
+ monkeypatch.setattr(settings, "oidc_authorize_url", "https://idp.example.com/authorize")
+ state, nonce = new_oidc_state()
+ url = build_oidc_authorize_url(state=state, nonce=nonce)
+ assert "client_id=client-123" in url
+ assert "state=" in url
+ assert "nonce=" in url
+ assert state and nonce and state != nonce
+
+ async def test_fetch_jwks_caches_successful_response(self, monkeypatch) -> None:
+ response = MagicMock()
+ response.json.return_value = {"keys": [{"kid": "k1"}]}
+ response.raise_for_status = MagicMock()
+ client = AsyncMock()
+ client.get = AsyncMock(return_value=response)
+ client.__aenter__ = AsyncMock(return_value=client)
+ client.__aexit__ = AsyncMock(return_value=False)
+
+ with patch("acr.common.oidc.httpx.AsyncClient", return_value=client):
+ first = await _fetch_jwks()
+ second = await _fetch_jwks()
+
+ assert first == second
+ assert client.get.await_count == 1
+
+ async def test_fetch_jwks_rejects_invalid_shape(self) -> None:
+ response = MagicMock()
+ response.json.return_value = {"unexpected": True}
+ response.raise_for_status = MagicMock()
+ client = AsyncMock()
+ client.get = AsyncMock(return_value=response)
+ client.__aenter__ = AsyncMock(return_value=client)
+ client.__aexit__ = AsyncMock(return_value=False)
+
+ with (
+ patch("acr.common.oidc.httpx.AsyncClient", return_value=client),
+ pytest.raises(UnauthorizedOperatorError, match="JWKS response is invalid"),
+ ):
+ await _fetch_jwks()
+
+ async def test_validate_oidc_token_success_and_nonce_check(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "oidc_enabled", True)
+ monkeypatch.setattr(settings, "oidc_client_id", "client-123")
+ monkeypatch.setattr(settings, "oidc_issuer", "https://idp.example.com")
+ monkeypatch.setattr(settings, "oidc_subject_claim", "email")
+ monkeypatch.setattr(settings, "oidc_roles_claim", "groups")
+ monkeypatch.setattr(settings, "oidc_role_mapping_json", '{"grp-admin":["security_admin"]}')
+ oidc_role_mapping.cache_clear()
+
+ with (
+ patch("acr.common.oidc._fetch_jwks", new_callable=AsyncMock, return_value={"keys": [{"kid": "kid-1"}]}),
+ patch("acr.common.oidc.jwt.get_unverified_header", return_value={"kid": "kid-1"}),
+ patch(
+ "acr.common.oidc.jwt.decode",
+ return_value={"email": "alice@example.com", "groups": ["grp-admin"], "nonce": "n-123"},
+ ),
+ ):
+ principal = await validate_oidc_token("token-123", nonce="n-123")
+
+ assert principal.subject == "alice@example.com"
+ assert principal.roles == frozenset({"security_admin"})
+
+ async def test_validate_oidc_token_rejects_missing_key_or_nonce(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "oidc_enabled", True)
+ with (
+ patch("acr.common.oidc._fetch_jwks", new_callable=AsyncMock, return_value={"keys": []}),
+ patch("acr.common.oidc.jwt.get_unverified_header", return_value={"kid": "missing"}),
+ pytest.raises(UnauthorizedOperatorError, match="signing key not found"),
+ ):
+ await validate_oidc_token("token-123")
+
+ with (
+ patch("acr.common.oidc._fetch_jwks", new_callable=AsyncMock, return_value={"keys": [{"kid": "kid-1"}]}),
+ patch("acr.common.oidc.jwt.get_unverified_header", return_value={"kid": "kid-1"}),
+ patch("acr.common.oidc.jwt.decode", return_value={"sub": "alice", "nonce": "bad"}),
+ pytest.raises(UnauthorizedOperatorError, match="nonce validation failed"),
+ ):
+ await validate_oidc_token("token-123", nonce="expected")
+
+ async def test_exchange_code_for_tokens_success_and_missing_id_token(self, monkeypatch) -> None:
+ monkeypatch.setattr(settings, "oidc_redirect_uri", "https://example.com/callback")
+ monkeypatch.setattr(settings, "oidc_client_id", "client-123")
+ monkeypatch.setattr(settings, "oidc_client_secret", "secret-123")
+ monkeypatch.setattr(settings, "oidc_token_url", "https://idp.example.com/token")
+
+ good_response = MagicMock()
+ good_response.raise_for_status = MagicMock()
+ good_response.json.return_value = {"id_token": "jwt", "access_token": "access"}
+
+ bad_response = MagicMock()
+ bad_response.raise_for_status = MagicMock()
+ bad_response.json.return_value = {"access_token": "access"}
+
+ client = AsyncMock()
+ client.post = AsyncMock(side_effect=[good_response, bad_response])
+ client.__aenter__ = AsyncMock(return_value=client)
+ client.__aexit__ = AsyncMock(return_value=False)
+
+ with patch("acr.common.oidc.httpx.AsyncClient", return_value=client):
+ tokens = await exchange_code_for_tokens("code-1")
+ assert tokens["id_token"] == "jwt"
+ with pytest.raises(UnauthorizedOperatorError, match="did not include an id_token"):
+ await exchange_code_for_tokens("code-2")
+
+
+class TestSpendControlEdges:
+ async def test_projected_spend_and_adjust_use_redis_when_available(self) -> None:
+ redis = AsyncMock()
+ redis.get = AsyncMock(return_value="1.25")
+ redis.incrbyfloat = AsyncMock()
+ redis.expire = AsyncMock()
+
+ with patch("acr.gateway.spend_control.get_redis_or_none", return_value=redis):
+ projected = await get_authoritative_projected_spend("agent-1", 0.5)
+ await adjust_authoritative_spend("agent-1", 0.5)
+
+ assert projected == 1.75
+ redis.incrbyfloat.assert_awaited()
+ redis.expire.assert_awaited()
+
+
+class TestPolicyDraftServices:
+ async def test_create_list_get_and_update_policy_draft_directly(self, db: AsyncSession) -> None:
+ req = PolicyDraftUpsertRequest(
+ name="Draft One",
+ agent_id="agent-1",
+ template="customer_support",
+ manifest={"agent_id": "agent-1", "purpose": "Help customers", "allowed_tools": ["query_customer_db"]},
+ rego_policy="package acr\nallow if true",
+ wizard_inputs={"template": "customer_support"},
+ )
+ record = await create_policy_draft(db, req=req, actor="alice")
+ drafts = await list_policy_drafts(db)
+ fetched = await get_policy_draft(db, record.draft_id)
+
+ assert drafts[0].draft_id == record.draft_id
+ assert fetched.created_by == "alice"
+
+ updated = await update_policy_draft(
+ db,
+ draft_id=record.draft_id,
+ req=req.model_copy(update={"name": "Draft Two"}),
+ actor="bob",
+ )
+ assert updated.name == "Draft Two"
+ assert updated.updated_by == "bob"
+
+ with pytest.raises(PolicyDraftNotFoundError):
+ await get_policy_draft(db, "pdr-missing")
+
+
+class TestPolicyReleaseServices:
+ async def test_validate_policy_draft_record_catches_issues_and_warnings(self, db: AsyncSession) -> None:
+ draft = await create_policy_draft(
+ db,
+ req=PolicyDraftUpsertRequest(
+ name="Bad Draft",
+ agent_id="agent-2",
+ template="customer_support",
+ manifest={"agent_id": "", "purpose": "", "risk_tier": "high", "allowed_tools": []},
+ rego_policy="",
+ wizard_inputs={"escalate_tool": "issue_refund"},
+ ),
+ actor="alice",
+ )
+ validation = validate_policy_draft_record(draft)
+ assert validation.valid is False
+ assert validation.issues
+ assert validation.warnings
+
+ async def test_publish_activate_and_rollback_policy_release(self, db: AsyncSession) -> None:
+ req = PolicyDraftUpsertRequest(
+ name="Good Draft",
+ agent_id="agent-3",
+ template="customer_support",
+ manifest={"agent_id": "agent-3", "purpose": "Help customers", "allowed_tools": ["query_customer_db"]},
+ rego_policy="package acr\nallow if true",
+ wizard_inputs={"template": "customer_support"},
+ )
+ draft = await create_policy_draft(db, req=req, actor="alice")
+
+ published_artifact = SimpleNamespace(uri="s3://bundle-1.tar.gz", sha256="abc123", backend="s3")
+ active_artifact = SimpleNamespace(uri="s3://active-bundle.tar.gz")
+
+ with (
+ patch("acr.policy_studio.releases.build_policy_bundle", return_value=b"bundle-bytes"),
+ patch("acr.policy_studio.releases.publish_policy_bundle", return_value=published_artifact),
+ patch("acr.policy_studio.releases.publish_active_policy_bundle", return_value=active_artifact),
+ ):
+ release = await publish_policy_draft(db, draft=draft, actor="alice", notes="first publish")
+ await db.commit()
+ activated = await activate_policy_release(db, release_id=release.release_id, actor="bob")
+ await db.commit()
+ rolled_back = await rollback_policy_release(db, release_id=release.release_id, actor="carol")
+
+ releases = await list_policy_releases(db)
+ active_releases = await list_active_policy_releases(db)
+ assert release.artifact_uri == "s3://bundle-1.tar.gz"
+ assert activated.active_bundle_uri == "s3://active-bundle.tar.gz"
+ assert any(item.release_id == rolled_back.release_id for item in releases)
+ assert any(item.release_id == activated.release_id for item in active_releases)
+ assert rolled_back.rollback_from_release_id == release.release_id
+
+ async def test_publish_rejects_invalid_policy_and_release_lookup_not_found(self, db: AsyncSession) -> None:
+ invalid_draft = await create_policy_draft(
+ db,
+ req=PolicyDraftUpsertRequest(
+ name="Invalid Draft",
+ agent_id="agent-4",
+ template="customer_support",
+ manifest={"agent_id": "agent-4", "purpose": "Missing tools"},
+ rego_policy="package acr",
+ wizard_inputs={},
+ ),
+ actor="alice",
+ )
+
+ with pytest.raises(PolicyValidationError):
+ await publish_policy_draft(db, draft=invalid_draft, actor="alice")
+
+ with pytest.raises(PolicyReleaseNotFoundError):
+ await get_policy_release(db, "prl-missing")
+
+ async def test_activate_policy_release_deactivates_previous_active_release(self, db: AsyncSession) -> None:
+ previous = PolicyReleaseRecord(
+ release_id="prl-old",
+ draft_id="pdr-old",
+ agent_id="agent-5",
+ version=1,
+ name="Old Release",
+ template="customer_support",
+ manifest={"agent_id": "agent-5", "purpose": "Help customers", "allowed_tools": ["query_customer_db"]},
+ rego_policy="package acr\nallow if true",
+ status="published",
+ activation_status="active",
+ active_bundle_uri="s3://old-active.tar.gz",
+ activated_by="alice",
+ )
+ current = PolicyReleaseRecord(
+ release_id="prl-new",
+ draft_id="pdr-new",
+ agent_id="agent-5",
+ version=2,
+ name="New Release",
+ template="customer_support",
+ manifest={"agent_id": "agent-5", "purpose": "Help customers", "allowed_tools": ["query_customer_db"]},
+ rego_policy="package acr\nallow if true",
+ status="published",
+ activation_status="inactive",
+ )
+ db.add(previous)
+ db.add(current)
+ await db.flush()
+
+ with (
+ patch("acr.policy_studio.releases.build_policy_bundle", return_value=b"bundle-bytes"),
+ patch(
+ "acr.policy_studio.releases.publish_active_policy_bundle",
+ return_value=SimpleNamespace(uri="s3://new-active.tar.gz"),
+ ),
+ ):
+ activated = await activate_policy_release(db, release_id="prl-new", actor="bob")
+
+ assert activated.activation_status == "active"
+ assert previous.activation_status == "inactive"
+ assert previous.active_bundle_uri is None
diff --git a/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/active/current.tar.gz b/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/active/current.tar.gz
deleted file mode 100644
index 54897a9..0000000
Binary files a/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/active/current.tar.gz and /dev/null differ
diff --git a/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/v1/publish-bot-01-v1.tar.gz b/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/v1/publish-bot-01-v1.tar.gz
deleted file mode 100644
index 54897a9..0000000
Binary files a/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/v1/publish-bot-01-v1.tar.gz and /dev/null differ
diff --git a/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/v2/publish-bot-01-v2.tar.gz b/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/v2/publish-bot-01-v2.tar.gz
deleted file mode 100644
index de9e894..0000000
Binary files a/implementations/acr-control-plane/var/policy_bundles/publish-bot-01/v2/publish-bot-01-v2.tar.gz and /dev/null differ
diff --git a/implementations/acr-control-plane/var/policy_bundles/runtime-bot-01/active/current.tar.gz b/implementations/acr-control-plane/var/policy_bundles/runtime-bot-01/active/current.tar.gz
deleted file mode 100644
index 4ea2f51..0000000
Binary files a/implementations/acr-control-plane/var/policy_bundles/runtime-bot-01/active/current.tar.gz and /dev/null differ
diff --git a/implementations/acr-control-plane/var/policy_bundles/runtime-bot-01/v1/runtime-bot-01-v1.tar.gz b/implementations/acr-control-plane/var/policy_bundles/runtime-bot-01/v1/runtime-bot-01-v1.tar.gz
deleted file mode 100644
index 4ea2f51..0000000
Binary files a/implementations/acr-control-plane/var/policy_bundles/runtime-bot-01/v1/runtime-bot-01-v1.tar.gz and /dev/null differ