diff --git a/src/kapacitor/Daemon/DaemonRunner.cs b/src/kapacitor/Daemon/DaemonRunner.cs index 90b8935..fa56bfc 100644 --- a/src/kapacitor/Daemon/DaemonRunner.cs +++ b/src/kapacitor/Daemon/DaemonRunner.cs @@ -121,6 +121,7 @@ public static async Task RunAsync(string[] args) { builder.Services.AddHttpClient("Attachments", client => client.BaseAddress = new Uri(config.ServerUrl)); builder.Services.AddSingleton(); + builder.Services.AddSingleton(); var host = builder.Build(); var logger = host.Services.GetRequiredService().CreateLogger("kapacitor.Daemon"); @@ -135,6 +136,10 @@ public static async Task RunAsync(string[] args) { await worktreeManager.CleanupOrphanedAsync(); var orchestrator = host.Services.GetRequiredService(); + // Instantiate EvalRunner so it subscribes to OnRunEval on the + // ServerConnection. It's stateless beyond the subscription, so no + // disposal dance is needed. + _ = host.Services.GetRequiredService(); try { await host.RunAsync(); diff --git a/src/kapacitor/Daemon/Services/EvalRunner.cs b/src/kapacitor/Daemon/Services/EvalRunner.cs new file mode 100644 index 0000000..a355f95 --- /dev/null +++ b/src/kapacitor/Daemon/Services/EvalRunner.cs @@ -0,0 +1,152 @@ +using kapacitor.Eval; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace kapacitor.Daemon.Services; + +/// +/// DEV-1440 milestone 2 — handles dispatched +/// from the dashboard via SignalR. The daemon pulls an authenticated +/// HTTP client, runs with a +/// that relays progress back over the +/// SignalR connection, and completes asynchronously — the SignalR +/// RunEval hub invocation fires-and-forgets; progress comes back +/// through EvalStarted / EvalQuestionCompleted / etc. +/// +public sealed class EvalRunner { + readonly ServerConnection _connection; + readonly ILogger _logger; + readonly string _baseUrl; + readonly CancellationToken _shutdownToken; + + public EvalRunner( + ServerConnection connection, + DaemonConfig config, + IHostApplicationLifetime lifetime, + ILogger logger + ) { + _connection = connection; + _logger = logger; + _baseUrl = config.ServerUrl.TrimEnd('/'); + _shutdownToken = lifetime.ApplicationStopping; + + _connection.OnRunEval += HandleRunEvalAsync; + } + + async Task HandleRunEvalAsync(RunEvalCommand cmd) { + _logger.LogInformation( + "Dispatched eval {RunId} for session {Sid} (model {Model}, chain={Chain})", + cmd.EvalRunId, cmd.SessionId, cmd.Model, cmd.Chain + ); + + // Fire-and-forget so the SignalR hub invocation doesn't block; the + // eval's own observer callbacks fan progress back to the server. + // Daemon shutdown cancels the token so in-flight evals get a clean + // OnFailed("cancelled") rather than dying mid-judge. Any unhandled + // exception from the run is caught and translated to an EvalFailed + // event so the dashboard learns about it either way. + _ = Task.Run(async () => { + try { + using var httpClient = await HttpClientExtensions.CreateAuthenticatedClientAsync(); + var observer = new DaemonEvalObserver(_connection, cmd.EvalRunId, cmd.SessionId, _logger); + + await EvalService.RunAsync( + _baseUrl, + httpClient, + cmd.SessionId, + cmd.Model, + cmd.Chain, + cmd.ThresholdBytes, + observer, + ct: _shutdownToken, + // Use the dispatched run id so all progress events and + // the persisted aggregate share the same correlation id + // the dashboard already knows about. + evalRunId: cmd.EvalRunId + ); + } catch (Exception ex) { + _logger.LogError(ex, "Unhandled exception running eval {RunId} on session {Sid}", cmd.EvalRunId, cmd.SessionId); + + try { + await _connection.EvalFailedAsync(cmd.EvalRunId, cmd.SessionId, $"daemon error: {ex.GetType().Name}"); + } catch (Exception relayEx) { + _logger.LogError(relayEx, "Failed to relay EvalFailed for eval {RunId}", cmd.EvalRunId); + } + } + }); + } +} + +/// +/// implementation that pushes the shaped +/// callbacks — , , +/// , — over the daemon's +/// SignalR connection so the dashboard can render live progress. Info / +/// per-question-start / per-question-failure / fact-retained callbacks +/// just log locally; they're not interesting enough to justify SignalR +/// chatter for every judge. +/// +sealed class DaemonEvalObserver( + ServerConnection connection, + string evalRunId, + string sessionId, + ILogger logger + ) : IEvalObserver { + // Serialize SignalR relays so concurrent Task.Runs don't interleave; + // the dashboard expects EvalStarted → question completions → + // EvalFinished/EvalFailed in order. SemaphoreSlim suffices because + // the observer is called synchronously from EvalService — only the + // async send to SignalR could otherwise reorder. + readonly SemaphoreSlim _relayLock = new(1, 1); + + public void OnInfo(string message) => + logger.LogDebug("[eval {Run}] {Message}", evalRunId, message); + + public void OnStarted(string runId, string contextSessionId, string judgeModel, int totalQuestions) { + logger.LogInformation("Eval {Run} started on session {Sid} (model {Model}, {Count} questions)", runId, sessionId, judgeModel, totalQuestions); + Relay(() => connection.EvalStartedAsync(runId, sessionId, judgeModel, totalQuestions), "EvalStarted"); + } + + public void OnContextFetched(int traceEntries, int traceChars, int toolResultsTotal, int toolResultsTruncated, long bytesSaved) => + logger.LogDebug("Eval {Run} context fetched: {Entries} entries, {Chars} chars", evalRunId, traceEntries, traceChars); + + public void OnQuestionStarted(int index, int total, string category, string questionId) => + logger.LogDebug("[eval {Run}] [{Index}/{Total}] {Category}/{Question} started", evalRunId, index, total, category, questionId); + + public void OnQuestionCompleted(int index, int total, EvalQuestionVerdict verdict, long inputTokens, long outputTokens) { + logger.LogInformation( + "[eval {Run}] [{Index}/{Total}] {Question} -> {Score} ({Verdict})", + evalRunId, index, total, verdict.QuestionId, verdict.Score, verdict.Verdict + ); + Relay(() => connection.EvalQuestionCompletedAsync(evalRunId, sessionId, index, total, verdict.Category, verdict.QuestionId, verdict.Score, verdict.Verdict), "EvalQuestionCompleted"); + } + + public void OnQuestionFailed(int index, int total, string category, string questionId, string reason) => + logger.LogWarning("[eval {Run}] [{Index}/{Total}] {Category}/{Question} failed: {Reason}", evalRunId, index, total, category, questionId, reason); + + public void OnFactRetained(string category, string fact) => + logger.LogDebug("[eval {Run}] retained fact for {Category}: {Fact}", evalRunId, category, fact); + + public void OnFinished(SessionEvalCompletedPayload aggregate) { + logger.LogInformation("Eval {Run} finished on session {Sid}: {Score}/5", evalRunId, sessionId, aggregate.OverallScore); + Relay(() => connection.EvalFinishedAsync(evalRunId, sessionId, aggregate.OverallScore, aggregate.Summary), "EvalFinished"); + } + + public void OnFailed(string reason) { + logger.LogWarning("Eval {Run} failed on session {Sid}: {Reason}", evalRunId, sessionId, reason); + Relay(() => connection.EvalFailedAsync(evalRunId, sessionId, reason), "EvalFailed"); + } + + void Relay(Func send, string eventName) { + _ = Task.Run(async () => { + await _relayLock.WaitAsync(); + try { + await send(); + } catch (Exception ex) { + logger.LogWarning(ex, "Failed to relay {Event} for eval {Run}", eventName, evalRunId); + } finally { + _relayLock.Release(); + } + }); + } +} diff --git a/src/kapacitor/Daemon/Services/ServerConnection.cs b/src/kapacitor/Daemon/Services/ServerConnection.cs index 4ac780a..5775a95 100644 --- a/src/kapacitor/Daemon/Services/ServerConnection.cs +++ b/src/kapacitor/Daemon/Services/ServerConnection.cs @@ -22,6 +22,7 @@ public partial class ServerConnection : IAsyncDisposable { public event Func? OnSendInput; public event Func? OnSendSpecialKey; // agentId, key public event Func? OnResizeTerminal; + public event Func? OnRunEval; public ServerConnection(DaemonConfig config, ILogger logger) { _config = config; @@ -51,6 +52,7 @@ public ServerConnection(DaemonConfig config, ILogger logger) { _hub.On("SendInput", cmd => OnSendInput?.Invoke(cmd) ?? Task.CompletedTask); _hub.On("SendSpecialKey", (agentId, key) => OnSendSpecialKey?.Invoke(agentId, key) ?? Task.CompletedTask); _hub.On("ResizeTerminal", cmd => OnResizeTerminal?.Invoke(cmd) ?? Task.CompletedTask); + _hub.On("RunEval", cmd => OnRunEval?.Invoke(cmd) ?? Task.CompletedTask); _hub.Reconnected += OnReconnected; _hub.Closed += OnClosed; @@ -177,6 +179,20 @@ public Task LaunchFailedAsync(string agentId, string reason) public Task SendTerminalOutputAsync(string agentId, string base64Data) => _hub.SendAsync("SendTerminalOutput", new TerminalOutput(agentId, base64Data), cancellationToken: _ct); + // ── Eval progress events (DEV-1440) ──────────────────────────────────── + + public Task EvalStartedAsync(string evalRunId, string sessionId, string judgeModel, int totalQuestions) + => _hub.SendAsync("EvalStarted", new EvalStarted(evalRunId, sessionId, judgeModel, totalQuestions), cancellationToken: _ct); + + public Task EvalQuestionCompletedAsync(string evalRunId, string sessionId, int index, int total, string category, string questionId, int score, string verdict) + => _hub.SendAsync("EvalQuestionCompleted", new EvalQuestionCompleted(evalRunId, sessionId, index, total, category, questionId, score, verdict), cancellationToken: _ct); + + public Task EvalFinishedAsync(string evalRunId, string sessionId, int overallScore, string summary) + => _hub.SendAsync("EvalFinished", new EvalFinished(evalRunId, sessionId, overallScore, summary), cancellationToken: _ct); + + public Task EvalFailedAsync(string evalRunId, string sessionId, string reason) + => _hub.SendAsync("EvalFailed", new EvalFailed(evalRunId, sessionId, reason), cancellationToken: _ct); + public Task AppendAgentRunEventAsync(string agentId, object evt) { _eventChannel.Writer.TryWrite(new PendingEvent(agentId, evt)); diff --git a/src/kapacitor/Eval/EvalService.cs b/src/kapacitor/Eval/EvalService.cs index 028fded..86a83e9 100644 --- a/src/kapacitor/Eval/EvalService.cs +++ b/src/kapacitor/Eval/EvalService.cs @@ -34,14 +34,20 @@ internal static class EvalService { bool chain, int? thresholdBytes, IEvalObserver observer, - CancellationToken ct = default + CancellationToken ct = default, + string? evalRunId = null ) { // Wrap the caller-supplied observer so any throw from a callback // (e.g. SignalR push failures in the daemon) is caught and logged // without aborting the eval — IEvalObserver documents this guarantee. observer = new SafeObserver(observer); - var evalRunId = Guid.NewGuid().ToString(); + // The daemon (DEV-1440 M2) passes the dispatched RunEvalCommand.EvalRunId + // so server-side correlation works end-to-end (RunEvalCommand → + // EvalStarted → EvalQuestionCompleted → EvalFinished + persisted + // SessionEvalCompleted all share the same id). Direct CLI invocations + // pass null and the service mints a fresh GUID. + evalRunId ??= Guid.NewGuid().ToString(); // Session IDs are typically UUIDs but meta-session slugs are free-form // user input; escape once and reuse for every session-scoped URL so diff --git a/src/kapacitor/Models.cs b/src/kapacitor/Models.cs index 1485e8f..09dc51c 100644 --- a/src/kapacitor/Models.cs +++ b/src/kapacitor/Models.cs @@ -380,6 +380,11 @@ record RepoEntry { [JsonSerializable(typeof(LaunchAgentCommand))] [JsonSerializable(typeof(SendInputCommand))] [JsonSerializable(typeof(ResizeTerminalCommand))] +[JsonSerializable(typeof(RunEvalCommand))] +[JsonSerializable(typeof(EvalStarted))] +[JsonSerializable(typeof(EvalQuestionCompleted))] +[JsonSerializable(typeof(EvalFinished))] +[JsonSerializable(typeof(EvalFailed))] [JsonSerializable(typeof(DaemonConnect))] [JsonSerializable(typeof(AgentRegistered))] [JsonSerializable(typeof(AgentStatusChanged))] @@ -453,6 +458,52 @@ public readonly record struct TerminalOutput( string Base64Data ); +// ── Eval dispatch (DEV-1440) ────────────────────────────────────────────── + +/// Sent by the server when the dashboard triggers an eval; received by the daemon over SignalR. +public readonly record struct RunEvalCommand( + string EvalRunId, + string SessionId, + string Model, + bool Chain, + int? ThresholdBytes + ); + +/// Daemon → server: eval has fetched context and is about to run the first judge. +public readonly record struct EvalStarted( + string EvalRunId, + string SessionId, + string JudgeModel, + int TotalQuestions + ); + +/// Daemon → server: a judge question completed with a verdict. +public readonly record struct EvalQuestionCompleted( + string EvalRunId, + string SessionId, + int Index, + int Total, + string Category, + string QuestionId, + int Score, + string Verdict + ); + +/// Daemon → server: eval run finished end-to-end and aggregate has been persisted. +public readonly record struct EvalFinished( + string EvalRunId, + string SessionId, + int OverallScore, + string Summary + ); + +/// Daemon → server: eval run failed before producing an aggregate. +public readonly record struct EvalFailed( + string EvalRunId, + string SessionId, + string Reason + ); + /// Agent run events posted to the server HTTP API. record AgentRunStarted( string? Prompt,