Skip to content

Commit aea0846

Browse files
committed
feat(worker): mark submission as running on evaluation start
1 parent 935a9e9 commit aea0846

File tree

8 files changed

+100
-3
lines changed

8 files changed

+100
-3
lines changed

apps/worker/cmd/labkit-worker/dev_fake_handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ func (h *devFakeHandler) Handle(ctx context.Context, job *sqlc.EvaluationJobs) e
3737
if err != nil {
3838
return err
3939
}
40+
if err := h.persister.MarkRunning(ctx, submission.ID, startedAt); err != nil {
41+
return err
42+
}
4043
labRow, err := h.queries.GetLab(ctx, submission.LabID)
4144
if err != nil {
4245
return err

apps/worker/cmd/labkit-worker/runtime_handler.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ type jobAcknowledger interface {
3636
}
3737

3838
type resultPersister interface {
39+
MarkRunning(context.Context, uuid.UUID, time.Time) error
3940
Persist(context.Context, evaluation.PersistInput) error
4041
}
4142

@@ -72,6 +73,9 @@ func (h *runtimeHandler) Handle(ctx context.Context, job *sqlc.EvaluationJobs) e
7273
if err != nil {
7374
return err
7475
}
76+
if err := h.persister.MarkRunning(ctx, submission.ID, startedAt); err != nil {
77+
return err
78+
}
7579

7680
imageDigest, _ := h.resolveImageDigest(ctx, parsed.Eval.Image)
7781
result, err := h.evaluate(ctx, submission, parsed)

apps/worker/cmd/labkit-worker/runtime_handler_test.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,15 @@ func TestRuntimeHandlerPersistsScoredResultAndAcknowledgesDone(t *testing.T) {
8787
t.Fatalf("Handle() error = %v", err)
8888
}
8989

90+
if persister.markRunningCalls != 1 {
91+
t.Fatalf("markRunningCalls = %d, want 1", persister.markRunningCalls)
92+
}
93+
if persister.runningSubmissionID != submissionID {
94+
t.Fatalf("running submission id = %v, want %v", persister.runningSubmissionID, submissionID)
95+
}
96+
if got := persister.runningStartedAt; !got.Equal(time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC)) {
97+
t.Fatalf("running started_at = %v, want fixed test time", got)
98+
}
9099
if queue.status != jobs.StatusDone {
91100
t.Fatalf("ack status = %q, want %q", queue.status, jobs.StatusDone)
92101
}
@@ -251,7 +260,17 @@ func (q *fakeRuntimeQueue) Acknowledge(_ context.Context, _ uuid.UUID, _ string,
251260
}
252261

253262
type fakeRuntimePersister struct {
254-
input evaluation.PersistInput
263+
input evaluation.PersistInput
264+
markRunningCalls int
265+
runningSubmissionID uuid.UUID
266+
runningStartedAt time.Time
267+
}
268+
269+
func (p *fakeRuntimePersister) MarkRunning(_ context.Context, submissionID uuid.UUID, startedAt time.Time) error {
270+
p.markRunningCalls++
271+
p.runningSubmissionID = submissionID
272+
p.runningStartedAt = startedAt
273+
return nil
255274
}
256275

257276
func (p *fakeRuntimePersister) Persist(_ context.Context, input evaluation.PersistInput) error {

apps/worker/internal/service/evaluation/repo.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (r *repo) BeginTx(ctx context.Context) (Tx, error) {
2525
return &storeTx{tx: tx, queries: sqlc.New(tx)}, nil
2626
}
2727

28+
func (r *repo) UpdateSubmissionRunning(ctx context.Context, arg sqlc.UpdateSubmissionRunningParams) error {
29+
return sqlc.New(r.pool).UpdateSubmissionRunning(ctx, arg)
30+
}
31+
2832
type storeTx struct {
2933
tx pgx.Tx
3034
queries *sqlc.Queries

apps/worker/internal/service/evaluation/service.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,14 @@ import (
1616
)
1717

1818
const (
19-
submissionStatusDone = string(labkit.SubmissionStatusDone)
20-
submissionStatusError = string(labkit.SubmissionStatusError)
19+
submissionStatusRunning = string(labkit.SubmissionStatusRunning)
20+
submissionStatusDone = string(labkit.SubmissionStatusDone)
21+
submissionStatusError = string(labkit.SubmissionStatusError)
2122
)
2223

2324
type Repository interface {
2425
BeginTx(context.Context) (Tx, error)
26+
UpdateSubmissionRunning(context.Context, sqlc.UpdateSubmissionRunningParams) error
2527
}
2628

2729
type Tx interface {
@@ -53,6 +55,17 @@ func NewService(repo Repository) *Service {
5355
}
5456
}
5557

58+
func (s *Service) MarkRunning(ctx context.Context, submissionID uuid.UUID, startedAt time.Time) error {
59+
if s == nil || s.repo == nil {
60+
return fmt.Errorf("evaluation service: repository is required")
61+
}
62+
return s.repo.UpdateSubmissionRunning(ctx, sqlc.UpdateSubmissionRunningParams{
63+
ID: submissionID,
64+
Status: submissionStatusRunning,
65+
StartedAt: timestamptzValue(startedAt),
66+
})
67+
}
68+
5669
func (s *Service) Persist(ctx context.Context, in PersistInput) (err error) {
5770
if s == nil || s.repo == nil {
5871
return fmt.Errorf("evaluation service: repository is required")

apps/worker/internal/service/evaluation/service_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,27 @@ func TestPersistLatestScoredSubmissionReplacesLeaderboardPointer(t *testing.T) {
195195
}
196196
}
197197

198+
func TestMarkRunningUpdatesSubmissionStatusAndStartedAt(t *testing.T) {
199+
repo := newFakeRepository()
200+
svc := newTestService(repo)
201+
submission := seededSubmission("66666666-6666-7666-8666-666666666666", 7, "sorting")
202+
submission.Status = "queued"
203+
repo.submissions[submission.ID] = submission
204+
startedAt := time.Date(2026, 3, 31, 13, 30, 0, 0, time.UTC)
205+
206+
if err := svc.MarkRunning(context.Background(), submission.ID, startedAt); err != nil {
207+
t.Fatalf("MarkRunning() error = %v", err)
208+
}
209+
210+
stored := repo.submissions[submission.ID]
211+
if stored.Status != "running" {
212+
t.Fatalf("submission status = %q, want %q", stored.Status, "running")
213+
}
214+
if !stored.StartedAt.Valid || !stored.StartedAt.Time.Equal(startedAt) {
215+
t.Fatalf("submission started_at = %v, want %v", stored.StartedAt, startedAt)
216+
}
217+
}
218+
198219
func newTestService(repo *fakeRepository) *Service {
199220
svc := NewService(repo)
200221
svc.now = func() time.Time { return repo.now }
@@ -279,6 +300,14 @@ func (r *fakeRepository) BeginTx(context.Context) (Tx, error) {
279300
return tx, nil
280301
}
281302

303+
func (r *fakeRepository) UpdateSubmissionRunning(_ context.Context, arg sqlc.UpdateSubmissionRunningParams) error {
304+
row := r.submissions[arg.ID]
305+
row.Status = arg.Status
306+
row.StartedAt = arg.StartedAt
307+
r.submissions[arg.ID] = row
308+
return nil
309+
}
310+
282311
type fakeTx struct {
283312
repo *fakeRepository
284313
updateCalls int
@@ -356,4 +385,5 @@ var _ interface {
356385

357386
var _ interface {
358387
BeginTx(context.Context) (Tx, error)
388+
UpdateSubmissionRunning(context.Context, sqlc.UpdateSubmissionRunningParams) error
359389
} = (*fakeRepository)(nil)

db/queries/submissions.sql

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ SET status = $2,
4141
finished_at = $8
4242
WHERE id = $1;
4343

44+
-- name: UpdateSubmissionRunning :exec
45+
UPDATE submissions
46+
SET status = $2,
47+
started_at = $3
48+
WHERE id = $1;
49+
4450
-- name: CreateScore :exec
4551
INSERT INTO scores (submission_id, metric_id, value)
4652
VALUES ($1, $2, $3)

packages/go/db/sqlc/submissions.sql.go

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)