Skip to content
Merged
2 changes: 2 additions & 0 deletions bin/ci/run_e2e_smoke.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,6 @@ pytest -v \
tests/e2e/test_health.py \
tests/e2e/test_decision_api.py \
tests/e2e/test_metrics.py \
tests/e2e/test_llm_openai_contract.py \
tests/e2e/test_llm_streaming.py \
--junitxml="${ARTIFACTS_DIR}/junit.xml"
49 changes: 49 additions & 0 deletions spec/integration/features/rule_engine.feature
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,52 @@ Feature: Rule engine golden integration behavior
And request context is path /v1/chat with jwt org_id org-1 and no precomputed descriptors
When I run rule engine evaluation
Then integration decision is reject with reason kill_switch

Rule: Issue #4 regression and P0 coverage
Scenario: header_hint estimator works in full integration for Circuit Breaker
# Regression for Issue #4: the X-Token-Estimate header (5000) is consumed by
# the header_hint estimator. The expected circuit-breaker cost of 6000
# presumably adds the rule's default_max_completion (1000) — confirm against
# rule_engine's cost computation.
Given the full chain integration is reset with real bundle_loader and token_bucket
And a real bundle with token_bucket_llm rule and header_hint estimator is loaded
And request context is path /v1/chat with jwt org_id org-1
And request context has header X-Token-Estimate 5000
When I run rule engine evaluation
Then integration decision is allow all rules passed
And circuit breaker was checked with cost 6000

Scenario: UC-09 SaaS Unavailable - Edge enforces policy even when audit event queue fails
# Uses a reject decision (tpm_exceeded) so queue_event IS called, proving
# that a saas_client failure does not block or corrupt the enforcement decision.
# The 1500-token estimate exceeds the 1000 TPM limit, guaranteeing the reject path.
Given the full chain integration is reset with real bundle_loader and token_bucket
And a real bundle with token_bucket_llm rule and TPM 1000 is loaded
And SaaS client is configured but unreachable
And request context is path /v1/chat with jwt org_id org-1
And request context has header X-Token-Estimate 1500
When I run rule engine evaluation
Then integration decision is reject with reason "tpm_exceeded"
And saas queue_event was attempted but did not block the decision

Scenario: UC-16 OpenAI-compatible rate limiting headers
# Tests rule_engine decision.headers output (pre-HTTP layer).
# X-Fairvisor-Reason is stripped by decision_api in non-debug mode — see decision_api_spec.lua:519.
# RateLimit / RateLimit-Limit / RateLimit-Remaining / RateLimit-Reset follow the
# IETF "RateLimit header fields for HTTP" draft; Retry-After is standard HTTP.
# The final pattern is a Lua pattern (%d+), matched by the spec's step definition.
Given the full chain integration is reset with real bundle_loader and token_bucket
And a real bundle with token_bucket_llm rule and TPM 1000 is loaded
And request context is path /v1/chat with jwt org_id org-1
And request context has header X-Token-Estimate 1500
When I run rule engine evaluation
Then integration decision is reject with reason "tpm_exceeded"
And decision headers include "X-Fairvisor-Reason" with value "tpm_exceeded"
And decision headers include "Retry-After"
And decision headers include "RateLimit-Limit" with value "1000"
And decision headers include "RateLimit-Remaining" with value "0"
And decision headers include "RateLimit-Reset"
And decision headers include "RateLimit" matching pattern p_tpm.*;r=0;t=%d+

Scenario: UC-18 Token usage shadow mode
# The 15000-token estimate far exceeds the shadow bundle's 1000 TPM limit,
# but shadow mode must allow the request while reporting would_reject so
# operators can preview enforcement without impacting traffic.
Given the full chain integration is reset with real bundle_loader and token_bucket
And a real bundle with token_bucket_llm rule in shadow mode is loaded
And request context is path /v1/chat with jwt org_id org-1
And request context has header X-Token-Estimate 15000
When I run rule engine evaluation
Then integration decision is allow
And integration decision mode is "shadow"
And would_reject is true

235 changes: 234 additions & 1 deletion spec/integration/rule_engine_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,22 @@ local function _setup_full_chain(ctx)
local health = require("fairvisor.health")
ctx.loader = loader
ctx.health = health

-- Mock saas_client that can be toggled to fail
ctx.saas_client = {
queue_event = function(event)
ctx.saas_event_attempts = (ctx.saas_event_attempts or 0) + 1
if ctx.saas_unreachable then
return nil, "SaaS is unreachable"
end
ctx.queued_events = ctx.queued_events or {}
ctx.queued_events[#ctx.queued_events + 1] = event
return true
end
}

ctx.engine = require("fairvisor.rule_engine")
ctx.engine.init({ dict = ctx.dict })
ctx.engine.init({ dict = ctx.dict, health = ctx.health, saas_client = ctx.saas_client })
end

runner:given("^the full chain integration is reset with real bundle_loader and token_bucket$", function(ctx)
Expand Down Expand Up @@ -422,6 +436,10 @@ runner:then_("^integration decision is allow all rules passed$", function(ctx)
assert.equals("all_rules_passed", ctx.decision.reason)
end)

-- Bare allow check: only the action matters here; the stricter
-- "allow all rules passed" step additionally pins the reason.
runner:then_("^integration decision is allow$", function(ctx)
  local decision = ctx.decision
  assert.equals("allow", decision.action)
end)

runner:then_("^missing descriptor log was emitted$", function(ctx)
local found = false
for i = 1, #ctx.logs do
Expand Down Expand Up @@ -459,4 +477,219 @@ runner:then_("^loop short%-circuited circuit and limiter checks$", function(ctx)
assert.is_true(saw_loop)
end)

-- Loads a real bundle whose single rule uses the header_hint token estimator
-- with a circuit breaker enabled, then swaps in a recording circuit_breaker
-- wrapper so later steps can assert the exact cost the engine forwarded.
runner:given("^a real bundle with token_bucket_llm rule and header_hint estimator is loaded$", function(ctx)
  local bundle = mock_bundle.new_bundle({
    bundle_version = 102,
    policies = {
      {
        id = "p_llm",
        spec = {
          selector = { pathPrefix = "/v1/", methods = { "POST" } },
          mode = "enforce",
          circuit_breaker = {
            enabled = true,
            spend_rate_threshold_per_minute = 10000,
            window_seconds = 60
          },
          rules = {
            {
              name = "r_llm",
              limit_keys = { "jwt:org_id" },
              algorithm = "token_bucket_llm",
              algorithm_config = {
                tokens_per_minute = 10000,
                default_max_completion = 1000,
                token_source = { estimator = "header_hint" }
              },
            },
          },
        },
      },
    },
  })
  local payload = mock_bundle.encode(bundle)
  local compiled, err = ctx.loader.load_from_string(payload, nil, nil)
  assert.is_table(compiled, tostring(err))
  local ok, apply_err = ctx.loader.apply(compiled)
  assert.is_true(ok, tostring(apply_err))
  ctx.bundle = ctx.loader.get_current()

  -- Wrap the real circuit_breaker so every check() call is recorded
  -- (key + cost) while still delegating to the real implementation.
  -- NOTE(review): the mock is never restored from package.loaded — later
  -- specs in the same run inherit it; consider restoring in teardown.
  local real_cb = require("fairvisor.circuit_breaker")
  ctx.circuit_calls = {}
  package.loaded["fairvisor.circuit_breaker"] = {
    check = function(dict, config, key, cost, now)
      ctx.circuit_calls[#ctx.circuit_calls + 1] = { key = key, cost = cost }
      return real_cb.check(dict, config, key, cost, now)
    end,
  }
  -- Reload rule_engine so it picks up the mocked circuit_breaker.
  package.loaded["fairvisor.rule_engine"] = nil
  ctx.engine = require("fairvisor.rule_engine")
  -- Bug fix: the re-init must preserve the health/saas_client wiring that
  -- _setup_full_chain established; the previous `init({ dict = ctx.dict })`
  -- silently dropped both.
  ctx.engine.init({ dict = ctx.dict, health = ctx.health, saas_client = ctx.saas_client })
end)

-- Stores the captured digit string verbatim as the X-Token-Estimate header
-- (HTTP header values are strings; the estimator parses it downstream).
runner:given("^request context has header X%-Token%-Estimate (%d+)$", function(ctx, estimate)
  local headers = ctx.request_context.headers
  headers["X-Token-Estimate"] = tostring(estimate)
end)

-- Scans every recorded circuit_breaker.check() call (captured by the
-- header_hint given step) for one whose cost equals the expected value.
runner:then_("^circuit breaker was checked with cost (%d+)$", function(ctx, expected_cost)
  local want = tonumber(expected_cost)
  local found = false
  for _, call in ipairs(ctx.circuit_calls) do
    if call.cost == want then
      found = true
      break
    end
  end
  assert.is_true(found, "Circuit breaker should be called with cost " .. expected_cost)
end)

-- Loads the default mock bundle and applies it, failing fast with the
-- underlying error message instead of silently swallowing load/apply
-- failures (consistent with the assert style used by the other given steps).
runner:given("^a real bundle is loaded and applied$", function(ctx)
  local bundle = mock_bundle.new_bundle({ bundle_version = 103 })
  local payload = mock_bundle.encode(bundle)
  local compiled, err = ctx.loader.load_from_string(payload, nil, nil)
  assert.is_table(compiled, tostring(err))
  local ok, apply_err = ctx.loader.apply(compiled)
  assert.is_true(ok, tostring(apply_err))
  ctx.bundle = ctx.loader.get_current()
end)

-- Flags the mock saas_client (installed by _setup_full_chain) as unreachable:
-- its queue_event will return nil plus an error instead of queueing.
runner:given("^SaaS client is configured but unreachable$", function(ctx)
ctx.saas_unreachable = true
end)

-- The reject path always audits, so queue_event must have been attempted at
-- least once; its failure must not have blocked or corrupted the decision.
runner:then_("^saas queue_event was attempted but did not block the decision$", function(ctx)
  local attempts = ctx.saas_event_attempts or 0
  assert.is_true(attempts > 0, "saas queue_event should have been attempted")
  assert.is_table(ctx.decision)
  assert.is_nil(ctx.decision.error)
end)

-- Loads a bundle whose token_bucket_llm rule has a parameterized TPM limit
-- and the header_hint estimator, asserting on load/apply failures instead of
-- ignoring them (consistent with the header_hint given step).
runner:given("^a real bundle with token_bucket_llm rule and TPM (%d+) is loaded$", function(ctx, tpm)
  local bundle = mock_bundle.new_bundle({
    bundle_version = 104,
    policies = {
      {
        id = "p_tpm",
        spec = {
          selector = { pathPrefix = "/v1/", methods = { "POST" } },
          mode = "enforce",
          rules = {
            {
              name = "r_tpm",
              limit_keys = { "jwt:org_id" },
              algorithm = "token_bucket_llm",
              algorithm_config = {
                -- Step captures are strings; convert once here.
                tokens_per_minute = tonumber(tpm),
                default_max_completion = 100,
                token_source = { estimator = "header_hint" }
              },
            },
          },
        },
      },
    },
  })
  local payload = mock_bundle.encode(bundle)
  local compiled, err = ctx.loader.load_from_string(payload, nil, nil)
  assert.is_table(compiled, tostring(err))
  local ok, apply_err = ctx.loader.apply(compiled)
  assert.is_true(ok, tostring(apply_err))
  ctx.bundle = ctx.loader.get_current()
end)

-- Pins both the action and the quoted reason captured from the step text.
runner:then_('^integration decision is reject with reason "([^"]+)"$', function(ctx, reason)
  local decision = ctx.decision
  assert.equals("reject", decision.action)
  assert.equals(reason, decision.reason)
end)

-- Exact-match check on a single decision header value.
runner:then_('^decision headers include "([^"]+)" with value "([^"]+)"$', function(ctx, name, value)
  local actual = ctx.decision.headers[name]
  assert.equals(value, actual)
end)

-- Presence-only check; the header's value is pinned by sibling steps.
runner:then_('^decision headers include "([^"]+)"$', function(ctx, name)
  local actual = ctx.decision.headers[name]
  assert.is_not_nil(actual)
end)

-- The header must exist and match the Lua pattern captured from the step
-- (assert.matches uses Lua patterns, not regular expressions).
runner:then_('^decision headers include "([^"]+)" matching pattern (.+)$', function(ctx, name, pattern)
  local actual = ctx.decision.headers[name]
  assert.is_not_nil(actual)
  assert.matches(pattern, actual)
end)

-- Loads a bundle whose token_bucket_llm rule runs in shadow mode (observe,
-- never enforce), asserting on load/apply failures instead of ignoring them.
runner:given("^a real bundle with token_bucket_llm rule in shadow mode is loaded$", function(ctx)
  local bundle = mock_bundle.new_bundle({
    bundle_version = 105,
    policies = {
      {
        id = "p_shadow",
        spec = {
          selector = { pathPrefix = "/v1/", methods = { "POST" } },
          mode = "shadow",
          rules = {
            {
              name = "r_shadow",
              limit_keys = { "jwt:org_id" },
              algorithm = "token_bucket_llm",
              algorithm_config = {
                tokens_per_minute = 1000,
                default_max_completion = 100,
                token_source = { estimator = "header_hint" }
              },
            },
          },
        },
      },
    },
  })
  local payload = mock_bundle.encode(bundle)
  local compiled, err = ctx.loader.load_from_string(payload, nil, nil)
  assert.is_table(compiled, tostring(err))
  local ok, apply_err = ctx.loader.apply(compiled)
  assert.is_true(ok, tostring(apply_err))
  ctx.bundle = ctx.loader.get_current()
end)

-- Checks the decision's mode field (e.g. "shadow" vs "enforce").
runner:then_('^integration decision mode is "([^"]+)"$', function(ctx, mode)
  local decision = ctx.decision
  assert.equals(mode, decision.mode)
end)

-- Shadow-mode decisions report what enforcement WOULD have done via the
-- would_reject flag; this step pins it to true.
runner:then_("^would_reject is true$", function(ctx)
assert.is_true(ctx.decision.would_reject)
end)

-- Loads a "deny all" bundle: zero tokens per minute and zero burst, so every
-- request exceeds the budget. No token_source, so the default estimator path
-- is exercised. Asserts on load/apply failures instead of ignoring them.
runner:given("^a real bundle with token_bucket_llm rule and TPM 0 is loaded$", function(ctx)
  local bundle = mock_bundle.new_bundle({
    bundle_version = 106,
    policies = {
      {
        id = "p_bot",
        spec = {
          selector = { pathPrefix = "/v1/", methods = { "POST" } },
          mode = "enforce",
          rules = {
            {
              name = "bot_rule",
              limit_keys = { "jwt:org_id" },
              algorithm = "token_bucket_llm",
              algorithm_config = {
                tokens_per_minute = 0,
                burst_tokens = 0,
                default_max_completion = 100,
              },
            },
          },
        },
      },
    },
  })
  local payload = mock_bundle.encode(bundle)
  local compiled, err = ctx.loader.load_from_string(payload, nil, nil)
  assert.is_table(compiled, tostring(err))
  local ok, apply_err = ctx.loader.apply(compiled)
  assert.is_true(ok, tostring(apply_err))
  ctx.bundle = ctx.loader.get_current()
end)

-- Builds a fresh POST request context for the "bot-org" tenant with no
-- headers or query params (replaces any context from earlier steps).
runner:given("^request context is path /v1/chat with jwt org_id bot%-org$", function(ctx)
  local context = {
    method = "POST",
    path = "/v1/chat",
    headers = {},
    query_params = {},
    jwt_claims = { org_id = "bot-org" },
  }
  ctx.request_context = context
end)

runner:feature_file_relative("features/rule_engine.feature")
57 changes: 56 additions & 1 deletion spec/integration/streaming_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ runner:given("^the nginx integration mock environment is reset$", function(ctx)
}
end)

-- Switches mid-stream limit handling from plain truncation to emitting an
-- error chunk before the [DONE] marker.
runner:given("^stream limit behavior is error_chunk$", function(ctx)
ctx.config.streaming.on_limit_exceeded = "error_chunk"
end)

-- Marks the reservation as shadow so streaming enforcement observes token
-- usage but never truncates the response.
runner:given("^shadow mode is enabled for the request$", function(ctx)
ctx.reservation.is_shadow = true
end)

-- Builds the per-request stream context from the mock config, request
-- context, and token reservation prepared by the given steps.
runner:when("^I initialize streaming for the request$", function(ctx)
ctx.stream_ctx = streaming.init_stream(ctx.config, ctx.request_context, ctx.reservation)
end)
Expand All @@ -89,11 +97,37 @@ runner:then_("^TK%-005 truncates with length finish reason and done marker$", fu
assert.is_true(ctx.stream_ctx.truncated)
end)

-- TK-006: with on_limit_exceeded = "error_chunk", the over-limit second
-- chunk must contain a rate_limit_error payload, the stream must still end
-- with the SSE [DONE] marker, and the context records the truncation.
runner:then_("^TK%-006 truncates with rate_limit_error and done marker$", function(ctx)
assert.matches('"type":"rate_limit_error"', ctx.out2)
assert.matches('data: %[DONE%]\n\n$', ctx.out2)
assert.is_true(ctx.stream_ctx.truncated)
end)

-- TK-007: shadow mode passes both over-limit chunks through untouched
-- (each output is exactly one SSE data frame with a "y+" delta), then
-- reconciles actual usage once the [DONE] marker arrives.
runner:then_("^TK%-007 passes through both chunks and reconciles later$", function(ctx)
assert.matches('^data: %{"choices":%[%{"delta":%{"content":"y+"%}%}%]%}%\n\n$', ctx.out1)
assert.matches('^data: %{"choices":%[%{"delta":%{"content":"y+"%}%}%]%}%\n\n$', ctx.out2)
assert.is_not_true(ctx.stream_ctx.truncated)
-- End the stream to check reconciliation
streaming.body_filter("data: [DONE]\n\n", false)
assert.equals(1, #reconcile_calls)
-- 60 + 60 completion tokens plus the reserved prompt tokens.
assert.equals(120 + ctx.reservation.prompt_tokens, reconcile_calls[1].actual_total)
end)

-- TK-004 (happy path): the in-limit chunk passes through as one SSE frame,
-- the [DONE] marker is forwarded verbatim, and usage reconciles exactly once.
runner:then_("^TK%-004 completes stream and reconciles once$", function(ctx)
  assert.matches('^data: %{"choices":%[%{"delta":%{"content":"y+"%}%}%]%}%\n\n$', ctx.out1)
  assert.equals("data: [DONE]\n\n", ctx.out2)
  assert.equals(1, #reconcile_calls)
  -- 20 completion tokens plus the reserved prompt tokens. The stale
  -- hard-coded `assert.equals(60, ...)` duplicate was removed: it only
  -- held when prompt_tokens happened to be 40 and contradicted this
  -- parameterized assertion otherwise.
  assert.equals(20 + ctx.reservation.prompt_tokens, reconcile_calls[1].actual_total)
end)

-- Disables the partial-usage fragment normally attached to truncation chunks.
runner:given("^include_partial_usage is disabled for the stream$", function(ctx)
ctx.config.streaming.include_partial_usage = false
end)

-- TK-008: truncation still carries finish_reason "length" and the [DONE]
-- marker, but no "usage" object when include_partial_usage is off.
runner:then_("^TK%-008 truncates without usage fragment$", function(ctx)
assert.matches('finish_reason":"length"', ctx.out2)
assert.not_matches('"usage":', ctx.out2)
assert.matches('data: %[DONE%]\n\n$', ctx.out2)
end)

runner:feature([[
Expand All @@ -110,4 +144,25 @@ Feature: Streaming enforcement integration flows
When I initialize streaming for the request
And I process two chunks with 60 and 60 completion tokens
Then TK-005 truncates with length finish reason and done marker

Scenario: TK-006 Mid-Stream Truncation with Error Chunk
Given the nginx integration mock environment is reset
And stream limit behavior is error_chunk
When I initialize streaming for the request
And I process two chunks with 60 and 60 completion tokens
Then TK-006 truncates with rate_limit_error and done marker

Scenario: TK-007 Shadow Mode does not truncate
Given the nginx integration mock environment is reset
And shadow mode is enabled for the request
When I initialize streaming for the request
And I process two chunks with 60 and 60 completion tokens
Then TK-007 passes through both chunks and reconciles later

Scenario: TK-008 Truncation without partial usage
Given the nginx integration mock environment is reset
And include_partial_usage is disabled for the stream
When I initialize streaming for the request
And I process two chunks with 60 and 60 completion tokens
Then TK-008 truncates without usage fragment
]])
Loading
Loading