From 91548902b7bdaedc622bb192dfb4a4a866a751e3 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 13 Apr 2026 16:54:02 +0200 Subject: [PATCH 1/2] [DEV-1440] milestone 2: daemon RunEvalCommand handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Daemon side of the dashboard-driven eval pipeline. Pairs with the server M3 endpoint in kurrent-io/Kurrent.Capacitor#477 and depends on the M1 shared eval library in #14. - New SignalR wire types in Models.cs match the server's DaemonCommands.cs: RunEvalCommand (server -> daemon dispatch) plus the four daemon -> server progress events (EvalStarted, EvalQuestionCompleted, EvalFinished, EvalFailed). Registered in KapacitorJsonContext for source-gen serialization. - ServerConnection registers a "RunEval" handler and exposes per-event send methods (EvalStartedAsync etc.) that mirror the existing AgentRegisteredAsync / LaunchFailedAsync pattern. - New EvalRunner singleton subscribes to OnRunEval. Each incoming command spawns a fire-and-forget Task that builds an authenticated HttpClient, instantiates a DaemonEvalObserver bound to the run, and drives EvalService.RunAsync. Unhandled exceptions are caught and translated to an EvalFailed relay so the dashboard learns about daemon-side failures rather than waiting forever. - DaemonEvalObserver maps the IEvalObserver surface to SignalR sends: OnStarted -> EvalStartedAsync, OnQuestionCompleted -> EvalQuestionCompletedAsync, OnFinished -> EvalFinishedAsync, OnFailed -> EvalFailedAsync. Info / per-question-start / per-question-failure / fact-retained callbacks just log locally — they're not interesting enough to justify SignalR chatter for every judge. - Wired into DaemonRunner DI: AddSingleton + an explicit GetRequiredService at startup so the constructor's OnRunEval subscription happens before the host starts taking traffic. Full suite 205/205, AOT publish clean. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/kapacitor/Daemon/DaemonRunner.cs | 5 + src/kapacitor/Daemon/Services/EvalRunner.cs | 127 ++++++++++++++++++ .../Daemon/Services/ServerConnection.cs | 16 +++ src/kapacitor/Models.cs | 51 +++++++ 4 files changed, 199 insertions(+) create mode 100644 src/kapacitor/Daemon/Services/EvalRunner.cs diff --git a/src/kapacitor/Daemon/DaemonRunner.cs b/src/kapacitor/Daemon/DaemonRunner.cs index 90b8935..fa56bfc 100644 --- a/src/kapacitor/Daemon/DaemonRunner.cs +++ b/src/kapacitor/Daemon/DaemonRunner.cs @@ -121,6 +121,7 @@ public static async Task RunAsync(string[] args) { builder.Services.AddHttpClient("Attachments", client => client.BaseAddress = new Uri(config.ServerUrl)); builder.Services.AddSingleton(); + builder.Services.AddSingleton(); var host = builder.Build(); var logger = host.Services.GetRequiredService().CreateLogger("kapacitor.Daemon"); @@ -135,6 +136,10 @@ public static async Task RunAsync(string[] args) { await worktreeManager.CleanupOrphanedAsync(); var orchestrator = host.Services.GetRequiredService(); + // Instantiate EvalRunner so it subscribes to OnRunEval on the + // ServerConnection. It's stateless beyond the subscription, so no + // disposal dance is needed. + _ = host.Services.GetRequiredService(); try { await host.RunAsync(); diff --git a/src/kapacitor/Daemon/Services/EvalRunner.cs b/src/kapacitor/Daemon/Services/EvalRunner.cs new file mode 100644 index 0000000..1ed132d --- /dev/null +++ b/src/kapacitor/Daemon/Services/EvalRunner.cs @@ -0,0 +1,127 @@ +using kapacitor.Eval; +using Microsoft.Extensions.Logging; + +namespace kapacitor.Daemon.Services; + +/// +/// DEV-1440 milestone 2 — handles dispatched +/// from the dashboard via SignalR. The daemon pulls an authenticated +/// HTTP client, runs with a +/// that relays progress back over the +/// SignalR connection, and completes asynchronously — the SignalR +/// RunEval hub invocation fires-and-forgets; progress comes back +/// through EvalStarted / EvalQuestionCompleted / etc. +/// +public sealed class EvalRunner { + readonly ServerConnection _connection; + readonly ILogger _logger; + readonly string _baseUrl; + + public EvalRunner(ServerConnection connection, DaemonConfig config, ILogger logger) { + _connection = connection; + _logger = logger; + _baseUrl = config.ServerUrl.TrimEnd('/'); + + _connection.OnRunEval += HandleRunEvalAsync; + } + + async Task HandleRunEvalAsync(RunEvalCommand cmd) { + _logger.LogInformation( + "Dispatched eval {RunId} for session {Sid} (model {Model}, chain={Chain})", + cmd.EvalRunId, cmd.SessionId, cmd.Model, cmd.Chain + ); + + // Fire-and-forget so the SignalR hub invocation doesn't block; the + // eval's own observer callbacks fan progress back to the server. + // Any unhandled exception from the run is caught and translated to + // an EvalFailed event so the dashboard learns about it. + _ = Task.Run(async () => { + try { + using var httpClient = await HttpClientExtensions.CreateAuthenticatedClientAsync(); + var observer = new DaemonEvalObserver(_connection, cmd.EvalRunId, cmd.SessionId, _logger); + + await EvalService.RunAsync( + _baseUrl, + httpClient, + cmd.SessionId, + cmd.Model, + cmd.Chain, + cmd.ThresholdBytes, + observer + ); + } catch (Exception ex) { + _logger.LogError(ex, "Unhandled exception running eval {RunId} on session {Sid}", cmd.EvalRunId, cmd.SessionId); + + try { + await _connection.EvalFailedAsync(cmd.EvalRunId, cmd.SessionId, $"daemon error: {ex.GetType().Name}"); + } catch (Exception relayEx) { + _logger.LogError(relayEx, "Failed to relay EvalFailed for eval {RunId}", cmd.EvalRunId); + } + } + }); + } +} + +/// +/// implementation that pushes the shaped +/// callbacks — , , +/// , — over the daemon's +/// SignalR connection so the dashboard can render live progress. Info / +/// per-question-start / per-question-failure / fact-retained callbacks +/// just log locally; they're not interesting enough to justify SignalR +/// chatter for every judge. +/// +sealed class DaemonEvalObserver( + ServerConnection connection, + string evalRunId, + string sessionId, + ILogger logger + ) : IEvalObserver { + public void OnInfo(string message) => + logger.LogDebug("[eval {Run}] {Message}", evalRunId, message); + + public void OnStarted(string runId, string contextSessionId, string judgeModel, int totalQuestions) { + logger.LogInformation("Eval {Run} started on session {Sid} (model {Model}, {Count} questions)", runId, sessionId, judgeModel, totalQuestions); + Relay(() => connection.EvalStartedAsync(runId, sessionId, judgeModel, totalQuestions), "EvalStarted"); + } + + public void OnContextFetched(int traceEntries, int traceChars, int toolResultsTotal, int toolResultsTruncated, long bytesSaved) => + logger.LogDebug("Eval {Run} context fetched: {Entries} entries, {Chars} chars", evalRunId, traceEntries, traceChars); + + public void OnQuestionStarted(int index, int total, string category, string questionId) => + logger.LogDebug("[eval {Run}] [{Index}/{Total}] {Category}/{Question} started", evalRunId, index, total, category, questionId); + + public void OnQuestionCompleted(int index, int total, EvalQuestionVerdict verdict, long inputTokens, long outputTokens) { + logger.LogInformation( + "[eval {Run}] [{Index}/{Total}] {Question} -> {Score} ({Verdict})", + evalRunId, index, total, verdict.QuestionId, verdict.Score, verdict.Verdict + ); + Relay(() => connection.EvalQuestionCompletedAsync(evalRunId, sessionId, index, total, verdict.Category, verdict.QuestionId, verdict.Score, verdict.Verdict), "EvalQuestionCompleted"); + } + + public void OnQuestionFailed(int index, int total, string category, string questionId, string reason) => + logger.LogWarning("[eval {Run}] [{Index}/{Total}] {Category}/{Question} failed: {Reason}", evalRunId, index, total, category, questionId, reason); + + public void OnFactRetained(string category, string fact) => + logger.LogDebug("[eval {Run}] retained fact for {Category}: {Fact}", evalRunId, category, fact); + + public void OnFinished(SessionEvalCompletedPayload aggregate) { + logger.LogInformation("Eval {Run} finished on session {Sid}: {Score}/5", evalRunId, sessionId, aggregate.OverallScore); + Relay(() => connection.EvalFinishedAsync(evalRunId, sessionId, aggregate.OverallScore, aggregate.Summary), "EvalFinished"); + } + + public void OnFailed(string reason) { + logger.LogWarning("Eval {Run} failed on session {Sid}: {Reason}", evalRunId, sessionId, reason); + Relay(() => connection.EvalFailedAsync(evalRunId, sessionId, reason), "EvalFailed"); + } + + void Relay(Func send, string eventName) { + _ = Task.Run(async () => { + try { + await send(); + } catch (Exception ex) { + logger.LogWarning(ex, "Failed to relay {Event} for eval {Run}", eventName, evalRunId); + } + }); + } +} diff --git a/src/kapacitor/Daemon/Services/ServerConnection.cs b/src/kapacitor/Daemon/Services/ServerConnection.cs index 4ac780a..5775a95 100644 --- a/src/kapacitor/Daemon/Services/ServerConnection.cs +++ b/src/kapacitor/Daemon/Services/ServerConnection.cs @@ -22,6 +22,7 @@ public partial class ServerConnection : IAsyncDisposable { public event Func? OnSendInput; public event Func? OnSendSpecialKey; // agentId, key public event Func? OnResizeTerminal; + public event Func? OnRunEval; public ServerConnection(DaemonConfig config, ILogger logger) { _config = config; @@ -51,6 +52,7 @@ public ServerConnection(DaemonConfig config, ILogger logger) { _hub.On("SendInput", cmd => OnSendInput?.Invoke(cmd) ?? Task.CompletedTask); _hub.On("SendSpecialKey", (agentId, key) => OnSendSpecialKey?.Invoke(agentId, key) ?? Task.CompletedTask); _hub.On("ResizeTerminal", cmd => OnResizeTerminal?.Invoke(cmd) ?? Task.CompletedTask); + _hub.On("RunEval", cmd => OnRunEval?.Invoke(cmd) ?? Task.CompletedTask); _hub.Reconnected += OnReconnected; _hub.Closed += OnClosed; @@ -177,6 +179,20 @@ public Task LaunchFailedAsync(string agentId, string reason) public Task SendTerminalOutputAsync(string agentId, string base64Data) => _hub.SendAsync("SendTerminalOutput", new TerminalOutput(agentId, base64Data), cancellationToken: _ct); + // ── Eval progress events (DEV-1440) ──────────────────────────────────── + + public Task EvalStartedAsync(string evalRunId, string sessionId, string judgeModel, int totalQuestions) + => _hub.SendAsync("EvalStarted", new EvalStarted(evalRunId, sessionId, judgeModel, totalQuestions), cancellationToken: _ct); + + public Task EvalQuestionCompletedAsync(string evalRunId, string sessionId, int index, int total, string category, string questionId, int score, string verdict) + => _hub.SendAsync("EvalQuestionCompleted", new EvalQuestionCompleted(evalRunId, sessionId, index, total, category, questionId, score, verdict), cancellationToken: _ct); + + public Task EvalFinishedAsync(string evalRunId, string sessionId, int overallScore, string summary) + => _hub.SendAsync("EvalFinished", new EvalFinished(evalRunId, sessionId, overallScore, summary), cancellationToken: _ct); + + public Task EvalFailedAsync(string evalRunId, string sessionId, string reason) + => _hub.SendAsync("EvalFailed", new EvalFailed(evalRunId, sessionId, reason), cancellationToken: _ct); + public Task AppendAgentRunEventAsync(string agentId, object evt) { _eventChannel.Writer.TryWrite(new PendingEvent(agentId, evt)); diff --git a/src/kapacitor/Models.cs b/src/kapacitor/Models.cs index 1485e8f..09dc51c 100644 --- a/src/kapacitor/Models.cs +++ b/src/kapacitor/Models.cs @@ -380,6 +380,11 @@ record RepoEntry { [JsonSerializable(typeof(LaunchAgentCommand))] [JsonSerializable(typeof(SendInputCommand))] [JsonSerializable(typeof(ResizeTerminalCommand))] +[JsonSerializable(typeof(RunEvalCommand))] +[JsonSerializable(typeof(EvalStarted))] +[JsonSerializable(typeof(EvalQuestionCompleted))] +[JsonSerializable(typeof(EvalFinished))] +[JsonSerializable(typeof(EvalFailed))] [JsonSerializable(typeof(DaemonConnect))] [JsonSerializable(typeof(AgentRegistered))] [JsonSerializable(typeof(AgentStatusChanged))] @@ -453,6 +458,52 @@ public readonly record struct TerminalOutput( string Base64Data ); +// ── Eval dispatch (DEV-1440) ────────────────────────────────────────────── + +/// Sent by the server when the dashboard triggers an eval; received by the daemon over SignalR. +public readonly record struct RunEvalCommand( + string EvalRunId, + string SessionId, + string Model, + bool Chain, + int? ThresholdBytes + ); + +/// Daemon → server: eval has fetched context and is about to run the first judge. +public readonly record struct EvalStarted( + string EvalRunId, + string SessionId, + string JudgeModel, + int TotalQuestions + ); + +/// Daemon → server: a judge question completed with a verdict. +public readonly record struct EvalQuestionCompleted( + string EvalRunId, + string SessionId, + int Index, + int Total, + string Category, + string QuestionId, + int Score, + string Verdict + ); + +/// Daemon → server: eval run finished end-to-end and aggregate has been persisted. +public readonly record struct EvalFinished( + string EvalRunId, + string SessionId, + int OverallScore, + string Summary + ); + +/// Daemon → server: eval run failed before producing an aggregate. +public readonly record struct EvalFailed( + string EvalRunId, + string SessionId, + string Reason + ); + /// Agent run events posted to the server HTTP API. record AgentRunStarted( string? Prompt, From 2866c00834d4d1ce524e2d2db72096d6bc3be468 Mon Sep 17 00:00:00 2001 From: Alexey Zimarev Date: Mon, 13 Apr 2026 17:04:35 +0200 Subject: [PATCH 2/2] [DEV-1440] address review feedback on daemon eval runner MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three findings on PR #15 (the other two — observer-throw guard and judge-fact cancellation propagation — were already addressed by the M1 follow-up in 1f655f4): 1. EvalRunId mismatch (Action required) — server dispatches RunEvalCommand with an EvalRunId, but EvalService generated its own GUID, leading to two different ids in one run's event stream (EvalStarted used the service-generated id; subsequent question / finished / failed events used the dispatched id captured in DaemonEvalObserver). Fixed by adding an optional `evalRunId` parameter to EvalService.RunAsync; CLI passes null (mints a fresh id, current behaviour) and the daemon passes cmd.EvalRunId so the whole run, including the persisted SessionEvalCompleted aggregate, shares one correlation id end-to-end. 2. Out-of-order progress events (Recommended) — DaemonEvalObserver's per-event Task.Run can interleave concurrent SignalR sends. Added a SemaphoreSlim(1,1) gate inside Relay so the background sends drain in their enqueue order — the dashboard sees EvalStarted before any question completion, and EvalFinished/EvalFailed last, deterministically. 3. Daemon evals not cancellable on shutdown (Recommended) — EvalRunner spawned Task.Run with no link to the host lifecycle. Now injects IHostApplicationLifetime, captures ApplicationStopping, and passes it as ct to EvalService.RunAsync. M1's outer try/catch turns in-flight cancellation into a clean OnFailed("cancelled") relay so the dashboard learns the eval stopped instead of waiting forever. Full suite 205/205, AOT publish clean. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/kapacitor/Daemon/Services/EvalRunner.cs | 47 ++++++++++++++++----- src/kapacitor/Eval/EvalService.cs | 10 ++++- 2 files changed, 44 insertions(+), 13 deletions(-) diff --git a/src/kapacitor/Daemon/Services/EvalRunner.cs b/src/kapacitor/Daemon/Services/EvalRunner.cs index 1ed132d..a355f95 100644 --- a/src/kapacitor/Daemon/Services/EvalRunner.cs +++ b/src/kapacitor/Daemon/Services/EvalRunner.cs @@ -1,4 +1,5 @@ using kapacitor.Eval; +using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace kapacitor.Daemon.Services; @@ -13,14 +14,21 @@ namespace kapacitor.Daemon.Services; /// through EvalStarted / EvalQuestionCompleted / etc. /// public sealed class EvalRunner { - readonly ServerConnection _connection; - readonly ILogger _logger; - readonly string _baseUrl; - - public EvalRunner(ServerConnection connection, DaemonConfig config, ILogger logger) { - _connection = connection; - _logger = logger; - _baseUrl = config.ServerUrl.TrimEnd('/'); + readonly ServerConnection _connection; + readonly ILogger _logger; + readonly string _baseUrl; + readonly CancellationToken _shutdownToken; + + public EvalRunner( + ServerConnection connection, + DaemonConfig config, + IHostApplicationLifetime lifetime, + ILogger logger + ) { + _connection = connection; + _logger = logger; + _baseUrl = config.ServerUrl.TrimEnd('/'); + _shutdownToken = lifetime.ApplicationStopping; _connection.OnRunEval += HandleRunEvalAsync; } @@ -33,8 +41,10 @@ async Task HandleRunEvalAsync(RunEvalCommand cmd) { // Fire-and-forget so the SignalR hub invocation doesn't block; the // eval's own observer callbacks fan progress back to the server. - // Any unhandled exception from the run is caught and translated to - // an EvalFailed event so the dashboard learns about it. + // Daemon shutdown cancels the token so in-flight evals get a clean + // OnFailed("cancelled") rather than dying mid-judge. Any unhandled + // exception from the run is caught and translated to an EvalFailed + // event so the dashboard learns about it either way. _ = Task.Run(async () => { try { using var httpClient = await HttpClientExtensions.CreateAuthenticatedClientAsync(); @@ -47,7 +57,12 @@ await EvalService.RunAsync( cmd.Model, cmd.Chain, cmd.ThresholdBytes, - observer + observer, + ct: _shutdownToken, + // Use the dispatched run id so all progress events and + // the persisted aggregate share the same correlation id + // the dashboard already knows about. + evalRunId: cmd.EvalRunId ); } catch (Exception ex) { _logger.LogError(ex, "Unhandled exception running eval {RunId} on session {Sid}", cmd.EvalRunId, cmd.SessionId); @@ -77,6 +92,13 @@ sealed class DaemonEvalObserver( string sessionId, ILogger logger ) : IEvalObserver { + // Serialize SignalR relays so concurrent Task.Runs don't interleave; + // the dashboard expects EvalStarted → question completions → + // EvalFinished/EvalFailed in order. SemaphoreSlim suffices because + // the observer is called synchronously from EvalService — only the + // async send to SignalR could otherwise reorder. + readonly SemaphoreSlim _relayLock = new(1, 1); + public void OnInfo(string message) => logger.LogDebug("[eval {Run}] {Message}", evalRunId, message); @@ -117,10 +139,13 @@ public void OnFailed(string reason) { void Relay(Func send, string eventName) { _ = Task.Run(async () => { + await _relayLock.WaitAsync(); try { await send(); } catch (Exception ex) { logger.LogWarning(ex, "Failed to relay {Event} for eval {Run}", eventName, evalRunId); + } finally { + _relayLock.Release(); } }); } diff --git a/src/kapacitor/Eval/EvalService.cs b/src/kapacitor/Eval/EvalService.cs index 028fded..86a83e9 100644 --- a/src/kapacitor/Eval/EvalService.cs +++ b/src/kapacitor/Eval/EvalService.cs @@ -34,14 +34,20 @@ internal static class EvalService { bool chain, int? thresholdBytes, IEvalObserver observer, - CancellationToken ct = default + CancellationToken ct = default, + string? evalRunId = null ) { // Wrap the caller-supplied observer so any throw from a callback // (e.g. SignalR push failures in the daemon) is caught and logged // without aborting the eval — IEvalObserver documents this guarantee. observer = new SafeObserver(observer); - var evalRunId = Guid.NewGuid().ToString(); + // The daemon (DEV-1440 M2) passes the dispatched RunEvalCommand.EvalRunId + // so server-side correlation works end-to-end (RunEvalCommand → + // EvalStarted → EvalQuestionCompleted → EvalFinished + persisted + // SessionEvalCompleted all share the same id). Direct CLI invocations + // pass null and the service mints a fresh GUID. + evalRunId ??= Guid.NewGuid().ToString(); // Session IDs are typically UUIDs but meta-session slugs are free-form // user input; escape once and reuse for every session-scoped URL so