Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/kapacitor/Daemon/DaemonRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public static async Task<int> RunAsync(string[] args) {

builder.Services.AddHttpClient("Attachments", client => client.BaseAddress = new Uri(config.ServerUrl));
builder.Services.AddSingleton<AgentOrchestrator>();
builder.Services.AddSingleton<EvalRunner>();

var host = builder.Build();
var logger = host.Services.GetRequiredService<ILoggerFactory>().CreateLogger("kapacitor.Daemon");
Expand All @@ -135,6 +136,10 @@ public static async Task<int> RunAsync(string[] args) {
await worktreeManager.CleanupOrphanedAsync();

var orchestrator = host.Services.GetRequiredService<AgentOrchestrator>();
// Instantiate EvalRunner so it subscribes to OnRunEval on the
// ServerConnection. It's stateless beyond the subscription, so no
// disposal dance is needed.
_ = host.Services.GetRequiredService<EvalRunner>();

try {
await host.RunAsync();
Expand Down
152 changes: 152 additions & 0 deletions src/kapacitor/Daemon/Services/EvalRunner.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
using kapacitor.Eval;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace kapacitor.Daemon.Services;

/// <summary>
/// DEV-1440 milestone 2 — handles <see cref="RunEvalCommand"/> dispatched
/// from the dashboard via SignalR. For each command the runner creates an
/// authenticated HTTP client and runs <see cref="EvalService"/> on a
/// background task with a <see cref="DaemonEvalObserver"/> that relays
/// progress back over the SignalR connection. The <c>RunEval</c> hub
/// invocation itself returns immediately; progress comes back through
/// <c>EvalStarted</c> / <c>EvalQuestionCompleted</c> / etc.
/// </summary>
public sealed class EvalRunner {
    readonly ServerConnection    _connection;
    readonly ILogger<EvalRunner> _logger;
    readonly string              _baseUrl;
    readonly CancellationToken   _shutdownToken;

    /// <summary>
    /// Captures the dependencies and subscribes to
    /// <see cref="ServerConnection.OnRunEval"/>; constructing the instance
    /// is what activates eval handling.
    /// </summary>
    /// <param name="connection">Connection whose <c>OnRunEval</c> event is handled and which carries the progress events back.</param>
    /// <param name="config">Daemon configuration; only <c>ServerUrl</c> is read (trailing slashes are trimmed).</param>
    /// <param name="lifetime">Host lifetime; its <c>ApplicationStopping</c> token cancels in-flight evals.</param>
    /// <param name="logger">Logger for dispatch and failure diagnostics.</param>
    public EvalRunner(
        ServerConnection connection,
        DaemonConfig config,
        IHostApplicationLifetime lifetime,
        ILogger<EvalRunner> logger
    ) {
        _connection    = connection;
        _logger        = logger;
        _baseUrl       = config.ServerUrl.TrimEnd('/');
        _shutdownToken = lifetime.ApplicationStopping;

        _connection.OnRunEval += HandleRunEvalAsync;
    }

    /// <summary>
    /// Event handler for <c>OnRunEval</c>: logs the dispatch, starts the
    /// eval on a background task, and returns a completed task so the hub
    /// invocation is not held open for the duration of the eval.
    /// </summary>
    Task HandleRunEvalAsync(RunEvalCommand cmd) {
        _logger.LogInformation(
            "Dispatched eval {RunId} for session {Sid} (model {Model}, chain={Chain})",
            cmd.EvalRunId, cmd.SessionId, cmd.Model, cmd.Chain
        );

        // Fire-and-forget: RunEvalAsync handles every exception itself, so
        // the discarded task can never fault unobserved.
        _ = Task.Run(() => RunEvalAsync(cmd));

        return Task.CompletedTask;
    }

    /// <summary>
    /// Runs one eval end-to-end. Any exception that escapes
    /// <see cref="EvalService"/> is translated into an <c>EvalFailed</c>
    /// event so the dashboard learns about it either way.
    /// </summary>
    async Task RunEvalAsync(RunEvalCommand cmd) {
        try {
            using var httpClient = await HttpClientExtensions.CreateAuthenticatedClientAsync();
            var observer = new DaemonEvalObserver(_connection, cmd.EvalRunId, cmd.SessionId, _logger);

            await EvalService.RunAsync(
                _baseUrl,
                httpClient,
                cmd.SessionId,
                cmd.Model,
                cmd.Chain,
                cmd.ThresholdBytes,
                observer,
                ct: _shutdownToken,
                // Use the dispatched run id so all progress events and
                // the persisted aggregate share the same correlation id
                // the dashboard already knows about.
                evalRunId: cmd.EvalRunId
            );
        } catch (OperationCanceledException) when (_shutdownToken.IsCancellationRequested) {
            // Daemon shutdown is an expected way for an eval to end: report
            // it as a cancellation instead of an unhandled daemon error.
            // NOTE(review): EvalService may already surface cancellation via
            // observer.OnFailed; this branch only covers a cancellation that
            // escapes RunAsync as an exception.
            _logger.LogInformation("Eval {RunId} on session {Sid} cancelled by daemon shutdown", cmd.EvalRunId, cmd.SessionId);

            await TryRelayFailureAsync(cmd, "cancelled");
        } catch (Exception ex) {
            _logger.LogError(ex, "Unhandled exception running eval {RunId} on session {Sid}", cmd.EvalRunId, cmd.SessionId);

            await TryRelayFailureAsync(cmd, $"daemon error: {ex.GetType().Name}");
        }
    }

    /// <summary>Best-effort <c>EvalFailed</c> relay; a failing send is logged, never thrown.</summary>
    async Task TryRelayFailureAsync(RunEvalCommand cmd, string reason) {
        try {
            await _connection.EvalFailedAsync(cmd.EvalRunId, cmd.SessionId, reason);
        } catch (Exception relayEx) {
            _logger.LogError(relayEx, "Failed to relay EvalFailed for eval {RunId}", cmd.EvalRunId);
        }
    }
}

/// <summary>
/// <see cref="IEvalObserver"/> implementation that pushes the shaped
/// callbacks — <see cref="OnStarted"/>, <see cref="OnQuestionCompleted"/>,
/// <see cref="OnFinished"/>, <see cref="OnFailed"/> — over the daemon's
/// SignalR connection so the dashboard can render live progress. Info /
/// context-fetched / per-question-start / per-question-failure /
/// fact-retained callbacks just log locally; they're not interesting
/// enough to justify SignalR chatter for every judge.
/// </summary>
/// <remarks>
/// Relays are sent on background tasks, strictly in the order the
/// callbacks were invoked, and a failed send is logged rather than thrown.
/// </remarks>
sealed class DaemonEvalObserver(
    ServerConnection connection,
    string evalRunId,
    string sessionId,
    ILogger logger
) : IEvalObserver {
    // The dashboard expects EvalStarted → question completions →
    // EvalFinished/EvalFailed in order. Each relay is appended to a task
    // chain so send N only starts after send N-1 has finished. A bare
    // Task.Run per event plus a semaphore does not give that guarantee:
    // thread-pool start order is not call order, and SemaphoreSlim makes
    // no FIFO promise about which waiter enters next.
    readonly object _relayGate = new();
    Task            _relayTail = Task.CompletedTask;

    /// <summary>Logs a free-form progress message at debug level; nothing is relayed.</summary>
    public void OnInfo(string message) =>
        logger.LogDebug("[eval {Run}] {Message}", evalRunId, message);

    /// <summary>Logs the start of the run and relays <c>EvalStarted</c>.</summary>
    public void OnStarted(string runId, string contextSessionId, string judgeModel, int totalQuestions) {
        logger.LogInformation("Eval {Run} started on session {Sid} (model {Model}, {Count} questions)", runId, sessionId, judgeModel, totalQuestions);
        Relay(() => connection.EvalStartedAsync(runId, sessionId, judgeModel, totalQuestions), "EvalStarted");
    }

    /// <summary>Logs the fetched context size at debug level; nothing is relayed.</summary>
    public void OnContextFetched(int traceEntries, int traceChars, int toolResultsTotal, int toolResultsTruncated, long bytesSaved) =>
        logger.LogDebug("Eval {Run} context fetched: {Entries} entries, {Chars} chars", evalRunId, traceEntries, traceChars);

    /// <summary>Logs that a question started at debug level; nothing is relayed.</summary>
    public void OnQuestionStarted(int index, int total, string category, string questionId) =>
        logger.LogDebug("[eval {Run}] [{Index}/{Total}] {Category}/{Question} started", evalRunId, index, total, category, questionId);

    /// <summary>Logs the verdict and relays <c>EvalQuestionCompleted</c>; token counts are not forwarded.</summary>
    public void OnQuestionCompleted(int index, int total, EvalQuestionVerdict verdict, long inputTokens, long outputTokens) {
        logger.LogInformation(
            "[eval {Run}] [{Index}/{Total}] {Question} -> {Score} ({Verdict})",
            evalRunId, index, total, verdict.QuestionId, verdict.Score, verdict.Verdict
        );
        Relay(() => connection.EvalQuestionCompletedAsync(evalRunId, sessionId, index, total, verdict.Category, verdict.QuestionId, verdict.Score, verdict.Verdict), "EvalQuestionCompleted");
    }

    /// <summary>Logs a per-question failure as a warning; nothing is relayed.</summary>
    public void OnQuestionFailed(int index, int total, string category, string questionId, string reason) =>
        logger.LogWarning("[eval {Run}] [{Index}/{Total}] {Category}/{Question} failed: {Reason}", evalRunId, index, total, category, questionId, reason);

    /// <summary>Logs a retained fact at debug level; nothing is relayed.</summary>
    public void OnFactRetained(string category, string fact) =>
        logger.LogDebug("[eval {Run}] retained fact for {Category}: {Fact}", evalRunId, category, fact);

    /// <summary>Logs the overall score and relays <c>EvalFinished</c> with the score and summary.</summary>
    public void OnFinished(SessionEvalCompletedPayload aggregate) {
        logger.LogInformation("Eval {Run} finished on session {Sid}: {Score}/5", evalRunId, sessionId, aggregate.OverallScore);
        Relay(() => connection.EvalFinishedAsync(evalRunId, sessionId, aggregate.OverallScore, aggregate.Summary), "EvalFinished");
    }

    /// <summary>Logs the failure reason as a warning and relays <c>EvalFailed</c>.</summary>
    public void OnFailed(string reason) {
        logger.LogWarning("Eval {Run} failed on session {Sid}: {Reason}", evalRunId, sessionId, reason);
        Relay(() => connection.EvalFailedAsync(evalRunId, sessionId, reason), "EvalFailed");
    }

    /// <summary>
    /// Queues <paramref name="send"/> behind every previously queued relay
    /// and returns immediately. The send runs on the thread pool, so the
    /// observer callback never blocks on the network.
    /// </summary>
    /// <param name="send">Factory for the SignalR send to perform.</param>
    /// <param name="eventName">Event name used only in the failure log message.</param>
    void Relay(Func<Task> send, string eventName) {
        // The lock fixes the chain order at call time; it is held only long
        // enough to swap the tail, never across an await.
        lock (_relayGate) {
            var previous = _relayTail;

            _relayTail = Task.Run(async () => {
                try {
                    await previous;
                } catch {
                    // A predecessor that somehow faulted (e.g. a throwing log
                    // provider) must not stop later events from being sent.
                }

                try {
                    await send();
                } catch (Exception ex) {
                    logger.LogWarning(ex, "Failed to relay {Event} for eval {Run}", eventName, evalRunId);
                }
            });
        }
    }
}
16 changes: 16 additions & 0 deletions src/kapacitor/Daemon/Services/ServerConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public partial class ServerConnection : IAsyncDisposable {
/// <summary>Invoked when the hub delivers a <c>SendInput</c> message.</summary>
public event Func<SendInputCommand, Task>? OnSendInput;
/// <summary>Invoked when the hub delivers a <c>SendSpecialKey</c> message.</summary>
public event Func<string, string, Task>? OnSendSpecialKey; // agentId, key
/// <summary>Invoked when the hub delivers a <c>ResizeTerminal</c> message.</summary>
public event Func<ResizeTerminalCommand, Task>? OnResizeTerminal;
/// <summary>Invoked when the hub delivers a <c>RunEval</c> message; with no subscriber the message is ignored.</summary>
public event Func<RunEvalCommand, Task>? OnRunEval;

public ServerConnection(DaemonConfig config, ILogger<ServerConnection> logger) {
_config = config;
Expand Down Expand Up @@ -51,6 +52,7 @@ public ServerConnection(DaemonConfig config, ILogger<ServerConnection> logger) {
_hub.On<SendInputCommand>("SendInput", cmd => OnSendInput?.Invoke(cmd) ?? Task.CompletedTask);
_hub.On<string, string>("SendSpecialKey", (agentId, key) => OnSendSpecialKey?.Invoke(agentId, key) ?? Task.CompletedTask);
_hub.On<ResizeTerminalCommand>("ResizeTerminal", cmd => OnResizeTerminal?.Invoke(cmd) ?? Task.CompletedTask);
_hub.On<RunEvalCommand>("RunEval", cmd => OnRunEval?.Invoke(cmd) ?? Task.CompletedTask);

_hub.Reconnected += OnReconnected;
_hub.Closed += OnClosed;
Expand Down Expand Up @@ -177,6 +179,20 @@ public Task LaunchFailedAsync(string agentId, string reason)
/// <summary>Sends a <c>SendTerminalOutput</c> hub message wrapping the agent id and its base64-encoded output.</summary>
public Task SendTerminalOutputAsync(string agentId, string base64Data) {
    var message = new TerminalOutput(agentId, base64Data);

    return _hub.SendAsync("SendTerminalOutput", message, cancellationToken: _ct);
}

// ── Eval progress events (DEV-1440) ────────────────────────────────────

/// <summary>Sends an <c>EvalStarted</c> hub message carrying the run id, session id, judge model and question count.</summary>
public Task EvalStartedAsync(string evalRunId, string sessionId, string judgeModel, int totalQuestions) {
    var message = new EvalStarted(evalRunId, sessionId, judgeModel, totalQuestions);

    return _hub.SendAsync("EvalStarted", message, cancellationToken: _ct);
}

/// <summary>Sends an <c>EvalQuestionCompleted</c> hub message describing one judged question and its verdict.</summary>
public Task EvalQuestionCompletedAsync(string evalRunId, string sessionId, int index, int total, string category, string questionId, int score, string verdict) {
    var message = new EvalQuestionCompleted(
        evalRunId,
        sessionId,
        index,
        total,
        category,
        questionId,
        score,
        verdict
    );

    return _hub.SendAsync("EvalQuestionCompleted", message, cancellationToken: _ct);
}

/// <summary>Sends an <c>EvalFinished</c> hub message carrying the overall score and summary for the run.</summary>
public Task EvalFinishedAsync(string evalRunId, string sessionId, int overallScore, string summary) {
    var message = new EvalFinished(evalRunId, sessionId, overallScore, summary);

    return _hub.SendAsync("EvalFinished", message, cancellationToken: _ct);
}

/// <summary>Sends an <c>EvalFailed</c> hub message carrying the failure reason for the run.</summary>
public Task EvalFailedAsync(string evalRunId, string sessionId, string reason) {
    var message = new EvalFailed(evalRunId, sessionId, reason);

    return _hub.SendAsync("EvalFailed", message, cancellationToken: _ct);
}

public Task AppendAgentRunEventAsync(string agentId, object evt) {
_eventChannel.Writer.TryWrite(new PendingEvent(agentId, evt));

Expand Down
10 changes: 8 additions & 2 deletions src/kapacitor/Eval/EvalService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,20 @@ internal static class EvalService {
bool chain,
int? thresholdBytes,
IEvalObserver observer,
CancellationToken ct = default
CancellationToken ct = default,
string? evalRunId = null
) {
// Wrap the caller-supplied observer so any throw from a callback
// (e.g. SignalR push failures in the daemon) is caught and logged
// without aborting the eval — IEvalObserver documents this guarantee.
observer = new SafeObserver(observer);

var evalRunId = Guid.NewGuid().ToString();
// The daemon (DEV-1440 M2) passes the dispatched RunEvalCommand.EvalRunId
// so server-side correlation works end-to-end (RunEvalCommand →
// EvalStarted → EvalQuestionCompleted → EvalFinished + persisted
// SessionEvalCompleted all share the same id). Direct CLI invocations
// pass null and the service mints a fresh GUID.
evalRunId ??= Guid.NewGuid().ToString();

// Session IDs are typically UUIDs but meta-session slugs are free-form
// user input; escape once and reuse for every session-scoped URL so
Expand Down
51 changes: 51 additions & 0 deletions src/kapacitor/Models.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,11 @@ record RepoEntry {
[JsonSerializable(typeof(LaunchAgentCommand))]
[JsonSerializable(typeof(SendInputCommand))]
[JsonSerializable(typeof(ResizeTerminalCommand))]
[JsonSerializable(typeof(RunEvalCommand))]
[JsonSerializable(typeof(EvalStarted))]
[JsonSerializable(typeof(EvalQuestionCompleted))]
[JsonSerializable(typeof(EvalFinished))]
[JsonSerializable(typeof(EvalFailed))]
[JsonSerializable(typeof(DaemonConnect))]
[JsonSerializable(typeof(AgentRegistered))]
[JsonSerializable(typeof(AgentStatusChanged))]
Expand Down Expand Up @@ -453,6 +458,52 @@ public readonly record struct TerminalOutput(
string Base64Data
);

// ── Eval dispatch (DEV-1440) ──────────────────────────────────────────────

/// <summary>Sent by the server when the dashboard triggers an eval; received by the daemon over SignalR.</summary>
/// <param name="EvalRunId">Correlation id for the run; the daemon forwards it to the eval service and echoes it in its failure event.</param>
/// <param name="SessionId">Id of the session to evaluate.</param>
/// <param name="Model">Model identifier forwarded to the eval service (presumably the judge model — confirm against EvalService).</param>
/// <param name="Chain">Forwarded unchanged to the eval service; exact semantics are defined there.</param>
/// <param name="ThresholdBytes">Optional byte threshold forwarded unchanged to the eval service; the meaning of <c>null</c> is defined there.</param>
public readonly record struct RunEvalCommand(
string EvalRunId,
string SessionId,
string Model,
bool Chain,
int? ThresholdBytes
);

/// <summary>Daemon → server: eval has fetched context and is about to run the first judge.</summary>
/// <param name="EvalRunId">Correlation id of the eval run.</param>
/// <param name="SessionId">Id of the session being evaluated.</param>
/// <param name="JudgeModel">Model used as the judge for this run.</param>
/// <param name="TotalQuestions">Number of questions the run will judge.</param>
public readonly record struct EvalStarted(
string EvalRunId,
string SessionId,
string JudgeModel,
int TotalQuestions
);

/// <summary>Daemon → server: a judge question completed with a verdict.</summary>
/// <param name="EvalRunId">Correlation id of the eval run.</param>
/// <param name="SessionId">Id of the session being evaluated.</param>
/// <param name="Index">Position of this question within the run (NOTE(review): 0- vs 1-based is not visible here — confirm against EvalService).</param>
/// <param name="Total">Total number of questions in the run.</param>
/// <param name="Category">Category the question belongs to.</param>
/// <param name="QuestionId">Identifier of the judged question.</param>
/// <param name="Score">Score the judge assigned to this question.</param>
/// <param name="Verdict">Verdict text the judge produced for this question.</param>
public readonly record struct EvalQuestionCompleted(
string EvalRunId,
string SessionId,
int Index,
int Total,
string Category,
string QuestionId,
int Score,
string Verdict
);

/// <summary>Daemon → server: eval run finished end-to-end and aggregate has been persisted.</summary>
/// <param name="EvalRunId">Correlation id of the eval run.</param>
/// <param name="SessionId">Id of the session that was evaluated.</param>
/// <param name="OverallScore">Aggregate score for the run (the daemon logs it as a value out of 5).</param>
/// <param name="Summary">Summary text taken from the persisted aggregate.</param>
public readonly record struct EvalFinished(
string EvalRunId,
string SessionId,
int OverallScore,
string Summary
);

/// <summary>Daemon → server: eval run failed before producing an aggregate.</summary>
/// <param name="EvalRunId">Correlation id of the eval run.</param>
/// <param name="SessionId">Id of the session that was being evaluated.</param>
/// <param name="Reason">Failure description: either the reason reported by the eval itself or <c>daemon error: &lt;exception type name&gt;</c> for an unhandled daemon exception.</param>
public readonly record struct EvalFailed(
string EvalRunId,
string SessionId,
string Reason
);

/// <summary>Agent run events posted to the server HTTP API.</summary>
record AgentRunStarted(
string? Prompt,
Expand Down
Loading