From 18f058c595cba6565047d6d59e35b7fb11b60a4f Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 12:10:24 +0100 Subject: [PATCH 01/10] WIP work reconciliation Refactor recording reconciliation into child workflows WIP WIP --- README.md | 21 + cmd/genkit-dev/main.go | 4 + cmd/worker/main.go | 1 + internal/ai/brainstormlabelqueries.go | 46 +++ internal/ai/brainstormworkqueries.go | 54 +++ internal/ai/grouptracksbywork.go | 65 +++ internal/ai/matchtrackpieces.go | 92 +++++ .../ai/{labelmatch.go => pickbestlabel.go} | 41 +- internal/ai/pickbestwork.go | 88 ++++ internal/ai/types.go | 12 + .../alpaca/confirmalbumnotingested_test.go | 9 + .../reconcile/albummetadata_test.go | 13 + .../reconcile/findworkcandidates.go | 378 ++++++++++++++++++ .../reconcile/findworkcandidates_test.go | 359 +++++++++++++++++ .../activities/reconcile/grouptracksbywork.go | 69 ++++ .../reconcile/grouptracksbywork_test.go | 99 +++++ .../reconcile/reconcilerecording.go | 36 ++ internal/temporal/memo/activitylog.go | 16 +- internal/temporal/workflows/helpers.go | 45 +++ internal/temporal/workflows/ingestalbum.go | 88 +++- .../temporal/workflows/ingestalbum_test.go | 164 +++++++- upstream/alpaca/client.go | 30 +- upstream/alpaca/workfull.go | 44 ++ upstream/alpaca/worktypes.go | 34 ++ 24 files changed, 1728 insertions(+), 80 deletions(-) create mode 100644 internal/ai/brainstormlabelqueries.go create mode 100644 internal/ai/brainstormworkqueries.go create mode 100644 internal/ai/grouptracksbywork.go create mode 100644 internal/ai/matchtrackpieces.go rename internal/ai/{labelmatch.go => pickbestlabel.go} (57%) create mode 100644 internal/ai/pickbestwork.go create mode 100644 internal/ai/types.go create mode 100644 internal/temporal/activities/reconcile/findworkcandidates.go create mode 100644 internal/temporal/activities/reconcile/findworkcandidates_test.go create mode 100644 internal/temporal/activities/reconcile/grouptracksbywork.go create mode 100644 internal/temporal/activities/reconcile/grouptracksbywork_test.go create mode 100644 internal/temporal/activities/reconcile/reconcilerecording.go create mode 100644 internal/temporal/workflows/helpers.go create mode 100644 upstream/alpaca/workfull.go create mode 100644 upstream/alpaca/worktypes.go diff --git a/README.md b/README.md index 7656577..84266ac 100644 --- a/README.md +++ b/README.md @@ -9,6 +9,27 @@ Alpaca, and ingests matched albums via Alpaca's ingestion API. - **Temporal Worker**: Executes album reconciliation workflows - **HTTP API (TODO)**: Accepts workflow requests, exposes workflow state +### Workflow Structure + +```mermaid +graph TD + A["**FetchLMSSubmission** + fetch submission from LMS"] --> B["**ConfirmAlbumNotIngested** + abort if album already exists in Alpaca"] + B --> C["**ReconcileAlbumMetadata** + match record label, parse release date & p-line"] + B --> D["**GroupTracksByWork** + LLM groups tracks into presumptive works based on track titles"] + + D --> R["**ReconcileWork** ×N + one concurrent goroutine per track group; + finds work candidates, matches tracks to pieces"] + + C --> G["return IngestAlbumResult"] + R --> G +``` + + ## External Dependencies - [LMS (Label Metadata Service)](https://github.com/IDAGIO/idagio-label-metadata-explorer-service) - Source of preprocessed DDEX submissions diff --git a/cmd/genkit-dev/main.go b/cmd/genkit-dev/main.go index e28b616..32b6fc0 100644 --- a/cmd/genkit-dev/main.go +++ b/cmd/genkit-dev/main.go @@ -29,6 +29,10 @@ func main() { aiflows.BrainstormLabelQueriesFlow(g) aiflows.PickBestLabelFlow(g) + aiflows.GroupTracksByWorkFlow(g) + aiflows.BrainstormWorkQueriesFlow(g) + aiflows.PickBestWorkFlow(g) + aiflows.MatchTrackPiecesFlow(g) mux := http.NewServeMux() for _, flow := range genkit.ListFlows(g) { diff --git a/cmd/worker/main.go b/cmd/worker/main.go index e049f35..d773107 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -57,6 +57,7 @@ func main() { w.RegisterActivity(&alpacaactivities.Config{Client: alpacaClient}) w.RegisterActivity(reconcileactivities.NewConfig(alpacaClient, g)) + w.RegisterActivity(reconcileactivities.NewReconcileRecordingConfig(alpacaClient, g)) err = w.Run(worker.InterruptCh()) if err != nil { diff --git a/internal/ai/brainstormlabelqueries.go b/internal/ai/brainstormlabelqueries.go new file mode 100644 index 0000000..db0ae08 --- /dev/null +++ b/internal/ai/brainstormlabelqueries.go @@ -0,0 +1,46 @@ +package ai + +import ( + "context" + "errors" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +func BrainstormLabelQueriesFlow(g *genkit.Genkit) *core.Flow[string, []string, struct{}] { + return genkit.DefineFlow(g, "brainstormLabelQueries", + func(ctx context.Context, labelName string) ([]string, error) { + if labelName == "" { + return nil, errors.New("label name must not be empty") + } + + result, _, err := genkit.GenerateData[alternativeQueries](ctx, g, + ai.WithModelName("openai/gpt-5.2"), + ai.WithSystem(`You are a classical music metadata expert. +You help match record label names to their canonical entries in a music database. + +The queries you suggest will be used as case-insensitive substring searches against a label name column. +Effective strategies: +- Shorter, core-name queries match more broadly than long ones +- Remove suffixes like "GmbH", "Ltd", "Inc", "Records", "Recordings", "Rec.", "Classics" +- Abbreviate or expand well-known names in both directions (e.g. "UMG" ↔ "Universal Music", "DG" ↔ "Deutsche Grammophon") +- Drop punctuation (hyphens, periods, ampersands) +- Parent company names can substitute for imprint names and vice versa + +Given a label name, produce 3-5 alternative queries most likely to match the canonical label name. +Prefer the shortest effective form for each variation.`), + ai.WithPrompt( + "The label name %q was not found. Suggest 3-5 alternative search queries.", + labelName, + ), + ) + if err != nil { + return nil, fmt.Errorf("brainstorm label alternatives: %w", err) + } + return result.Queries, nil + }, + ) +} diff --git a/internal/ai/brainstormworkqueries.go b/internal/ai/brainstormworkqueries.go new file mode 100644 index 0000000..8107feb --- /dev/null +++ b/internal/ai/brainstormworkqueries.go @@ -0,0 +1,54 @@ +package ai + +import ( + "context" + "errors" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +type BrainstormWorkInput struct { + WorkTitle string `json:"work_title"` + ComposerName string `json:"composer_name"` +} + +func BrainstormWorkQueriesFlow(g *genkit.Genkit) *core.Flow[BrainstormWorkInput, []string, struct{}] { + return genkit.DefineFlow(g, "brainstormWorkQueries", + func(ctx context.Context, input BrainstormWorkInput) ([]string, error) { + if input.WorkTitle == "" { + return nil, errors.New("work title must not be empty") + } + + result, _, err := genkit.GenerateData[alternativeQueries](ctx, g, + ai.WithModelName("openai/gpt-5.2"), + ai.WithSystem(`You are a classical music metadata expert. +You help match classical work titles to their canonical entries in a music database. + +The queries you suggest will be used as case-insensitive substring searches against a work title column, alongside a composer name filter. + +Effective strategies: +- Opus/catalogue number variations: Op., BWV, K., KV, D., HWV, RV, S., Sz., etc. +- Language variants: German vs Italian vs French vs English titles (e.g. "Sinfonie" vs "Symphony" vs "Symphonie") +- Popular title vs formal title (e.g. "Moonlight Sonata" vs "Piano Sonata No. 14") +- Number formatting differences ("No. 5" vs "Nr. 5" vs "n° 5") +- Abbreviations and key variations ("G minor" vs "g-Moll" vs "sol mineur") +- Prefer the shortest effective form for each variation +- Avoid including movement/section information, e.g. return a query for "Piano Sonata No. 14" instead of "Piano Sonata No. 14, Op. 27 No. 2 'Moonlight' Allegro", +or "Carmen" instead of "Carmen La Fleur de L'Amour" + +Given a work title and composer name, produce 3-5 alternative search queries most likely to match the canonical title.`), + ai.WithPrompt( + "The work %q by %q was not found. Suggest 3-5 alternative search queries.", + input.WorkTitle, input.ComposerName, + ), + ) + if err != nil { + return nil, fmt.Errorf("brainstorm work alternatives: %w", err) + } + return result.Queries, nil + }, + ) +} diff --git a/internal/ai/grouptracksbywork.go b/internal/ai/grouptracksbywork.go new file mode 100644 index 0000000..e8c7a09 --- /dev/null +++ b/internal/ai/grouptracksbywork.go @@ -0,0 +1,65 @@ +package ai + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +type AnalyzeTracksInput struct { + Tracks []TrackSummary `json:"tracks"` +} + +type IdentifiedWork struct { + WorkTitle string `json:"work_title"` + ComposerName string `json:"composer_name"` + TrackIndices []int `json:"track_indices"` +} + +type AnalyzeTracksResult struct { + Works []IdentifiedWork `json:"works"` +} + +func GroupTracksByWorkFlow(g *genkit.Genkit) *core.Flow[AnalyzeTracksInput, *AnalyzeTracksResult, struct{}] { + return genkit.DefineFlow(g, "groupTracksByWork", + func(ctx context.Context, input AnalyzeTracksInput) (*AnalyzeTracksResult, error) { + if len(input.Tracks) == 0 { + return &AnalyzeTracksResult{}, nil + } + + tracksJSON, err := json.Marshal(input.Tracks) + if err != nil { + return nil, fmt.Errorf("marshal tracks: %w", err) + } + + result, _, err := genkit.GenerateData[AnalyzeTracksResult](ctx, g, + ai.WithModelName("openai/gpt-5.2"), + ai.WithSystem(`You are a classical music metadata expert. +You analyze track listings from album submissions and identify which musical works the tracks belong to. + +Idagio uses a Work → WorkPart → Piece hierarchy: +- A Work is the top-level composition (e.g. "Symphony No. 40 in G minor, K. 550") +- WorkParts are movements or sections +- Pieces are the individual performable units that tracks map to + +Your task is to group tracks into works, identifying the work title and composer for each group. + +Important rules: +- Track titles are unpredictable: they may use "Work / Movement" patterns (e.g. "Symphonie n° 40, K 550 / Molto allegro"), or just the work name for single-movement pieces, or entirely opaque titles +- Composer names come from the input data, NOT from track titles +- Consecutive tracks by the same composer with movement-like titles likely belong to the same work +- Single-movement modern works (e.g. "Become Ocean") — the track title IS the work title +- Work titles should be the canonical name of the composition including catalogue/opus numbers if visible (e.g. "Symphony No. 40 in G minor, K. 550")`), + ai.WithPrompt("Analyze these tracks and group them into works:\n\n%s", string(tracksJSON)), + ) + if err != nil { + return nil, fmt.Errorf("analyze track works: %w", err) + } + return result, nil + }, + ) +} diff --git a/internal/ai/matchtrackpieces.go b/internal/ai/matchtrackpieces.go new file mode 100644 index 0000000..eac6c01 --- /dev/null +++ b/internal/ai/matchtrackpieces.go @@ -0,0 +1,92 @@ +package ai + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +type PieceSummary struct { + PieceID string `json:"piece_id"` + Title string `json:"title"` + WorkpartName string `json:"workpart_name"` + Position int `json:"position"` +} + +type MatchTrackPiecesInput struct { + WorkTitle string `json:"work_title"` + ComposerName string `json:"composer_name"` + Tracks []TrackSummary `json:"tracks"` + Pieces []PieceSummary `json:"pieces"` +} + +type TrackPieceMapping struct { + TrackIndex int `json:"track_index"` + PieceID string `json:"piece_id"` +} + +type MatchTrackPiecesResult struct { + Mappings []TrackPieceMapping `json:"mappings"` + Confidence float64 `json:"confidence"` + Reasoning string `json:"reasoning"` +} + +func MatchTrackPiecesFlow(g *genkit.Genkit) *core.Flow[MatchTrackPiecesInput, *MatchTrackPiecesResult, struct{}] { + return genkit.DefineFlow(g, "matchTrackPieces", + func(ctx context.Context, input MatchTrackPiecesInput) (*MatchTrackPiecesResult, error) { + if len(input.Tracks) == 0 { + return nil, errors.New("tracks must not be empty") + } + if len(input.Pieces) == 0 { + return nil, errors.New("pieces must not be empty") + } + + tracksJSON, err := json.Marshal(input.Tracks) + if err != nil { + return nil, fmt.Errorf("marshal tracks: %w", err) + } + piecesJSON, err := json.Marshal(input.Pieces) + if err != nil { + return nil, fmt.Errorf("marshal pieces: %w", err) + } + + result, _, err := genkit.GenerateData[MatchTrackPiecesResult](ctx, g, + ai.WithModelName("openai/gpt-5.2"), + ai.WithSystem(`You are a classical music metadata expert. +You match album tracks to specific pieces within a musical work's structure. + +A work has workparts (movements/sections), each containing one or more pieces. +Each track on the album should map to exactly one piece. + +Important considerations: +- Movement numbering variants: "I. Allegro" vs "1. Allegro" vs "Allegro (1st movement)" +- Split movements: a work might have pieces [Mvt 1, Mvt 2, Mvt 1a, Mvt 1b] where the album splits Mvt 1 into two tracks mapping to 1a and 1b +- Language differences: track titles may be in a different language than piece titles (e.g. German track "Sehr langsam" matching English piece "Very slow") +- Non-musical pieces: ignore applause tracks, cadenza inserts, or bonus tracks that don't map to any piece +- Track count vs piece count mismatches: the album may include fewer or more tracks than the work has pieces +- Tempo/character markings: "Allegro con brio" should match "Allegro con brio" even with minor spelling differences + +Confidence semantics: +- >= 0.7: all tracks map cleanly to pieces with high certainty +- 0.5-0.7: most tracks map but some are uncertain +- < 0.5: poor match — likely the wrong work + +If a track cannot be mapped to any piece, omit it from the mappings array. +Return your confidence assessment honestly — it's better to report low confidence than to force bad mappings.`), + ai.WithPrompt( + "Match these tracks to pieces for the work %q by %s.\n\nTracks:\n%s\n\nPieces:\n%s", + input.WorkTitle, input.ComposerName, string(tracksJSON), string(piecesJSON), + ), + ) + if err != nil { + return nil, fmt.Errorf("match track pieces: %w", err) + } + return result, nil + }, + ) +} diff --git a/internal/ai/labelmatch.go b/internal/ai/pickbestlabel.go similarity index 57% rename from internal/ai/labelmatch.go rename to internal/ai/pickbestlabel.go index 8ae807b..58190a6 100644 --- a/internal/ai/labelmatch.go +++ b/internal/ai/pickbestlabel.go @@ -11,45 +11,6 @@ import ( "github.com/firebase/genkit/go/genkit" ) -type alternativeQueries struct { - Queries []string `json:"queries"` -} - -func BrainstormLabelQueriesFlow(g *genkit.Genkit) *core.Flow[string, []string, struct{}] { - return genkit.DefineFlow(g, "brainstormLabelQueries", - func(ctx context.Context, labelName string) ([]string, error) { - if labelName == "" { - return nil, errors.New("label name must not be empty") - } - - result, _, err := genkit.GenerateData[alternativeQueries](ctx, g, - ai.WithModelName("openai/gpt-4o"), - ai.WithSystem(`You are a classical music metadata expert. -You help match record label names to their canonical entries in a music database. - -The queries you suggest will be used as case-insensitive substring searches against a label name column. -Effective strategies: -- Shorter, core-name queries match more broadly than long ones -- Remove suffixes like "GmbH", "Ltd", "Inc", "Records", "Recordings", "Rec.", "Classics" -- Abbreviate or expand well-known names in both directions (e.g. "UMG" ↔ "Universal Music", "DG" ↔ "Deutsche Grammophon") -- Drop punctuation (hyphens, periods, ampersands) -- Parent company names can substitute for imprint names and vice versa - -Given a label name, produce 3-5 alternative queries most likely to match the canonical label name. -Prefer the shortest effective form for each variation.`), - ai.WithPrompt( - "The label name %q was not found. Suggest 3-5 alternative search queries.", - labelName, - ), - ) - if err != nil { - return nil, fmt.Errorf("brainstorm label alternatives: %w", err) - } - return result.Queries, nil - }, - ) -} - type LabelCandidate struct { ID string `json:"id"` Name string `json:"name"` @@ -83,7 +44,7 @@ func PickBestLabelFlow(g *genkit.Genkit) *core.Flow[PickLabelInput, *PickLabelRe } result, _, err := genkit.GenerateData[PickLabelResult](ctx, g, - ai.WithModelName("openai/gpt-4o"), + ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You help match record label names to their canonical entries in a music database. diff --git a/internal/ai/pickbestwork.go b/internal/ai/pickbestwork.go new file mode 100644 index 0000000..c890ac2 --- /dev/null +++ b/internal/ai/pickbestwork.go @@ -0,0 +1,88 @@ +package ai + +import ( + "context" + "encoding/json" + "errors" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +type WorkCandidate struct { + ID string `json:"id"` + Title string `json:"title"` + ComposerNames []string `json:"composer_names"` + OpusNumber string `json:"opus_number,omitempty"` + CreatedAt string `json:"created_at,omitempty"` +} + +type PickWorkInput struct { + OriginalTitle string `json:"original_title"` + ComposerName string `json:"composer_name"` + TrackTitles []string `json:"track_titles"` + Candidates []WorkCandidate `json:"candidates"` +} + +type PickWorkResult struct { + WorkID string `json:"work_id"` + WorkTitle string `json:"work_title"` + Confidence float64 `json:"confidence"` + Reasoning string `json:"reasoning"` +} + +func PickBestWorkFlow(g *genkit.Genkit) *core.Flow[PickWorkInput, *PickWorkResult, struct{}] { + return genkit.DefineFlow(g, "pickBestWork", + func(ctx context.Context, input PickWorkInput) (*PickWorkResult, error) { + if input.OriginalTitle == "" { + return nil, errors.New("original work title must not be empty") + } + if len(input.Candidates) == 0 { + return nil, nil + } + + candidatesJSON, err := json.Marshal(input.Candidates) + if err != nil { + return nil, fmt.Errorf("marshal candidates: %w", err) + } + + trackTitlesJSON, err := json.Marshal(input.TrackTitles) + if err != nil { + return nil, fmt.Errorf("marshal track titles: %w", err) + } + + result, _, err := genkit.GenerateData[PickWorkResult](ctx, g, + ai.WithModelName("openai/gpt-5.2"), + ai.WithSystem(`You are a classical music metadata expert. +You help match work titles to their canonical entries in a music database. + +Given a work title, composer name, track titles from the album, and a list of candidate works from the database, pick the best match. Consider: +- The candidate may use a different language for the title +- Opus/catalogue numbers should match if present in both +- Minor spelling or formatting differences are acceptable +- Popular titles vs formal titles (e.g. "Moonlight" vs "Piano Sonata No. 14, Op. 27 No. 2") +- The composer names should match if present in both the candidate and the original work title +- The track titles represent individual movements or sections. The matched work should contain pieces that correspond to them. +- Original works will usually have older creation dates than excerpts, arrangements, etc. Use this as a clue and +prefer the original work when it's not clear that an arrangement is needed. + +IMPORTANT — avoid excerpts: +- Do NOT match excerpt or single-movement works whose title includes a specific piece name (e.g. "Symphony No. 5: Allegro con brio", "Die Walküre: Ride of the Valkyries") when a broader, complete work (e.g. "Symphony No. 5", "Die Walküre") is available as a candidate. +- The broader work will contain pieces for all the track titles, while an excerpt only covers one. +- Only match an excerpt if the album genuinely contains just that one excerpt and no broader work candidate exists. + +Return the work_id of the best match. If you are not very confident that any of the candidates are the correct work, return an empty work_id.`), + ai.WithPrompt( + "The work %q by %q was not found. The album has these tracks:\n%s\n\nWhich of these database works is the best match?\n\n%s", + input.OriginalTitle, input.ComposerName, string(trackTitlesJSON), string(candidatesJSON), + ), + ) + if err != nil { + return nil, fmt.Errorf("pick best work: %w", err) + } + return result, nil + }, + ) +} diff --git a/internal/ai/types.go b/internal/ai/types.go new file mode 100644 index 0000000..9e1e88c --- /dev/null +++ b/internal/ai/types.go @@ -0,0 +1,12 @@ +package ai + +// alternativeQueries is the LLM output schema shared by brainstorm flows. +type alternativeQueries struct { + Queries []string `json:"queries"` +} + +type TrackSummary struct { + Index int `json:"index"` + Title string `json:"title"` + ComposerNames []string `json:"composer_names"` +} diff --git a/internal/temporal/activities/alpaca/confirmalbumnotingested_test.go b/internal/temporal/activities/alpaca/confirmalbumnotingested_test.go index a59afa0..6cea95a 100644 --- a/internal/temporal/activities/alpaca/confirmalbumnotingested_test.go +++ b/internal/temporal/activities/alpaca/confirmalbumnotingested_test.go @@ -9,6 +9,7 @@ import ( "go.temporal.io/sdk/testsuite" "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/upstream/alpaca" "idagio/auto-ingester/upstream/alpaca/openapi/client/albums_admin" "idagio/auto-ingester/upstream/alpaca/openapi/models" ) @@ -27,6 +28,14 @@ func (m *mockAPI) GetLabels(_ context.Context, _ string) ([]*models.LabelAdminLa return nil, nil } +func (m *mockAPI) GetWorks(_ context.Context, _ string, _ string) ([]*models.WorkAdminWork, error) { + return nil, nil +} + +func (m *mockAPI) GetWorkByIDFull(_ context.Context, _ string) (*alpaca.WorkFull, error) { + return nil, nil +} + func TestConfirmAlbumNotIngested(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestActivityEnvironment() diff --git a/internal/temporal/activities/reconcile/albummetadata_test.go b/internal/temporal/activities/reconcile/albummetadata_test.go index 935d8c2..6f7e061 100644 --- a/internal/temporal/activities/reconcile/albummetadata_test.go +++ b/internal/temporal/activities/reconcile/albummetadata_test.go @@ -11,6 +11,7 @@ import ( "idagio/auto-ingester/internal/ai" "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/upstream/alpaca" "idagio/auto-ingester/upstream/alpaca/openapi/models" "idagio/auto-ingester/upstream/lms" ) @@ -20,6 +21,10 @@ type mockAPI struct { albumErr error labels []*models.LabelAdminLabel labelsErr error + works []*models.WorkAdminWork + worksErr error + workFull *alpaca.WorkFull + workFullErr error } func (m *mockAPI) GetAlbumByID(_ context.Context, _ string) (*models.AlbumWithTrackIDs, error) { @@ -30,6 +35,14 @@ func (m *mockAPI) GetLabels(_ context.Context, _ string) ([]*models.LabelAdminLa return m.labels, m.labelsErr } +func (m *mockAPI) GetWorks(_ context.Context, _ string, _ string) ([]*models.WorkAdminWork, error) { + return m.works, m.worksErr +} + +func (m *mockAPI) GetWorkByIDFull(_ context.Context, _ string) (*alpaca.WorkFull, error) { + return m.workFull, m.workFullErr +} + func TestAlbumMetadata(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestActivityEnvironment() diff --git a/internal/temporal/activities/reconcile/findworkcandidates.go b/internal/temporal/activities/reconcile/findworkcandidates.go new file mode 100644 index 0000000..b2a9d75 --- /dev/null +++ b/internal/temporal/activities/reconcile/findworkcandidates.go @@ -0,0 +1,378 @@ +package reconcile + +import ( + "context" + "fmt" + "strings" + + "go.temporal.io/sdk/activity" + + "idagio/auto-ingester/internal/ai" + "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/internal/temporal/memo" + "idagio/auto-ingester/upstream/alpaca" + "idagio/auto-ingester/upstream/alpaca/openapi/models" +) + +type ReconcileWorkInput struct { + ComposerName string `json:"composer_name"` + WorkTitle string `json:"work_title"` + TrackIndices []int `json:"track_indices"` + Tracks []TrackForPiece `json:"tracks"` +} + +type TrackForPiece struct { + Index int `json:"index"` + Title string `json:"title"` +} + +type workCandidate struct { + workID string + workTitle string + confidence float64 + exactMatch bool +} + +func (c *ReconcileRecordingConfig) ReconcileWork( + ctx context.Context, + input ReconcileWorkInput, +) (*activities.WithLog[ResolvedRecording], error) { + al := memo.NewActivityLogger(ctx) + + trackTitles := make([]string, len(input.Tracks)) + for i, t := range input.Tracks { + trackTitles[i] = stripWorkPrefix(t.Title, input.WorkTitle) + } + + candidates, err := c.findWorkCandidates(ctx, al, input.WorkTitle, input.ComposerName, trackTitles) + if err != nil { + return nil, err + } + + trackSummaries := make([]ai.TrackSummary, len(input.Tracks)) + for i, t := range input.Tracks { + trackSummaries[i] = ai.TrackSummary{ + Index: t.Index, + Title: t.Title, + ComposerNames: []string{input.ComposerName}, + } + } + + for _, cand := range candidates { + result, matched, err := c.tryMatchPieces(ctx, al, cand, input, trackSummaries) + if err != nil { + return nil, err + } + if matched { + return activities.Done(al, *result), nil + } + } + + return nil, fmt.Errorf("no work candidate matched pieces for %q by %s", input.WorkTitle, input.ComposerName) +} + +func (c *ReconcileRecordingConfig) findWorkCandidates( + ctx context.Context, + al *memo.ActivityLogger, + workTitle, composerName string, + trackTitles []string, +) ([]workCandidate, error) { + logger := activity.GetLogger(ctx) + + seen := make(map[string]bool) + var searchCandidates []ai.WorkCandidate + + works, err := c.AlpacaAPI.GetWorks(ctx, workTitle, composerName) + if err != nil { + logger.Warn("initial work search failed", "query", workTitle, "error", err) + } else { + if wID, title, ok := findExactWork(works, workTitle); ok { + al.Append("work_matched", fmt.Sprintf("exact match %q → work ID %s", title, wID)) + return []workCandidate{{workID: wID, workTitle: title, confidence: 1.0, exactMatch: true}}, nil + } + for _, w := range works { + if !seen[w.ID] { + seen[w.ID] = true + searchCandidates = append(searchCandidates, workToCandidate(w)) + } + } + } + + alternatives, err := c.brainstormWorkQueries(ctx, ai.BrainstormWorkInput{ + WorkTitle: workTitle, + ComposerName: composerName, + }) + if err != nil { + return nil, fmt.Errorf("brainstorm work alternatives for %q: %w", workTitle, err) + } + var altLines []string + for _, alt := range alternatives { + altLines = append(altLines, fmt.Sprintf(" - %s", alt)) + } + al.Append("work_brainstorm", fmt.Sprintf( + "no exact match for %q by %s, brainstormed %d alternative queries:\n%s", + workTitle, composerName, len(alternatives), strings.Join(altLines, "\n"))) + + for _, alt := range alternatives { + works, err := c.AlpacaAPI.GetWorks(ctx, alt, composerName) + if err != nil { + continue + } + if wID, title, ok := findExactWork(works, workTitle); ok { + al.Append("work_matched", fmt.Sprintf("exact match %q → work ID %s (via brainstorm query %q)", title, wID, alt)) + return []workCandidate{{workID: wID, workTitle: title, confidence: 1.0, exactMatch: true}}, nil + } + for _, w := range works { + if !seen[w.ID] { + seen[w.ID] = true + searchCandidates = append(searchCandidates, workToCandidate(w)) + } + } + } + + if len(searchCandidates) == 0 { + return nil, fmt.Errorf("no work candidates found in Alpaca for %q by %s (tried %d queries)", + workTitle, composerName, 1+len(alternatives)) + } + + logger.Debug("picking best work", "work_title", workTitle, "candidates", len(searchCandidates)) + picked, err := c.pickBestWork(ctx, ai.PickWorkInput{ + OriginalTitle: workTitle, + ComposerName: composerName, + TrackTitles: trackTitles, + Candidates: searchCandidates, + }) + if err != nil { + return nil, fmt.Errorf("pick best work for %q: %w", workTitle, err) + } + if picked == nil || picked.WorkID == "" { + return nil, fmt.Errorf("LLM could not match %q by %s to any of %d candidates", + workTitle, composerName, len(searchCandidates)) + } + + ranked := []workCandidate{{ + workID: picked.WorkID, + workTitle: picked.WorkTitle, + confidence: picked.Confidence, + }} + for _, c := range searchCandidates { + if c.ID != picked.WorkID { + ranked = append(ranked, workCandidate{workID: c.ID, workTitle: c.Title}) + } + } + + var otherLines []string + for _, c := range searchCandidates { + if c.ID != picked.WorkID { + otherLines = append(otherLines, fmt.Sprintf(" - %s (%s)", c.Title, c.ID)) + } + } + detail := fmt.Sprintf( + "fuzzy-matched %q by %s → %q (%s)\n confidence: %.2f\n reasoning: %s", + workTitle, composerName, picked.WorkTitle, picked.WorkID, + picked.Confidence, picked.Reasoning) + if len(otherLines) > 0 { + detail += fmt.Sprintf("\n other candidates:\n%s", strings.Join(otherLines, "\n")) + } + al.Append("work_matched", detail) + + return ranked, nil +} + +func (c *ReconcileRecordingConfig) tryMatchPieces( + ctx context.Context, + al *memo.ActivityLogger, + cand workCandidate, + input ReconcileWorkInput, + trackSummaries []ai.TrackSummary, +) (*ResolvedRecording, bool, error) { + logger := activity.GetLogger(ctx) + + work, err := c.AlpacaAPI.GetWorkByIDFull(ctx, cand.workID) + if err != nil { + return nil, false, fmt.Errorf("get work %s: %w", cand.workID, err) + } + + pieces := flattenPieces(work) + if len(pieces) == 0 { + al.Append("no_pieces", fmt.Sprintf("work %s (%q) has no pieces", cand.workID, cand.workTitle)) + return nil, false, nil + } + + logger.Debug("matching track pieces", + "work_id", cand.workID, + "work_title", cand.workTitle, + "composer_name", input.ComposerName, + "tracks", len(trackSummaries), + "pieces", len(pieces), + ) + + matchResult, err := c.matchTrackPieces(ctx, ai.MatchTrackPiecesInput{ + WorkTitle: cand.workTitle, + ComposerName: input.ComposerName, + Tracks: trackSummaries, + Pieces: pieces, + }) + if err != nil { + return nil, false, fmt.Errorf("match track pieces for work %s: %w", cand.workID, err) + } + + if matchResult.Confidence >= 0.7 { + al.Append("pieces_matched", formatPiecesMatched(work, matchResult, input.Tracks, cand)) + + return &ResolvedRecording{ + WorkID: cand.workID, + WorkTitle: cand.workTitle, + ComposerName: input.ComposerName, + TrackIndices: input.TrackIndices, + Mappings: matchResult.Mappings, + }, true, nil + } + + al.Append("pieces_low_confidence", fmt.Sprintf( + "low confidence %.2f matching tracks to %q — likely wrong work (reasoning: %s)", + matchResult.Confidence, cand.workTitle, matchResult.Reasoning)) + + return nil, false, nil +} + +func stripWorkPrefix(trackTitle, workTitle string) string { + lower := strings.ToLower(trackTitle) + prefix := strings.ToLower(workTitle) + if !strings.HasPrefix(lower, prefix) { + return trackTitle + } + rest := strings.TrimLeft(trackTitle[len(workTitle):], " /:-–—") + if rest == "" { + return trackTitle + } + return rest +} + +func flattenPieces(work *alpaca.WorkFull) []ai.PieceSummary { + var pieces []ai.PieceSummary + for _, wp := range work.Workparts { + if wp.RecordingID != nil { + continue + } + wpTitle := translationText(wp.Title) + for _, p := range wp.Pieces { + pieces = append(pieces, ai.PieceSummary{ + PieceID: p.ID, + Title: translationText(p.Title), + WorkpartName: wpTitle, + Position: p.PositionInWorkpart, + }) + } + } + return pieces +} + +func translationText(ts []alpaca.Translation) string { + for _, t := range ts { + if t.Locale == "en-GB" { + return t.Text + } + } + if len(ts) > 0 { + return ts[0].Text + } + return "" +} + +func findExactWork(works []*models.WorkAdminWork, title string) (id, resolvedTitle string, ok bool) { + for _, w := range works { + wt := workTitle(w) + if strings.EqualFold(wt, title) { + return w.ID, wt, true + } + } + return "", "", false +} + +func workTitle(w *models.WorkAdminWork) string { + for _, t := range w.Title { + if t.Locale == "en-GB" { + return t.Text + } + } + if len(w.Title) > 0 { + return w.Title[0].Text + } + return "" +} + +func formatPiecesMatched( + work *alpaca.WorkFull, + result *ai.MatchTrackPiecesResult, + tracks []TrackForPiece, + cand workCandidate, +) string { + pieceToTrack := make(map[string]int) + for _, m := range result.Mappings { + pieceToTrack[m.PieceID] = m.TrackIndex + } + trackTitle := make(map[int]string) + for _, t := range tracks { + trackTitle[t.Index] = t.Title + } + + var sb strings.Builder + fmt.Fprintf(&sb, "matched %d tracks to pieces in %q (%s, confidence: %.2f)\n", + len(result.Mappings), cand.workTitle, cand.workID, result.Confidence) + fmt.Fprintf(&sb, "reasoning: %s\n", result.Reasoning) + fmt.Fprintf(&sb, "%s", cand.workTitle) + + type canonicalWP struct { + title string + pieces []alpaca.PieceFull + } + var visibleWPs []canonicalWP + for _, wp := range work.Workparts { + if wp.RecordingID != nil { + continue + } + title := translationText(wp.Title) + if title == "" { + title = "(untitled workpart)" + } + visibleWPs = append(visibleWPs, canonicalWP{title: title, pieces: wp.Pieces}) + } + + for wi, wp := range visibleWPs { + lastWP := wi == len(visibleWPs)-1 + wpBranch := "├── " + wpPrefix := "│ " + if lastWP { + wpBranch = "└── " + wpPrefix = " " + } + fmt.Fprintf(&sb, "\n%s%s", wpBranch, wp.title) + + for pi, p := range wp.pieces { + lastP := pi == len(wp.pieces)-1 + pBranch := "├── " + if lastP { + pBranch = "└── " + } + pTitle := translationText(p.Title) + if trackIdx, ok := pieceToTrack[p.ID]; ok { + fmt.Fprintf(&sb, "\n%s%s%s ⇒ Track %d: %q", wpPrefix, pBranch, pTitle, trackIdx, trackTitle[trackIdx]) + } else { + fmt.Fprintf(&sb, "\n%s%s⚠ %s (unmatched)", wpPrefix, pBranch, pTitle) + } + } + } + + return sb.String() +} + +func workToCandidate(w *models.WorkAdminWork) ai.WorkCandidate { + return ai.WorkCandidate{ + ID: w.ID, + Title: workTitle(w), + ComposerNames: w.ComposerNames, + OpusNumber: w.OpusNumber, + CreatedAt: w.CreatedAt, + } +} diff --git a/internal/temporal/activities/reconcile/findworkcandidates_test.go b/internal/temporal/activities/reconcile/findworkcandidates_test.go new file mode 100644 index 0000000..9b5af9e --- /dev/null +++ b/internal/temporal/activities/reconcile/findworkcandidates_test.go @@ -0,0 +1,359 @@ +package reconcile + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" + + "idagio/auto-ingester/internal/ai" + "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/upstream/alpaca" + "idagio/auto-ingester/upstream/alpaca/openapi/models" +) + +func symphonyWork() *alpaca.WorkFull { + return &alpaca.WorkFull{ + ID: "work-1", + Title: []alpaca.Translation{{Locale: "en-GB", Text: "Symphony No. 40"}}, + ComposerNames: []string{"Mozart"}, + Workparts: []alpaca.WorkpartFull{ + { + ID: "wp-1", + Position: 1, + Title: []alpaca.Translation{{Locale: "en-GB", Text: "I. Molto allegro"}}, + Pieces: []alpaca.PieceFull{ + {ID: "piece-1", PositionInWorkpart: 1, Title: []alpaca.Translation{{Locale: "en-GB", Text: "Molto allegro"}}}, + }, + }, + { + ID: "wp-2", + Position: 2, + Title: []alpaca.Translation{{Locale: "en-GB", Text: "II. Andante"}}, + Pieces: []alpaca.PieceFull{ + {ID: "piece-2", PositionInWorkpart: 1, Title: []alpaca.Translation{{Locale: "en-GB", Text: "Andante"}}}, + }, + }, + }, + } +} + +func successfulMatcher(t *testing.T) func(context.Context, ai.MatchTrackPiecesInput) (*ai.MatchTrackPiecesResult, error) { + return func(_ context.Context, input ai.MatchTrackPiecesInput) (*ai.MatchTrackPiecesResult, error) { + mappings := make([]ai.TrackPieceMapping, len(input.Tracks)) + for i, track := range input.Tracks { + if i < len(input.Pieces) { + mappings[i] = ai.TrackPieceMapping{TrackIndex: track.Index, PieceID: input.Pieces[i].PieceID} + } + } + return &ai.MatchTrackPiecesResult{ + Mappings: mappings, + Confidence: 0.95, + Reasoning: "Clear movement match", + }, nil + } +} + +func TestReconcileWork_ExactMatch(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + mock := &mockAPI{ + works: []*models.WorkAdminWork{ + { + ID: "work-1", + Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Symphony No. 40"}}, + ComposerNames: []string{"Mozart"}, + OpusNumber: "K. 550", + }, + }, + workFull: symphonyWork(), + } + act := &ReconcileRecordingConfig{ + AlpacaAPI: mock, + matchTrackPieces: successfulMatcher(t), + } + env.RegisterActivity(act.ReconcileWork) + + val, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Symphony No. 40", + TrackIndices: []int{0, 1}, + Tracks: []TrackForPiece{ + {Index: 0, Title: "Molto allegro"}, + {Index: 1, Title: "Andante"}, + }, + }) + require.NoError(t, err) + + var result activities.WithLog[ResolvedRecording] + require.NoError(t, val.Get(&result)) + assert.Equal(t, "work-1", result.Result.WorkID) + assert.Equal(t, "Symphony No. 40", result.Result.WorkTitle) + assert.Equal(t, "Mozart", result.Result.ComposerName) + assert.Equal(t, []int{0, 1}, result.Result.TrackIndices) + require.Len(t, result.Result.Mappings, 2) + assert.Equal(t, "piece-1", result.Result.Mappings[0].PieceID) + assert.Equal(t, "piece-2", result.Result.Mappings[1].PieceID) +} + +func TestReconcileWork_FuzzyMatch(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + moonlightWork := &alpaca.WorkFull{ + ID: "work-moon", + Title: []alpaca.Translation{{Locale: "en-GB", Text: "Piano Sonata No. 14"}}, + Workparts: []alpaca.WorkpartFull{ + {Pieces: []alpaca.PieceFull{ + {ID: "p-1", PositionInWorkpart: 1, Title: []alpaca.Translation{{Locale: "en-GB", Text: "Adagio sostenuto"}}}, + }}, + }, + } + + mock := &mockAPI{ + works: []*models.WorkAdminWork{}, + workFull: moonlightWork, + } + act := &ReconcileRecordingConfig{ + AlpacaAPI: mock, + brainstormWorkQueries: func(_ context.Context, _ ai.BrainstormWorkInput) ([]string, error) { + mock.works = []*models.WorkAdminWork{ + { + ID: "work-moon", + Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Piano Sonata No. 14"}}, + ComposerNames: []string{"Beethoven"}, + OpusNumber: "Op. 27 No. 2", + }, + } + return []string{"Piano Sonata No. 14", "Moonlight"}, nil + }, + pickBestWork: func(_ context.Context, input ai.PickWorkInput) (*ai.PickWorkResult, error) { + assert.Equal(t, "Mondscheinsonate", input.OriginalTitle) + assert.Equal(t, []string{"Adagio sostenuto"}, input.TrackTitles) + return &ai.PickWorkResult{ + WorkID: "work-moon", + WorkTitle: "Piano Sonata No. 14", + Confidence: 0.95, + Reasoning: "Mondscheinsonate is German for Moonlight Sonata", + }, nil + }, + matchTrackPieces: successfulMatcher(t), + } + env.RegisterActivity(act.ReconcileWork) + + val, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Beethoven", + WorkTitle: "Mondscheinsonate", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Adagio sostenuto"}}, + }) + require.NoError(t, err) + + var result activities.WithLog[ResolvedRecording] + require.NoError(t, val.Get(&result)) + assert.Equal(t, "work-moon", result.Result.WorkID) + assert.Equal(t, "Piano Sonata No. 14", result.Result.WorkTitle) +} + +func TestReconcileWork_PiecesLowConfidence_TriesNextCandidate(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + wrongWork := &alpaca.WorkFull{ + ID: "work-wrong", + Title: []alpaca.Translation{{Locale: "en-GB", Text: "Wrong Work"}}, + Workparts: []alpaca.WorkpartFull{ + {Pieces: []alpaca.PieceFull{ + {ID: "p-wrong", PositionInWorkpart: 1, Title: []alpaca.Translation{{Locale: "en-GB", Text: "Something"}}}, + }}, + }, + } + rightWork := &alpaca.WorkFull{ + ID: "work-right", + Title: []alpaca.Translation{{Locale: "en-GB", Text: "Right Work"}}, + Workparts: []alpaca.WorkpartFull{ + {Pieces: []alpaca.PieceFull{ + {ID: "p-right", PositionInWorkpart: 1, Title: []alpaca.Translation{{Locale: "en-GB", Text: "Allegro"}}}, + }}, + }, + } + + callCount := 0 + mock := &mockAPI{ + works: []*models.WorkAdminWork{ + {ID: "work-wrong", Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Wrong Work"}}, ComposerNames: []string{"Mozart"}}, + {ID: "work-right", Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Right Work"}}, ComposerNames: []string{"Mozart"}}, + }, + } + act := &ReconcileRecordingConfig{ + AlpacaAPI: mock, + brainstormWorkQueries: func(_ context.Context, _ ai.BrainstormWorkInput) ([]string, error) { + return []string{"Alt1"}, nil + }, + pickBestWork: func(_ context.Context, _ ai.PickWorkInput) (*ai.PickWorkResult, error) { + return &ai.PickWorkResult{WorkID: "work-wrong", WorkTitle: "Wrong Work", Confidence: 0.8, Reasoning: "best guess"}, nil + }, + matchTrackPieces: func(_ context.Context, input ai.MatchTrackPiecesInput) (*ai.MatchTrackPiecesResult, error) { + callCount++ + if input.WorkTitle == "Wrong Work" { + mock.workFull = rightWork + return &ai.MatchTrackPiecesResult{Confidence: 0.3, Reasoning: "no match"}, nil + } + return &ai.MatchTrackPiecesResult{ + Mappings: []ai.TrackPieceMapping{{TrackIndex: 0, PieceID: "p-right"}}, + Confidence: 0.9, + Reasoning: "good match", + }, nil + }, + } + mock.workFull = wrongWork + env.RegisterActivity(act.ReconcileWork) + + val, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Some Work", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Allegro"}}, + }) + require.NoError(t, err) + + var result activities.WithLog[ResolvedRecording] + require.NoError(t, val.Get(&result)) + assert.Equal(t, "work-right", result.Result.WorkID) + assert.Equal(t, 2, callCount) +} + +func TestReconcileWork_NoCandidates(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + mock := &mockAPI{works: []*models.WorkAdminWork{}} + act := &ReconcileRecordingConfig{ + AlpacaAPI: mock, + brainstormWorkQueries: func(_ context.Context, _ ai.BrainstormWorkInput) ([]string, error) { + return []string{"Alt1"}, nil + }, + } + env.RegisterActivity(act.ReconcileWork) + + _, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Unknown Work", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Allegro"}}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no work candidates found") +} + +func TestReconcileWork_LLMDeclinesAll(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + mock := &mockAPI{ + works: []*models.WorkAdminWork{ + {ID: "wrong-work", Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Something Else"}}, ComposerNames: []string{"Mozart"}}, + }, + } + act := &ReconcileRecordingConfig{ + AlpacaAPI: mock, + brainstormWorkQueries: func(_ context.Context, _ ai.BrainstormWorkInput) ([]string, error) { + return []string{"Alt1"}, nil + }, + pickBestWork: func(_ context.Context, _ ai.PickWorkInput) (*ai.PickWorkResult, error) { + return nil, nil + }, + } + env.RegisterActivity(act.ReconcileWork) + + _, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Unknown Work", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Allegro"}}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "could not match") +} + +func TestReconcileWork_NoPieces_Skipped(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + mock := &mockAPI{ + works: []*models.WorkAdminWork{ + {ID: "work-empty", Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Empty Work"}}, ComposerNames: []string{"Mozart"}}, + }, + workFull: &alpaca.WorkFull{ + ID: "work-empty", + Title: []alpaca.Translation{{Locale: "en-GB", Text: "Empty Work"}}, + Workparts: []alpaca.WorkpartFull{}, + }, + } + act := &ReconcileRecordingConfig{AlpacaAPI: mock} + env.RegisterActivity(act.ReconcileWork) + + _, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Empty Work", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Track 1"}}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "no work candidate matched pieces") +} + +func TestReconcileWork_GetWorkError_Propagates(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + mock := &mockAPI{ + works: []*models.WorkAdminWork{ + {ID: "work-1", Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Symphony No. 40"}}, ComposerNames: []string{"Mozart"}}, + }, + workFullErr: errors.New("connection refused"), + } + act := &ReconcileRecordingConfig{AlpacaAPI: mock} + env.RegisterActivity(act.ReconcileWork) + + _, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Symphony No. 40", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Molto allegro"}}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "connection refused") +} + +func TestReconcileWork_MatchLLMError_Propagates(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + mock := &mockAPI{ + works: []*models.WorkAdminWork{ + {ID: "work-1", Title: []*models.ModelTranslation{{Locale: "en-GB", Text: "Symphony No. 40"}}, ComposerNames: []string{"Mozart"}}, + }, + workFull: symphonyWork(), + } + act := &ReconcileRecordingConfig{ + AlpacaAPI: mock, + matchTrackPieces: func(_ context.Context, _ ai.MatchTrackPiecesInput) (*ai.MatchTrackPiecesResult, error) { + return nil, errors.New("LLM rate limited") + }, + } + env.RegisterActivity(act.ReconcileWork) + + _, err := env.ExecuteActivity(act.ReconcileWork, ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Symphony No. 40", + TrackIndices: []int{0}, + Tracks: []TrackForPiece{{Index: 0, Title: "Molto allegro"}}, + }) + assert.Error(t, err) + assert.Contains(t, err.Error(), "LLM rate limited") +} diff --git a/internal/temporal/activities/reconcile/grouptracksbywork.go b/internal/temporal/activities/reconcile/grouptracksbywork.go new file mode 100644 index 0000000..92fef4b --- /dev/null +++ b/internal/temporal/activities/reconcile/grouptracksbywork.go @@ -0,0 +1,69 @@ +package reconcile + +import ( + "context" + "fmt" + "strings" + + "go.temporal.io/sdk/activity" + + "idagio/auto-ingester/internal/ai" + "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/internal/temporal/memo" + "idagio/auto-ingester/upstream/lms" +) + +type GroupTracksByWorkResult struct { + Works []ai.IdentifiedWork `json:"works"` +} + +func (c *ReconcileRecordingConfig) GroupTracksByWork( + ctx context.Context, + submission *lms.SubmissionMetadata, +) (*activities.WithLog[GroupTracksByWorkResult], error) { + al := memo.NewActivityLogger(ctx) + logger := activity.GetLogger(ctx) + + if len(submission.Tracks) == 0 { + return activities.Done(al, GroupTracksByWorkResult{}), nil + } + + tracks := buildTrackSummaries(submission.Tracks) + + analyzed, err := c.groupTracksByWork(ctx, ai.AnalyzeTracksInput{Tracks: tracks}) + if err != nil { + return nil, fmt.Errorf("analyze track works: %w", err) + } + + var lines []string + for _, w := range analyzed.Works { + lines = append(lines, fmt.Sprintf(" %q by %s (tracks %v)", w.WorkTitle, w.ComposerName, w.TrackIndices)) + } + al.Append("tracks_analyzed", fmt.Sprintf( + "LLM grouped %d tracks into %d presumptive works:\n%s", + len(tracks), len(analyzed.Works), strings.Join(lines, "\n"))) + + logger.Info("Grouped tracks by work", "works", len(analyzed.Works), "tracks", len(tracks)) + return activities.Done(al, GroupTracksByWorkResult{Works: analyzed.Works}), nil +} + +func buildTrackSummaries(tracks []lms.Track) []ai.TrackSummary { + summaries := make([]ai.TrackSummary, len(tracks)) + for i, t := range tracks { + composers := []string{} + for _, c := range t.Contributors { + for _, role := range c.Roles { + if strings.EqualFold(role, "Composer") && c.FullName != nil { + composers = append(composers, *c.FullName) + break + } + } + } + summaries[i] = ai.TrackSummary{ + Index: i, + Title: t.Title, + ComposerNames: composers, + } + } + return summaries +} diff --git a/internal/temporal/activities/reconcile/grouptracksbywork_test.go b/internal/temporal/activities/reconcile/grouptracksbywork_test.go new file mode 100644 index 0000000..77da6f5 --- /dev/null +++ b/internal/temporal/activities/reconcile/grouptracksbywork_test.go @@ -0,0 +1,99 @@ +package reconcile + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.temporal.io/sdk/testsuite" + + "idagio/auto-ingester/internal/ai" + "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/upstream/lms" +) + +func ptr(s string) *string { return &s } + +func TestGroupTracksByWork_HappyPath(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + act := &ReconcileRecordingConfig{ + groupTracksByWork: func(_ context.Context, input ai.AnalyzeTracksInput) (*ai.AnalyzeTracksResult, error) { + assert.Len(t, input.Tracks, 2) + assert.Equal(t, "Symphony No. 40 / Molto allegro", input.Tracks[0].Title) + return &ai.AnalyzeTracksResult{ + Works: []ai.IdentifiedWork{ + { + WorkTitle: "Symphony No. 40", + ComposerName: "Mozart", + TrackIndices: []int{0, 1}, + }, + }, + }, nil + }, + } + env.RegisterActivity(act.GroupTracksByWork) + + submission := &lms.SubmissionMetadata{ + Tracks: []lms.Track{ + {Title: "Symphony No. 40 / Molto allegro", Contributors: []lms.Contributor{{FullName: ptr("Mozart"), Roles: []string{"Composer"}}}}, + {Title: "Symphony No. 40 / Andante", Contributors: []lms.Contributor{{FullName: ptr("Mozart"), Roles: []string{"Composer"}}}}, + }, + } + + val, err := env.ExecuteActivity(act.GroupTracksByWork, submission) + require.NoError(t, err) + + var result activities.WithLog[GroupTracksByWorkResult] + require.NoError(t, val.Get(&result)) + require.Len(t, result.Result.Works, 1) + assert.Equal(t, "Symphony No. 40", result.Result.Works[0].WorkTitle) + assert.Equal(t, "Mozart", result.Result.Works[0].ComposerName) + assert.Equal(t, []int{0, 1}, result.Result.Works[0].TrackIndices) +} + +func TestGroupTracksByWork_NoTracks(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + + act := &ReconcileRecordingConfig{} + env.RegisterActivity(act.GroupTracksByWork) + + submission := &lms.SubmissionMetadata{} + + val, err := env.ExecuteActivity(act.GroupTracksByWork, submission) + require.NoError(t, err) + + var result activities.WithLog[GroupTracksByWorkResult] + require.NoError(t, val.Get(&result)) + assert.Empty(t, result.Result.Works) +} + +func TestBuildTrackSummaries(t *testing.T) { + tracks := []lms.Track{ + { + Title: "Molto allegro", + Contributors: []lms.Contributor{ + {FullName: ptr("Mozart"), Roles: []string{"Composer"}}, + {FullName: ptr("Harnoncourt"), Roles: []string{"Conductor"}}, + }, + }, + { + Title: "Andante", + Contributors: []lms.Contributor{ + {FullName: ptr("Mozart"), Roles: []string{"Composer"}}, + }, + }, + } + + summaries := buildTrackSummaries(tracks) + + require.Len(t, summaries, 2) + assert.Equal(t, 0, summaries[0].Index) + assert.Equal(t, "Molto allegro", summaries[0].Title) + assert.Equal(t, []string{"Mozart"}, summaries[0].ComposerNames) + assert.Equal(t, 1, summaries[1].Index) + assert.Equal(t, "Andante", summaries[1].Title) +} diff --git a/internal/temporal/activities/reconcile/reconcilerecording.go b/internal/temporal/activities/reconcile/reconcilerecording.go new file mode 100644 index 0000000..742553d --- /dev/null +++ b/internal/temporal/activities/reconcile/reconcilerecording.go @@ -0,0 +1,36 @@ +package reconcile + +import ( + "context" + + "github.com/firebase/genkit/go/genkit" + + "idagio/auto-ingester/internal/ai" + "idagio/auto-ingester/upstream/alpaca" +) + +type ReconcileRecordingConfig struct { //nolint:revive // intentional: imported as reconcileactivities + AlpacaAPI alpaca.API + groupTracksByWork func(ctx context.Context, input ai.AnalyzeTracksInput) (*ai.AnalyzeTracksResult, error) + brainstormWorkQueries func(ctx context.Context, input ai.BrainstormWorkInput) ([]string, error) + pickBestWork func(ctx context.Context, input ai.PickWorkInput) (*ai.PickWorkResult, error) + matchTrackPieces func(ctx context.Context, input ai.MatchTrackPiecesInput) (*ai.MatchTrackPiecesResult, error) +} + +func NewReconcileRecordingConfig(alpacaAPI alpaca.API, g *genkit.Genkit) *ReconcileRecordingConfig { + return &ReconcileRecordingConfig{ + AlpacaAPI: alpacaAPI, + groupTracksByWork: ai.GroupTracksByWorkFlow(g).Run, + brainstormWorkQueries: ai.BrainstormWorkQueriesFlow(g).Run, + pickBestWork: ai.PickBestWorkFlow(g).Run, + matchTrackPieces: ai.MatchTrackPiecesFlow(g).Run, + } +} + +type ResolvedRecording struct { + WorkID string `json:"work_id"` + WorkTitle string `json:"work_title"` + ComposerName string `json:"composer_name"` + TrackIndices []int `json:"track_indices"` + Mappings []ai.TrackPieceMapping `json:"mappings"` +} diff --git a/internal/temporal/memo/activitylog.go b/internal/temporal/memo/activitylog.go index af540eb..382f94d 100644 --- a/internal/temporal/memo/activitylog.go +++ b/internal/temporal/memo/activitylog.go @@ -30,15 +30,28 @@ type ActivityLogEvent struct { } type ActivityLogger struct { + ctx context.Context activity string events []ActivityLogEvent } func NewActivityLogger(ctx context.Context) *ActivityLogger { info := activity.GetInfo(ctx) - return &ActivityLogger{ + al := &ActivityLogger{ + ctx: ctx, activity: info.ActivityType.Name, } + + // Recover events from a previous attempt's heartbeat, so the log + // is cumulative across retries rather than starting from scratch. + var prev []ActivityLogEvent + if activity.HasHeartbeatDetails(ctx) { + if err := activity.GetHeartbeatDetails(ctx, &prev); err == nil { + al.events = prev + } + } + + return al } func (l *ActivityLogger) Append(event, detail string) { @@ -48,6 +61,7 @@ func (l *ActivityLogger) Append(event, detail string) { Event: event, Detail: detail, }) + activity.RecordHeartbeat(l.ctx, l.events) } func (l *ActivityLogger) Events() []ActivityLogEvent { diff --git a/internal/temporal/workflows/helpers.go b/internal/temporal/workflows/helpers.go new file mode 100644 index 0000000..8c8bd5f --- /dev/null +++ b/internal/temporal/workflows/helpers.go @@ -0,0 +1,45 @@ +package workflows + +import ( + "go.temporal.io/sdk/workflow" + + "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/internal/temporal/memo" +) + +type activityRunner struct { + ctx workflow.Context + actCtx workflow.Context + activityLog *[]memo.ActivityLogEvent +} + +type activityFuture[T any] struct { + future workflow.Future + r *activityRunner +} + +func startActivity[T any](r *activityRunner, activity any, args ...any) activityFuture[T] { + return activityFuture[T]{ + future: workflow.ExecuteActivity(r.actCtx, activity, args...), + r: r, + } +} + +func (f activityFuture[T]) get() (T, error) { + var result activities.WithLog[T] + if err := f.future.Get(f.r.ctx, &result); err != nil { + var zero T + return zero, err + } + updateMemo(f.r.ctx, f.r.activityLog, result.Log) + return result.Result, nil +} + +func execActivity[T any](r *activityRunner, activity any, args ...any) (T, error) { + return startActivity[T](r, activity, args...).get() +} + +func updateMemo(ctx workflow.Context, activityLog *[]memo.ActivityLogEvent, events []memo.ActivityLogEvent) { + *activityLog = append(*activityLog, events...) + _ = workflow.UpsertMemo(ctx, map[string]interface{}{"activity_log": *activityLog}) +} diff --git a/internal/temporal/workflows/ingestalbum.go b/internal/temporal/workflows/ingestalbum.go index 2ae21af..ba794c5 100644 --- a/internal/temporal/workflows/ingestalbum.go +++ b/internal/temporal/workflows/ingestalbum.go @@ -1,12 +1,10 @@ package workflows import ( - "fmt" "time" "go.temporal.io/sdk/workflow" - "idagio/auto-ingester/internal/temporal/activities" alpacaactivities "idagio/auto-ingester/internal/temporal/activities/alpaca" lmsactivities "idagio/auto-ingester/internal/temporal/activities/lms" reconcileactivities "idagio/auto-ingester/internal/temporal/activities/reconcile" @@ -14,46 +12,96 @@ import ( "idagio/auto-ingester/upstream/lms" ) -func updateMemo(ctx workflow.Context, activityLog *[]memo.ActivityLogEvent, events []memo.ActivityLogEvent) { - *activityLog = append(*activityLog, events...) - _ = workflow.UpsertMemo(ctx, map[string]interface{}{"activity_log": *activityLog}) +type IngestAlbumResult struct { + AlbumMetadata reconcileactivities.AlbumMetadataResult `json:"album_metadata"` + Recordings []reconcileactivities.ResolvedRecording `json:"recordings"` } -func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*reconcileactivities.AlbumMetadataResult, error) { +func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*IngestAlbumResult, error) { var ( lmsAct *lmsactivities.Config alpacaAct *alpacaactivities.Config reconcileAct *reconcileactivities.Config + recRecAct *reconcileactivities.ReconcileRecordingConfig ) - activityOptions := workflow.ActivityOptions{ + actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ StartToCloseTimeout: time.Second * 30, ScheduleToCloseTimeout: time.Second * 30, - } - actCtx := workflow.WithActivityOptions(ctx, activityOptions) + }) var activityLog []memo.ActivityLogEvent + r := &activityRunner{ctx: ctx, actCtx: actCtx, activityLog: &activityLog} - var fetchResult activities.WithLog[*lms.SubmissionMetadata] - err := workflow.ExecuteActivity(actCtx, lmsAct.FetchLMSSubmission, submissionID).Get(ctx, &fetchResult) + submission, err := execActivity[*lms.SubmissionMetadata](r, lmsAct.FetchLMSSubmission, submissionID) if err != nil { return nil, err } - updateMemo(ctx, &activityLog, fetchResult.Log) - var confirmResult activities.WithLog[struct{}] - err = workflow.ExecuteActivity(actCtx, alpacaAct.ConfirmAlbumNotIngested, fetchResult.Result.OriginalID).Get(ctx, &confirmResult) + _, err = execActivity[struct{}](r, alpacaAct.ConfirmAlbumNotIngested, submission.OriginalID) if err != nil { return nil, err } - updateMemo(ctx, &activityLog, confirmResult.Log) - var reconcileResult activities.WithLog[reconcileactivities.AlbumMetadataResult] - err = workflow.ExecuteActivity(actCtx, reconcileAct.ReconcileAlbumMetadata, fetchResult.Result).Get(ctx, &reconcileResult) + albumMetadataFuture := startActivity[reconcileactivities.AlbumMetadataResult](r, reconcileAct.ReconcileAlbumMetadata, submission) + trackGroupingFuture := startActivity[reconcileactivities.GroupTracksByWorkResult](r, recRecAct.GroupTracksByWork, submission) + + trackGroupings, err := trackGroupingFuture.get() if err != nil { - return nil, fmt.Errorf("%w: %s", err, "failed to reconcile album metadata") + return nil, err } - updateMemo(ctx, &activityLog, reconcileResult.Log) - return &reconcileResult.Result, nil + recordings := make([]reconcileactivities.ResolvedRecording, len(trackGroupings.Works)) + trackGroupErrs := make([]error, len(trackGroupings.Works)) + doneCh := workflow.NewChannel(ctx) + + for i, work := range trackGroupings.Works { + i, work := i, work + workflow.Go(ctx, func(gCtx workflow.Context) { + gr := &activityRunner{ctx: gCtx, actCtx: actCtx, activityLog: &activityLog} + result, err := execActivity[reconcileactivities.ResolvedRecording](gr, recRecAct.ReconcileWork, reconcileactivities.ReconcileWorkInput{ + ComposerName: work.ComposerName, + WorkTitle: work.WorkTitle, + TrackIndices: work.TrackIndices, + Tracks: buildTracksForGroup(submission, work.TrackIndices), + }) + if err != nil { + trackGroupErrs[i] = err + } else { + recordings[i] = result + } + doneCh.Send(gCtx, true) + }) + } + + for range trackGroupings.Works { + doneCh.Receive(ctx, nil) + } + + for _, err := range trackGroupErrs { + if err != nil { + return nil, err + } + } + + albumMetadata, err := albumMetadataFuture.get() + if err != nil { + return nil, err + } + + return &IngestAlbumResult{ + AlbumMetadata: albumMetadata, + Recordings: recordings, + }, nil +} + +func buildTracksForGroup(submission *lms.SubmissionMetadata, trackIndices []int) []reconcileactivities.TrackForPiece { + tracks := make([]reconcileactivities.TrackForPiece, len(trackIndices)) + for i, idx := range trackIndices { + tracks[i] = reconcileactivities.TrackForPiece{ + Index: idx, + Title: submission.Tracks[idx].Title, + } + } + return tracks } diff --git a/internal/temporal/workflows/ingestalbum_test.go b/internal/temporal/workflows/ingestalbum_test.go index bfc349f..389ef46 100644 --- a/internal/temporal/workflows/ingestalbum_test.go +++ b/internal/temporal/workflows/ingestalbum_test.go @@ -10,6 +10,7 @@ import ( "go.temporal.io/sdk/temporal" "go.temporal.io/sdk/testsuite" + "idagio/auto-ingester/internal/ai" "idagio/auto-ingester/internal/temporal/activities" alpacaactivities "idagio/auto-ingester/internal/temporal/activities/alpaca" lmsactivities "idagio/auto-ingester/internal/temporal/activities/lms" @@ -21,59 +22,192 @@ import ( func lmsResult(metadata *lms.SubmissionMetadata) *activities.WithLog[*lms.SubmissionMetadata] { return &activities.WithLog[*lms.SubmissionMetadata]{ Result: metadata, - Log: []memo.ActivityLogEvent{ - {Activity: "FetchLMSSubmission", Event: "fetchSubmission"}, - }, + Log: []memo.ActivityLogEvent{{Activity: "FetchLMSSubmission", Event: "fetchSubmission"}}, } } func confirmResult() *activities.WithLog[struct{}] { return &activities.WithLog[struct{}]{ - Log: []memo.ActivityLogEvent{ - {Activity: "ConfirmAlbumNotIngested", Event: "confirmedNotIngested"}, - }, + Log: []memo.ActivityLogEvent{{Activity: "ConfirmAlbumNotIngested", Event: "confirmedNotIngested"}}, } } func reconcileResult(result reconcileactivities.AlbumMetadataResult) *activities.WithLog[reconcileactivities.AlbumMetadataResult] { return &activities.WithLog[reconcileactivities.AlbumMetadataResult]{ Result: result, - Log: []memo.ActivityLogEvent{ - {Activity: "ReconcileAlbumMetadata", Event: "labelMatched"}, + Log: []memo.ActivityLogEvent{{Activity: "ReconcileAlbumMetadata", Event: "labelMatched"}}, + } +} + +func groupResult(result reconcileactivities.GroupTracksByWorkResult) *activities.WithLog[reconcileactivities.GroupTracksByWorkResult] { + return &activities.WithLog[reconcileactivities.GroupTracksByWorkResult]{ + Result: result, + Log: []memo.ActivityLogEvent{{Activity: "GroupTracksByWork", Event: "tracks_analyzed"}}, + } +} + +func reconcileWorkResult(result reconcileactivities.ResolvedRecording) *activities.WithLog[reconcileactivities.ResolvedRecording] { + return &activities.WithLog[reconcileactivities.ResolvedRecording]{ + Result: result, + Log: []memo.ActivityLogEvent{{Activity: "ReconcileWork", Event: "work_matched"}}, + } +} + +func submissionWithTracks() *lms.SubmissionMetadata { + return &lms.SubmissionMetadata{ + AlbumTitle: "Test Album", + OriginalID: "1234567890123", + Tracks: []lms.Track{ + {Title: "Molto allegro"}, + {Title: "Andante"}, }, } } -func TestIngestAlbumWorkflow_AlbumNotInAlpaca_Proceeds(t *testing.T) { +func setupBaseActivities(env *testsuite.TestWorkflowEnvironment, metadata *lms.SubmissionMetadata) { + var lmsAct *lmsactivities.Config + var alpacaAct *alpacaactivities.Config + var reconcileAct *reconcileactivities.Config + var recRecAct *reconcileactivities.ReconcileRecordingConfig + + env.OnActivity(lmsAct.FetchLMSSubmission, mock.Anything, "submission-123"). + Return(lmsResult(metadata), nil) + + env.OnActivity(alpacaAct.ConfirmAlbumNotIngested, mock.Anything, metadata.OriginalID). + Return(confirmResult(), nil) + + env.OnActivity(reconcileAct.ReconcileAlbumMetadata, mock.Anything, metadata). + Return(reconcileResult(reconcileactivities.AlbumMetadataResult{Title: metadata.AlbumTitle}), nil) + + env.OnActivity(recRecAct.GroupTracksByWork, mock.Anything, metadata). + Return(groupResult(reconcileactivities.GroupTracksByWorkResult{ + Works: []ai.IdentifiedWork{ + { + WorkTitle: "Symphony No. 40", + ComposerName: "Mozart", + TrackIndices: []int{0, 1}, + }, + }, + }), nil) +} + +func TestIngestAlbumWorkflow_HappyPath(t *testing.T) { testSuite := &testsuite.WorkflowTestSuite{} env := testSuite.NewTestWorkflowEnvironment() - lmsMetadata := &lms.SubmissionMetadata{ + metadata := submissionWithTracks() + setupBaseActivities(env, metadata) + + var recRecAct *reconcileactivities.ReconcileRecordingConfig + + env.OnActivity(recRecAct.ReconcileWork, mock.Anything, reconcileactivities.ReconcileWorkInput{ + ComposerName: "Mozart", + WorkTitle: "Symphony No. 40", + TrackIndices: []int{0, 1}, + Tracks: []reconcileactivities.TrackForPiece{ + {Index: 0, Title: "Molto allegro"}, + {Index: 1, Title: "Andante"}, + }, + }).Return(reconcileWorkResult(reconcileactivities.ResolvedRecording{ + WorkID: "work-1", + WorkTitle: "Symphony No. 40", + ComposerName: "Mozart", + TrackIndices: []int{0, 1}, + Mappings: []ai.TrackPieceMapping{ + {TrackIndex: 0, PieceID: "piece-1"}, + {TrackIndex: 1, PieceID: "piece-2"}, + }, + }), nil) + + env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + + require.True(t, env.IsWorkflowCompleted()) + require.NoError(t, env.GetWorkflowError()) + + var result IngestAlbumResult + require.NoError(t, env.GetWorkflowResult(&result)) + assert.Equal(t, "Test Album", result.AlbumMetadata.Title) + require.Len(t, result.Recordings, 1) + assert.Equal(t, "work-1", result.Recordings[0].WorkID) + assert.Equal(t, "Mozart", result.Recordings[0].ComposerName) + assert.Len(t, result.Recordings[0].Mappings, 2) +} + +func TestIngestAlbumWorkflow_ReconcileWorkFails(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + metadata := submissionWithTracks() + setupBaseActivities(env, metadata) + + var recRecAct *reconcileactivities.ReconcileRecordingConfig + + env.OnActivity(recRecAct.ReconcileWork, mock.Anything, mock.Anything). + Return(nil, temporal.NewNonRetryableApplicationError("no work candidate matched pieces", "", nil)) + + env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.Error(t, err) + assert.Contains(t, err.Error(), "no work candidate matched pieces") +} + +func TestIngestAlbumWorkflow_ReconcileWorkError_Propagates(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + metadata := submissionWithTracks() + setupBaseActivities(env, metadata) + + var recRecAct *reconcileactivities.ReconcileRecordingConfig + + env.OnActivity(recRecAct.ReconcileWork, mock.Anything, mock.Anything). + Return(nil, errors.New("connection refused")) + + env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + + require.True(t, env.IsWorkflowCompleted()) + err := env.GetWorkflowError() + require.Error(t, err) + assert.Contains(t, err.Error(), "connection refused") +} + +func TestIngestAlbumWorkflow_NoWorkGroups_Succeeds(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestWorkflowEnvironment() + + metadata := &lms.SubmissionMetadata{ AlbumTitle: "Test Album", OriginalID: "1234567890123", } var lmsAct *lmsactivities.Config var alpacaAct *alpacaactivities.Config + var reconcileAct *reconcileactivities.Config + var recRecAct *reconcileactivities.ReconcileRecordingConfig env.OnActivity(lmsAct.FetchLMSSubmission, mock.Anything, "submission-123"). - Return(lmsResult(lmsMetadata), nil) + Return(lmsResult(metadata), nil) env.OnActivity(alpacaAct.ConfirmAlbumNotIngested, mock.Anything, "1234567890123"). Return(confirmResult(), nil) - var reconcileAct *reconcileactivities.Config - env.OnActivity(reconcileAct.ReconcileAlbumMetadata, mock.Anything, lmsMetadata). + env.OnActivity(reconcileAct.ReconcileAlbumMetadata, mock.Anything, metadata). Return(reconcileResult(reconcileactivities.AlbumMetadataResult{Title: "Test Album"}), nil) + env.OnActivity(recRecAct.GroupTracksByWork, mock.Anything, metadata). + Return(groupResult(reconcileactivities.GroupTracksByWorkResult{}), nil) + env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) - var result reconcileactivities.AlbumMetadataResult + var result IngestAlbumResult require.NoError(t, env.GetWorkflowResult(&result)) - assert.Equal(t, "Test Album", result.Title) + assert.Equal(t, "Test Album", result.AlbumMetadata.Title) + assert.Empty(t, result.Recordings) } func TestIngestAlbumWorkflow_AlbumAlreadyExists_Aborts(t *testing.T) { diff --git a/upstream/alpaca/client.go b/upstream/alpaca/client.go index 2728065..9e9fa3a 100644 --- a/upstream/alpaca/client.go +++ b/upstream/alpaca/client.go @@ -16,12 +16,15 @@ import ( alpacaclient "idagio/auto-ingester/upstream/alpaca/openapi/client" "idagio/auto-ingester/upstream/alpaca/openapi/client/albums_admin" "idagio/auto-ingester/upstream/alpaca/openapi/client/labels_admin" + "idagio/auto-ingester/upstream/alpaca/openapi/client/works_admin" "idagio/auto-ingester/upstream/alpaca/openapi/models" ) type Client struct { - alpaca *alpacaclient.Alpaca - auth runtime.ClientAuthInfoWriter + alpaca *alpacaclient.Alpaca + auth runtime.ClientAuthInfoWriter + baseURL string + authHeader string } func NewClient(baseURL, authHeader string) *Client { @@ -34,8 +37,10 @@ func NewClient(baseURL, authHeader string) *Client { transport.Transport = &envelopeUnwrapper{inner: http.DefaultTransport} return &Client{ - alpaca: alpacaclient.New(transport, strfmt.Default), - auth: httptransport.APIKeyAuth("Authorization", "header", authHeader), + alpaca: alpacaclient.New(transport, strfmt.Default), + auth: httptransport.APIKeyAuth("Authorization", "header", authHeader), + baseURL: baseURL, + authHeader: authHeader, } } @@ -64,9 +69,26 @@ func (c *Client) GetLabels(ctx context.Context, query string) ([]*models.LabelAd return resp.Payload, nil } +func (c *Client) GetWorks(ctx context.Context, query string, composerName string) ([]*models.WorkAdminWork, error) { + params := works_admin.NewGetWorksParamsWithContext(ctx) + if query != "" { + params = params.WithQuery(&query) + } + if composerName != "" { + params = params.WithComposerName(&composerName) + } + resp, err := c.alpaca.WorksAdmin.GetWorks(params, c.auth) + if err != nil { + return nil, err + } + return resp.Payload, nil +} + type API interface { GetAlbumByID(ctx context.Context, idOrUPC string) (*models.AlbumWithTrackIDs, error) GetLabels(ctx context.Context, query string) ([]*models.LabelAdminLabel, error) + GetWorks(ctx context.Context, query string, composerName string) ([]*models.WorkAdminWork, error) + GetWorkByIDFull(ctx context.Context, workID string) (*WorkFull, error) } // envelopeUnwrapper strips the {"result": ...} envelope that the Alpaca API diff --git a/upstream/alpaca/workfull.go b/upstream/alpaca/workfull.go new file mode 100644 index 0000000..71f84bc --- /dev/null +++ b/upstream/alpaca/workfull.go @@ -0,0 +1,44 @@ +package alpaca + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" +) + +func (c *Client) GetWorkByIDFull(ctx context.Context, workID string) (*WorkFull, error) { + url := c.baseURL + "/admin/works.v1/" + workID + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Authorization", c.authHeader) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, fmt.Errorf("get work %s: %w", workID, err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("get work %s: unexpected status %d: %s", workID, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response body: %w", err) + } + + var envelope struct { + Result WorkFull `json:"result"` + } + if err := json.Unmarshal(body, &envelope); err != nil { + return nil, fmt.Errorf("unmarshal work %s: %w", workID, err) + } + + return &envelope.Result, nil +} diff --git a/upstream/alpaca/worktypes.go b/upstream/alpaca/worktypes.go new file mode 100644 index 0000000..7ccad7e --- /dev/null +++ b/upstream/alpaca/worktypes.go @@ -0,0 +1,34 @@ +package alpaca + +// Custom types for the full work response from GET /admin/works.v1/{id}. +// The swagger snapshot doesn't capture workparts/pieces in the GetWorkByID response, +// so these are hand-written. + +type WorkFull struct { + ID string `json:"id"` + Title []Translation `json:"title"` + ComposerNames []string `json:"composerNames"` + OpusNumber string `json:"opusNumber,omitempty"` + Workparts []WorkpartFull `json:"workparts"` +} + +type Translation struct { + Locale string `json:"locale"` + Text string `json:"text"` +} + +type WorkpartFull struct { + ID string `json:"id"` + Position int `json:"position"` + Title []Translation `json:"title"` + IsOverture bool `json:"isOverture"` + RecordingID *string `json:"recordingId"` + Pieces []PieceFull `json:"pieces"` +} + +type PieceFull struct { + ID string `json:"id"` + PositionInWorkpart int `json:"positionInWorkpart"` + Title []Translation `json:"title"` + TonalityID *string `json:"tonalityId"` +} From 38da89249d4918a7ce7893c8853fa81d568879b5 Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 14:48:16 +0100 Subject: [PATCH 02/10] Fix log formatting --- .../activities/reconcile/findworkcandidates.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/internal/temporal/activities/reconcile/findworkcandidates.go b/internal/temporal/activities/reconcile/findworkcandidates.go index b2a9d75..4a927b0 100644 --- a/internal/temporal/activities/reconcile/findworkcandidates.go +++ b/internal/temporal/activities/reconcile/findworkcandidates.go @@ -87,7 +87,7 @@ func (c *ReconcileRecordingConfig) findWorkCandidates( logger.Warn("initial work search failed", "query", workTitle, "error", err) } else { if wID, title, ok := findExactWork(works, workTitle); ok { - al.Append("work_matched", fmt.Sprintf("exact match %q → work ID %s", title, wID)) + al.Append("work_matched", fmt.Sprintf("exact match %s → work ID %s", title, wID)) return []workCandidate{{workID: wID, workTitle: title, confidence: 1.0, exactMatch: true}}, nil } for _, w := range works { @@ -119,7 +119,7 @@ func (c *ReconcileRecordingConfig) findWorkCandidates( continue } if wID, title, ok := findExactWork(works, workTitle); ok { - al.Append("work_matched", fmt.Sprintf("exact match %q → work ID %s (via brainstorm query %q)", title, wID, alt)) + al.Append("work_matched", fmt.Sprintf("exact match %s → work ID %s (via brainstorm query: %s)", title, wID, alt)) return []workCandidate{{workID: wID, workTitle: title, confidence: 1.0, exactMatch: true}}, nil } for _, w := range works { @@ -168,7 +168,7 @@ func (c *ReconcileRecordingConfig) findWorkCandidates( } } detail := fmt.Sprintf( - "fuzzy-matched %q by %s → %q (%s)\n confidence: %.2f\n reasoning: %s", + "fuzzy-matched %s by %s → %s (%s)\n confidence: %.2f\n reasoning: %s", workTitle, composerName, picked.WorkTitle, picked.WorkID, picked.Confidence, picked.Reasoning) if len(otherLines) > 0 { @@ -195,7 +195,7 @@ func (c *ReconcileRecordingConfig) tryMatchPieces( pieces := flattenPieces(work) if len(pieces) == 0 { - al.Append("no_pieces", fmt.Sprintf("work %s (%q) has no pieces", cand.workID, cand.workTitle)) + al.Append("no_pieces", fmt.Sprintf("work %s (%s) has no pieces", cand.workID, cand.workTitle)) return nil, false, nil } @@ -318,7 +318,7 @@ func formatPiecesMatched( } var sb strings.Builder - fmt.Fprintf(&sb, "matched %d tracks to pieces in %q (%s, confidence: %.2f)\n", + fmt.Fprintf(&sb, "matched %d tracks to pieces in %s (%s, confidence: %.2f)\n", len(result.Mappings), cand.workTitle, cand.workID, result.Confidence) fmt.Fprintf(&sb, "reasoning: %s\n", result.Reasoning) fmt.Fprintf(&sb, "%s", cand.workTitle) @@ -357,9 +357,9 @@ func formatPiecesMatched( } pTitle := translationText(p.Title) if trackIdx, ok := pieceToTrack[p.ID]; ok { - fmt.Fprintf(&sb, "\n%s%s%s ⇒ Track %d: %q", wpPrefix, pBranch, pTitle, trackIdx, trackTitle[trackIdx]) + fmt.Fprintf(&sb, "\n%s%s%s ⇒ Track %d: %s", wpPrefix, pBranch, pTitle, trackIdx, trackTitle[trackIdx]) } else { - fmt.Fprintf(&sb, "\n%s%s⚠ %s (unmatched)", wpPrefix, pBranch, pTitle) + fmt.Fprintf(&sb, "\n%s%s⚠️ %s (unmatched)", wpPrefix, pBranch, pTitle) } } } From fe341b8f1427716c4b1cd45901f0efc3fbe506a3 Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 15:09:47 +0100 Subject: [PATCH 03/10] Build Alpaca ingestion payload activity --- .../activities/reconcile/buildpayload.go | 96 +++++++++++++++++++ .../reconcile/findworkcandidates.go | 13 +++ .../reconcile/reconcilerecording.go | 6 ++ internal/temporal/workflows/ingestalbum.go | 18 ++-- .../temporal/workflows/ingestalbum_test.go | 44 +++++++-- 5 files changed, 162 insertions(+), 15 deletions(-) create mode 100644 internal/temporal/activities/reconcile/buildpayload.go diff --git a/internal/temporal/activities/reconcile/buildpayload.go b/internal/temporal/activities/reconcile/buildpayload.go new file mode 100644 index 0000000..8f3c68e --- /dev/null +++ b/internal/temporal/activities/reconcile/buildpayload.go @@ -0,0 +1,96 @@ +package reconcile + +import ( + "context" + "fmt" + + "github.com/google/uuid" + + "idagio/auto-ingester/internal/temporal/activities" + "idagio/auto-ingester/internal/temporal/memo" + "idagio/auto-ingester/upstream/alpaca/openapi/models" + "idagio/auto-ingester/upstream/lms" +) + +type BuildPayloadInput struct { + AlbumMetadata AlbumMetadataResult `json:"album_metadata"` + Recordings []ResolvedRecording `json:"recordings"` + Submission *lms.SubmissionMetadata `json:"submission"` +} + +func (c *Config) BuildIngestionPayload( + ctx context.Context, + input BuildPayloadInput, +) (*activities.WithLog[*models.IngestionPayload], error) { + al := memo.NewActivityLogger(ctx) + + trackRefs := make([]string, len(input.Submission.Tracks)) + for i := range trackRefs { + trackRefs[i] = uuid.New().String() + } + + orderedTrackList := make([]string, len(trackRefs)) + copy(orderedTrackList, trackRefs) + + album := &models.IngestionAlbumInput{ + Title: input.AlbumMetadata.Title, + ReleaseDate: input.AlbumMetadata.ReleaseDate, + PLine: input.AlbumMetadata.PLine, + PlineYear: input.AlbumMetadata.PLineYear, + LabelID: input.AlbumMetadata.LabelID, + Upc: input.Submission.OriginalID, + OrderedTrackList: orderedTrackList, + ExistingTracks: []string{}, + CreateAlbum: true, + } + + recordings := make([]*models.IngestionRecordingInput, len(input.Recordings)) + for i, rec := range input.Recordings { + pieceRefByID := make(map[string]string) + var workParts []*models.IngestionWorkPart + for _, wp := range rec.WorkParts { + pieces := make([]*models.IngestionPiece, len(wp.PieceIDs)) + for j, pieceID := range wp.PieceIDs { + ref := uuid.New().String() + pieceRefByID[pieceID] = ref + pieces[j] = &models.IngestionPiece{ID: pieceID, Ref: ref} + } + workParts = append(workParts, &models.IngestionWorkPart{ + ID: wp.ID, + Pieces: pieces, + }) + } + + tracks := make([]*models.IngestionTrack, len(rec.Mappings)) + for j, m := range rec.Mappings { + t := input.Submission.Tracks[m.TrackIndex] + dur := t.DurationComponents + tracks[j] = &models.IngestionTrack{ + Ref: trackRefs[m.TrackIndex], + Isrc: t.ISRC, + Filename: t.Filename, + DurationInSeconds: int64(dur.Hours*3600 + dur.Minutes*60 + dur.Seconds), + PieceRef: pieceRefByID[m.PieceID], + } + } + + recordings[i] = &models.IngestionRecordingInput{ + Tracks: tracks, + Work: &models.IngestionWork{ + ID: rec.WorkID, + WorkParts: workParts, + }, + } + } + + payload := &models.IngestionPayload{ + Album: album, + Recordings: recordings, + } + + al.Append("payload_built", fmt.Sprintf( + "built ingestion payload: %d recordings, %d tracks", + len(recordings), len(input.Submission.Tracks))) + + return activities.Done(al, payload), nil +} diff --git a/internal/temporal/activities/reconcile/findworkcandidates.go b/internal/temporal/activities/reconcile/findworkcandidates.go index 4a927b0..70e034a 100644 --- a/internal/temporal/activities/reconcile/findworkcandidates.go +++ b/internal/temporal/activities/reconcile/findworkcandidates.go @@ -220,12 +220,25 @@ func (c *ReconcileRecordingConfig) tryMatchPieces( if matchResult.Confidence >= 0.7 { al.Append("pieces_matched", formatPiecesMatched(work, matchResult, input.Tracks, cand)) + var workParts []ResolvedWorkPart + for _, wp := range work.Workparts { + if wp.RecordingID != nil { + continue + } + pieceIDs := make([]string, len(wp.Pieces)) + for j, p := range wp.Pieces { + pieceIDs[j] = p.ID + } + workParts = append(workParts, ResolvedWorkPart{ID: wp.ID, PieceIDs: pieceIDs}) + } + return &ResolvedRecording{ WorkID: cand.workID, WorkTitle: cand.workTitle, ComposerName: input.ComposerName, TrackIndices: input.TrackIndices, Mappings: matchResult.Mappings, + WorkParts: workParts, }, true, nil } diff --git a/internal/temporal/activities/reconcile/reconcilerecording.go b/internal/temporal/activities/reconcile/reconcilerecording.go index 742553d..3f340b6 100644 --- a/internal/temporal/activities/reconcile/reconcilerecording.go +++ b/internal/temporal/activities/reconcile/reconcilerecording.go @@ -33,4 +33,10 @@ type ResolvedRecording struct { ComposerName string `json:"composer_name"` TrackIndices []int `json:"track_indices"` Mappings []ai.TrackPieceMapping `json:"mappings"` + WorkParts []ResolvedWorkPart `json:"work_parts"` +} + +type ResolvedWorkPart struct { + ID string `json:"id"` + PieceIDs []string `json:"piece_ids"` } diff --git a/internal/temporal/workflows/ingestalbum.go b/internal/temporal/workflows/ingestalbum.go index ba794c5..3d97e37 100644 --- a/internal/temporal/workflows/ingestalbum.go +++ b/internal/temporal/workflows/ingestalbum.go @@ -9,15 +9,11 @@ import ( lmsactivities "idagio/auto-ingester/internal/temporal/activities/lms" reconcileactivities "idagio/auto-ingester/internal/temporal/activities/reconcile" "idagio/auto-ingester/internal/temporal/memo" + "idagio/auto-ingester/upstream/alpaca/openapi/models" "idagio/auto-ingester/upstream/lms" ) -type IngestAlbumResult struct { - AlbumMetadata reconcileactivities.AlbumMetadataResult `json:"album_metadata"` - Recordings []reconcileactivities.ResolvedRecording `json:"recordings"` -} - -func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*IngestAlbumResult, error) { +func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*models.IngestionPayload, error) { var ( lmsAct *lmsactivities.Config alpacaAct *alpacaactivities.Config @@ -89,10 +85,16 @@ func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*IngestAlbu return nil, err } - return &IngestAlbumResult{ + payload, err := execActivity[*models.IngestionPayload](r, reconcileAct.BuildIngestionPayload, reconcileactivities.BuildPayloadInput{ AlbumMetadata: albumMetadata, Recordings: recordings, - }, nil + Submission: submission, + }) + if err != nil { + return nil, err + } + + return payload, nil } func buildTracksForGroup(submission *lms.SubmissionMetadata, trackIndices []int) []reconcileactivities.TrackForPiece { diff --git a/internal/temporal/workflows/ingestalbum_test.go b/internal/temporal/workflows/ingestalbum_test.go index 389ef46..d43ee1b 100644 --- a/internal/temporal/workflows/ingestalbum_test.go +++ b/internal/temporal/workflows/ingestalbum_test.go @@ -16,6 +16,7 @@ import ( lmsactivities "idagio/auto-ingester/internal/temporal/activities/lms" reconcileactivities "idagio/auto-ingester/internal/temporal/activities/reconcile" "idagio/auto-ingester/internal/temporal/memo" + "idagio/auto-ingester/upstream/alpaca/openapi/models" "idagio/auto-ingester/upstream/lms" ) @@ -53,6 +54,13 @@ func reconcileWorkResult(result reconcileactivities.ResolvedRecording) *activiti } } +func buildPayloadResult(payload *models.IngestionPayload) *activities.WithLog[*models.IngestionPayload] { + return &activities.WithLog[*models.IngestionPayload]{ + Result: payload, + Log: []memo.ActivityLogEvent{{Activity: "BuildIngestionPayload", Event: "payload_built"}}, + } +} + func submissionWithTracks() *lms.SubmissionMetadata { return &lms.SubmissionMetadata{ AlbumTitle: "Test Album", @@ -99,6 +107,7 @@ func TestIngestAlbumWorkflow_HappyPath(t *testing.T) { setupBaseActivities(env, metadata) var recRecAct *reconcileactivities.ReconcileRecordingConfig + var reconcileAct *reconcileactivities.Config env.OnActivity(recRecAct.ReconcileWork, mock.Anything, reconcileactivities.ReconcileWorkInput{ ComposerName: "Mozart", @@ -117,20 +126,34 @@ func TestIngestAlbumWorkflow_HappyPath(t *testing.T) { {TrackIndex: 0, PieceID: "piece-1"}, {TrackIndex: 1, PieceID: "piece-2"}, }, + WorkParts: []reconcileactivities.ResolvedWorkPart{ + {ID: "wp-1", PieceIDs: []string{"piece-1", "piece-2"}}, + }, }), nil) + expectedPayload := &models.IngestionPayload{ + Album: &models.IngestionAlbumInput{ + Title: "Test Album", + CreateAlbum: true, + }, + Recordings: []*models.IngestionRecordingInput{ + {Work: &models.IngestionWork{ID: "work-1"}}, + }, + } + env.OnActivity(reconcileAct.BuildIngestionPayload, mock.Anything, mock.Anything). + Return(buildPayloadResult(expectedPayload), nil) + env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) - var result IngestAlbumResult + var result models.IngestionPayload require.NoError(t, env.GetWorkflowResult(&result)) - assert.Equal(t, "Test Album", result.AlbumMetadata.Title) + require.NotNil(t, result.Album) + assert.Equal(t, "Test Album", result.Album.Title) require.Len(t, result.Recordings, 1) - assert.Equal(t, "work-1", result.Recordings[0].WorkID) - assert.Equal(t, "Mozart", result.Recordings[0].ComposerName) - assert.Len(t, result.Recordings[0].Mappings, 2) + assert.Equal(t, "work-1", result.Recordings[0].Work.ID) } func TestIngestAlbumWorkflow_ReconcileWorkFails(t *testing.T) { @@ -199,14 +222,21 @@ func TestIngestAlbumWorkflow_NoWorkGroups_Succeeds(t *testing.T) { env.OnActivity(recRecAct.GroupTracksByWork, mock.Anything, metadata). Return(groupResult(reconcileactivities.GroupTracksByWorkResult{}), nil) + expectedPayload := &models.IngestionPayload{ + Album: &models.IngestionAlbumInput{Title: "Test Album", CreateAlbum: true}, + } + env.OnActivity(reconcileAct.BuildIngestionPayload, mock.Anything, mock.Anything). + Return(buildPayloadResult(expectedPayload), nil) + env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) - var result IngestAlbumResult + var result models.IngestionPayload require.NoError(t, env.GetWorkflowResult(&result)) - assert.Equal(t, "Test Album", result.AlbumMetadata.Title) + require.NotNil(t, result.Album) + assert.Equal(t, "Test Album", result.Album.Title) assert.Empty(t, result.Recordings) } From ed7f51390313172f375d1e1653d29a0f4507279a Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 15:14:53 +0100 Subject: [PATCH 04/10] Fix bad formatting --- internal/temporal/activities/reconcile/albummetadata.go | 6 +++--- .../temporal/activities/reconcile/findworkcandidates.go | 4 ++-- internal/temporal/activities/reconcile/grouptracksbywork.go | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/internal/temporal/activities/reconcile/albummetadata.go b/internal/temporal/activities/reconcile/albummetadata.go index 94bb41f..1d791fe 100644 --- a/internal/temporal/activities/reconcile/albummetadata.go +++ b/internal/temporal/activities/reconcile/albummetadata.go @@ -111,7 +111,7 @@ func (c *Config) reconcileLabel( } if labelID, ok := findExactLabel(labels, labelName); ok { - al.Append("label_matched", fmt.Sprintf("exact match %q → label ID %s", labelName, labelID)) + al.Append("label_matched", fmt.Sprintf("exact match %s → label ID %s", labelName, labelID)) return labelID, nil } @@ -119,7 +119,7 @@ func (c *Config) reconcileLabel( if err != nil { return "", fmt.Errorf("brainstorm label alternatives for %q: %w", labelName, err) } - al.Append("label_brainstorm", fmt.Sprintf("no exact match for %q, brainstormed alternatives: %s", labelName, strings.Join(alternatives, ", "))) + al.Append("label_brainstorm", fmt.Sprintf("no exact match for %s, brainstormed alternatives: %s", labelName, strings.Join(alternatives, ", "))) seen := make(map[string]bool) var candidates []ai.LabelCandidate @@ -165,7 +165,7 @@ func (c *Config) reconcileLabel( } } al.Append("label_matched", fmt.Sprintf( - "picked %q (%s) for submission label %q (confidence: %.2f, reasoning: %s); other candidates: %s", + "picked %s (%s) for submission label %s (confidence: %.2f, reasoning: %s); other candidates: %s", picked.LabelName, picked.LabelID, labelName, picked.Confidence, picked.Reasoning, strings.Join(others, ", "))) return picked.LabelID, nil } diff --git a/internal/temporal/activities/reconcile/findworkcandidates.go b/internal/temporal/activities/reconcile/findworkcandidates.go index 70e034a..8eaad56 100644 --- a/internal/temporal/activities/reconcile/findworkcandidates.go +++ b/internal/temporal/activities/reconcile/findworkcandidates.go @@ -110,7 +110,7 @@ func (c *ReconcileRecordingConfig) findWorkCandidates( altLines = append(altLines, fmt.Sprintf(" - %s", alt)) } al.Append("work_brainstorm", fmt.Sprintf( - "no exact match for %q by %s, brainstormed %d alternative queries:\n%s", + "no exact match for %s by %s, brainstormed %d alternative queries:\n%s", workTitle, composerName, len(alternatives), strings.Join(altLines, "\n"))) for _, alt := range alternatives { @@ -243,7 +243,7 @@ func (c *ReconcileRecordingConfig) tryMatchPieces( } al.Append("pieces_low_confidence", fmt.Sprintf( - "low confidence %.2f matching tracks to %q — likely wrong work (reasoning: %s)", + "low confidence %.2f matching tracks to %s — likely wrong work (reasoning: %s)", matchResult.Confidence, cand.workTitle, matchResult.Reasoning)) return nil, false, nil diff --git a/internal/temporal/activities/reconcile/grouptracksbywork.go b/internal/temporal/activities/reconcile/grouptracksbywork.go index 92fef4b..8bcf454 100644 --- a/internal/temporal/activities/reconcile/grouptracksbywork.go +++ b/internal/temporal/activities/reconcile/grouptracksbywork.go @@ -37,7 +37,7 @@ func (c *ReconcileRecordingConfig) GroupTracksByWork( var lines []string for _, w := range analyzed.Works { - lines = append(lines, fmt.Sprintf(" %q by %s (tracks %v)", w.WorkTitle, w.ComposerName, w.TrackIndices)) + lines = append(lines, fmt.Sprintf(" %s by %s (tracks %v)", w.WorkTitle, w.ComposerName, w.TrackIndices)) } al.Append("tracks_analyzed", fmt.Sprintf( "LLM grouped %d tracks into %d presumptive works:\n%s", From 2e71724b55662e0c2f2e8be5a0d062d3aa090c9c Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 15:15:09 +0100 Subject: [PATCH 05/10] Build payload is a Alpaca specific activity --- .../activities/{reconcile => alpaca}/buildpayload.go | 11 ++++++----- internal/temporal/workflows/ingestalbum.go | 2 +- internal/temporal/workflows/ingestalbum_test.go | 7 ++++--- 3 files changed, 11 insertions(+), 9 deletions(-) rename internal/temporal/activities/{reconcile => alpaca}/buildpayload.go (87%) diff --git a/internal/temporal/activities/reconcile/buildpayload.go b/internal/temporal/activities/alpaca/buildpayload.go similarity index 87% rename from internal/temporal/activities/reconcile/buildpayload.go rename to internal/temporal/activities/alpaca/buildpayload.go index 8f3c68e..af9a5a2 100644 --- a/internal/temporal/activities/reconcile/buildpayload.go +++ b/internal/temporal/activities/alpaca/buildpayload.go @@ -1,4 +1,4 @@ -package reconcile +package alpaca import ( "context" @@ -7,18 +7,19 @@ import ( "github.com/google/uuid" "idagio/auto-ingester/internal/temporal/activities" + reconcileactivities "idagio/auto-ingester/internal/temporal/activities/reconcile" "idagio/auto-ingester/internal/temporal/memo" "idagio/auto-ingester/upstream/alpaca/openapi/models" "idagio/auto-ingester/upstream/lms" ) type BuildPayloadInput struct { - AlbumMetadata AlbumMetadataResult `json:"album_metadata"` - Recordings []ResolvedRecording `json:"recordings"` - Submission *lms.SubmissionMetadata `json:"submission"` + AlbumMetadata reconcileactivities.AlbumMetadataResult `json:"album_metadata"` + Recordings []reconcileactivities.ResolvedRecording `json:"recordings"` + Submission *lms.SubmissionMetadata `json:"submission"` } -func (c *Config) BuildIngestionPayload( +func (a *Config) BuildIngestionPayload( ctx context.Context, input BuildPayloadInput, ) (*activities.WithLog[*models.IngestionPayload], error) { diff --git a/internal/temporal/workflows/ingestalbum.go b/internal/temporal/workflows/ingestalbum.go index 3d97e37..3c45d11 100644 --- a/internal/temporal/workflows/ingestalbum.go +++ b/internal/temporal/workflows/ingestalbum.go @@ -85,7 +85,7 @@ func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*models.Ing return nil, err } - payload, err := execActivity[*models.IngestionPayload](r, reconcileAct.BuildIngestionPayload, reconcileactivities.BuildPayloadInput{ + payload, err := execActivity[*models.IngestionPayload](r, alpacaAct.BuildIngestionPayload, alpacaactivities.BuildPayloadInput{ AlbumMetadata: albumMetadata, Recordings: recordings, Submission: submission, diff --git a/internal/temporal/workflows/ingestalbum_test.go b/internal/temporal/workflows/ingestalbum_test.go index d43ee1b..569ea80 100644 --- a/internal/temporal/workflows/ingestalbum_test.go +++ b/internal/temporal/workflows/ingestalbum_test.go @@ -107,7 +107,6 @@ func TestIngestAlbumWorkflow_HappyPath(t *testing.T) { setupBaseActivities(env, metadata) var recRecAct *reconcileactivities.ReconcileRecordingConfig - var reconcileAct *reconcileactivities.Config env.OnActivity(recRecAct.ReconcileWork, mock.Anything, reconcileactivities.ReconcileWorkInput{ ComposerName: "Mozart", @@ -140,7 +139,8 @@ func TestIngestAlbumWorkflow_HappyPath(t *testing.T) { {Work: &models.IngestionWork{ID: "work-1"}}, }, } - env.OnActivity(reconcileAct.BuildIngestionPayload, mock.Anything, mock.Anything). + var alpacaAct *alpacaactivities.Config + env.OnActivity(alpacaAct.BuildIngestionPayload, mock.Anything, mock.Anything). Return(buildPayloadResult(expectedPayload), nil) env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") @@ -225,7 +225,8 @@ func TestIngestAlbumWorkflow_NoWorkGroups_Succeeds(t *testing.T) { expectedPayload := &models.IngestionPayload{ Album: &models.IngestionAlbumInput{Title: "Test Album", CreateAlbum: true}, } - env.OnActivity(reconcileAct.BuildIngestionPayload, mock.Anything, mock.Anything). + var alpacaAct2 *alpacaactivities.Config + env.OnActivity(alpacaAct2.BuildIngestionPayload, mock.Anything, mock.Anything). Return(buildPayloadResult(expectedPayload), nil) env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") From a36fef137199390bbd90102dfea2e881536be6db Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 15:40:10 +0100 Subject: [PATCH 06/10] Loosen LMS input validation --- upstream/lms/types.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/upstream/lms/types.go b/upstream/lms/types.go index d34cb72..732fc4f 100644 --- a/upstream/lms/types.go +++ b/upstream/lms/types.go @@ -1,5 +1,7 @@ package lms +import "encoding/json" + type Contributor struct { FullName *string `json:"FullName"` Roles []string `json:"Roles"` @@ -19,9 +21,9 @@ type HashSum struct { } type FileLocation struct { - Location string `json:"location"` - Kind string `json:"kind"` - HashSum HashSum `json:"hash_sum"` + Location json.RawMessage `json:"location"` + Kind string `json:"kind"` + HashSum HashSum `json:"hash_sum"` } type ValidityPeriod struct { @@ -30,7 +32,7 @@ type ValidityPeriod struct { } type Availability struct { - TerritoryCodes string `json:"territory_codes"` + TerritoryCodes []string `json:"territory_codes"` ValidityPeriod ValidityPeriod `json:"validity_period"` IsTakedown bool `json:"is_takedown"` } From 1fa24cff7e740c759df5deace67f035a018f6498 Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 15:40:27 +0100 Subject: [PATCH 07/10] Allow longer activity time for times that Alpaca becomes a bottleneck --- internal/temporal/workflows/ingestalbum.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/temporal/workflows/ingestalbum.go b/internal/temporal/workflows/ingestalbum.go index 3c45d11..0fe0028 100644 --- a/internal/temporal/workflows/ingestalbum.go +++ b/internal/temporal/workflows/ingestalbum.go @@ -22,8 +22,8 @@ func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*models.Ing ) actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ - StartToCloseTimeout: time.Second * 30, - ScheduleToCloseTimeout: time.Second * 30, + StartToCloseTimeout: time.Minute * 2, + ScheduleToCloseTimeout: time.Minute * 2, }) var activityLog []memo.ActivityLogEvent From 0c940fcc19dc3277d1bb084a9d3070f10a9fef5d Mon Sep 17 00:00:00 2001 From: Myles Date: Thu, 19 Feb 2026 16:11:25 +0100 Subject: [PATCH 08/10] Adds skip duplicate check flag --- Makefile | 4 ++++ README.md | 11 +++++++++- cmd/exec-workflow/main.go | 22 ++++++++++++------- .../lms/testdata/submission_metadata.json | 12 +++++----- internal/temporal/workflows/ingestalbum.go | 17 +++++++++----- .../temporal/workflows/ingestalbum_test.go | 14 ++++++------ 6 files changed, 53 insertions(+), 27 deletions(-) diff --git a/Makefile b/Makefile index 70c6764..81e1f74 100644 --- a/Makefile +++ b/Makefile @@ -46,6 +46,10 @@ lint-fix: $(GOBIN)/golangci-lint test: go test $(if $(path),./$(path)/...,./...) +.PHONY: ingest +ingest: + go run ./cmd/exec-workflow ingest-album $(if $(skip-dup),--skip-duplicate-check) $(id) + .PHONY: format format: gofmt -s -w . diff --git a/README.md b/README.md index 84266ac..ebb04aa 100644 --- a/README.md +++ b/README.md @@ -25,8 +25,10 @@ graph TD one concurrent goroutine per track group; finds work candidates, matches tracks to pieces"] - C --> G["return IngestAlbumResult"] + C --> G["**BuildIngestionPayload** + assemble Alpaca ingestion payload"] R --> G + G --> H["return IngestionPayload"] ``` @@ -91,6 +93,13 @@ make test path=internal/temporal/workflows # Run tests for specific package make worker # Start Temporal worker ``` +### How to run a workflow? + +``` +make ingest id= # Ingest an album +make ingest id= skip-dup=1 # Skip the duplicate-album check +``` + ## API TBD diff --git a/cmd/exec-workflow/main.go b/cmd/exec-workflow/main.go index 8a854e4..fdc2f61 100644 --- a/cmd/exec-workflow/main.go +++ b/cmd/exec-workflow/main.go @@ -1,9 +1,10 @@ package main import ( - "context" "bytes" + "context" "encoding/json" + "flag" "fmt" "log" "os" @@ -31,23 +32,28 @@ func main() { switch workflowName { case "ingest-album": - if len(os.Args) < 3 { - log.Fatalln("Usage: exec-workflow ingest-album ") + fs := flag.NewFlagSet("ingest-album", flag.ExitOnError) + skipDuplicateCheck := fs.Bool("skip-duplicate-check", false, "skip the check that prevents re-ingesting an existing album") + fs.Parse(os.Args[2:]) + if fs.NArg() < 1 { + log.Fatalln("Usage: exec-workflow ingest-album [--skip-duplicate-check] id=") } - submissionID := os.Args[2] - runIngestAlbum(c, submissionID) + runIngestAlbum(c, workflows.IngestAlbumInput{ + SubmissionID: fs.Arg(0), + SkipDuplicateCheck: *skipDuplicateCheck, + }) default: log.Fatalf("Unknown workflow: %s", workflowName) } } -func runIngestAlbum(c client.Client, submissionID string) { +func runIngestAlbum(c client.Client, input workflows.IngestAlbumInput) { options := client.StartWorkflowOptions{ - ID: fmt.Sprintf("ingest-album-%s", submissionID), + ID: fmt.Sprintf("ingest-album-%s", input.SubmissionID), TaskQueue: "auto-ingester-task-queue", } - we, err := c.ExecuteWorkflow(context.Background(), options, workflows.IngestAlbumWorkflow, submissionID) + we, err := c.ExecuteWorkflow(context.Background(), options, workflows.IngestAlbumWorkflow, input) if err != nil { log.Fatalln("Unable to execute workflow", err) } diff --git a/internal/temporal/activities/lms/testdata/submission_metadata.json b/internal/temporal/activities/lms/testdata/submission_metadata.json index e7a4f54..0f6a777 100644 --- a/internal/temporal/activities/lms/testdata/submission_metadata.json +++ b/internal/temporal/activities/lms/testdata/submission_metadata.json @@ -107,7 +107,7 @@ }, "availabilities": [ { - "territory_codes": "Worldwide", + "territory_codes": ["Worldwide"], "validity_period": { "start_date": "2020-04-01", "end_date": null @@ -168,7 +168,7 @@ }, "availabilities": [ { - "territory_codes": "Worldwide", + "territory_codes": ["Worldwide"], "validity_period": { "start_date": "2020-04-01", "end_date": null @@ -229,7 +229,7 @@ }, "availabilities": [ { - "territory_codes": "Worldwide", + "territory_codes": ["Worldwide"], "validity_period": { "start_date": "2020-04-01", "end_date": null @@ -290,7 +290,7 @@ }, "availabilities": [ { - "territory_codes": "Worldwide", + "territory_codes": ["Worldwide"], "validity_period": { "start_date": "2020-04-01", "end_date": null @@ -351,7 +351,7 @@ }, "availabilities": [ { - "territory_codes": "Worldwide", + "territory_codes": ["Worldwide"], "validity_period": { "start_date": "2020-04-01", "end_date": null @@ -412,7 +412,7 @@ }, "availabilities": [ { - "territory_codes": "Worldwide", + "territory_codes": ["Worldwide"], "validity_period": { "start_date": "2020-04-01", "end_date": null diff --git a/internal/temporal/workflows/ingestalbum.go b/internal/temporal/workflows/ingestalbum.go index 0fe0028..e6453a0 100644 --- a/internal/temporal/workflows/ingestalbum.go +++ b/internal/temporal/workflows/ingestalbum.go @@ -13,7 +13,12 @@ import ( "idagio/auto-ingester/upstream/lms" ) -func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*models.IngestionPayload, error) { +type IngestAlbumInput struct { + SubmissionID string `json:"submission_id"` + SkipDuplicateCheck bool `json:"skip_duplicate_check,omitempty"` +} + +func IngestAlbumWorkflow(ctx workflow.Context, input IngestAlbumInput) (*models.IngestionPayload, error) { var ( lmsAct *lmsactivities.Config alpacaAct *alpacaactivities.Config @@ -29,14 +34,16 @@ func IngestAlbumWorkflow(ctx workflow.Context, submissionID string) (*models.Ing var activityLog []memo.ActivityLogEvent r := &activityRunner{ctx: ctx, actCtx: actCtx, activityLog: &activityLog} - submission, err := execActivity[*lms.SubmissionMetadata](r, lmsAct.FetchLMSSubmission, submissionID) + submission, err := execActivity[*lms.SubmissionMetadata](r, lmsAct.FetchLMSSubmission, input.SubmissionID) if err != nil { return nil, err } - _, err = execActivity[struct{}](r, alpacaAct.ConfirmAlbumNotIngested, submission.OriginalID) - if err != nil { - return nil, err + if !input.SkipDuplicateCheck { + _, err = execActivity[struct{}](r, alpacaAct.ConfirmAlbumNotIngested, submission.OriginalID) + if err != nil { + return nil, err + } } albumMetadataFuture := startActivity[reconcileactivities.AlbumMetadataResult](r, reconcileAct.ReconcileAlbumMetadata, submission) diff --git a/internal/temporal/workflows/ingestalbum_test.go b/internal/temporal/workflows/ingestalbum_test.go index 569ea80..abd2138 100644 --- a/internal/temporal/workflows/ingestalbum_test.go +++ b/internal/temporal/workflows/ingestalbum_test.go @@ -143,7 +143,7 @@ func TestIngestAlbumWorkflow_HappyPath(t *testing.T) { env.OnActivity(alpacaAct.BuildIngestionPayload, mock.Anything, mock.Anything). Return(buildPayloadResult(expectedPayload), nil) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) @@ -168,7 +168,7 @@ func TestIngestAlbumWorkflow_ReconcileWorkFails(t *testing.T) { env.OnActivity(recRecAct.ReconcileWork, mock.Anything, mock.Anything). Return(nil, temporal.NewNonRetryableApplicationError("no work candidate matched pieces", "", nil)) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() @@ -188,7 +188,7 @@ func TestIngestAlbumWorkflow_ReconcileWorkError_Propagates(t *testing.T) { env.OnActivity(recRecAct.ReconcileWork, mock.Anything, mock.Anything). Return(nil, errors.New("connection refused")) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() @@ -229,7 +229,7 @@ func TestIngestAlbumWorkflow_NoWorkGroups_Succeeds(t *testing.T) { env.OnActivity(alpacaAct2.BuildIngestionPayload, mock.Anything, mock.Anything). Return(buildPayloadResult(expectedPayload), nil) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) require.NoError(t, env.GetWorkflowError()) @@ -259,7 +259,7 @@ func TestIngestAlbumWorkflow_AlbumAlreadyExists_Aborts(t *testing.T) { env.OnActivity(alpacaAct.ConfirmAlbumNotIngested, mock.Anything, "1234567890123"). Return(nil, temporal.NewNonRetryableApplicationError("album already exists in Alpaca with ID existing-album-id", "AlreadyExists", nil)) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() @@ -286,7 +286,7 @@ func TestIngestAlbumWorkflow_AlpacaError_RetriesThenFails(t *testing.T) { env.OnActivity(alpacaAct.ConfirmAlbumNotIngested, mock.Anything, "1234567890123"). Return(nil, errors.New("unexpected status code: 500")) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() @@ -303,7 +303,7 @@ func TestIngestAlbumWorkflow_LMSFetchFails_Errors(t *testing.T) { env.OnActivity(lmsAct.FetchLMSSubmission, mock.Anything, "submission-123"). Return(nil, errors.New("connection refused")) - env.ExecuteWorkflow(IngestAlbumWorkflow, "submission-123") + env.ExecuteWorkflow(IngestAlbumWorkflow, IngestAlbumInput{SubmissionID: "submission-123"}) require.True(t, env.IsWorkflowCompleted()) err := env.GetWorkflowError() From ec3f5049c01613ededb1becf686fcc147e696f35 Mon Sep 17 00:00:00 2001 From: Myles Date: Tue, 24 Feb 2026 14:45:23 +0100 Subject: [PATCH 09/10] Set default model --- cmd/worker/main.go | 17 +++++++++++++---- go.mod | 6 +++--- go.sum | 4 ++-- internal/ai/brainstormlabelqueries.go | 1 - internal/ai/brainstormworkqueries.go | 1 - internal/ai/grouptracksbywork.go | 1 - internal/ai/matchtrackpieces.go | 1 - internal/ai/pickbestlabel.go | 1 - internal/ai/pickbestwork.go | 1 - 9 files changed, 18 insertions(+), 15 deletions(-) diff --git a/cmd/worker/main.go b/cmd/worker/main.go index d773107..d86b87f 100644 --- a/cmd/worker/main.go +++ b/cmd/worker/main.go @@ -7,8 +7,10 @@ import ( "os" "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/plugins/compat_oai/anthropic" oai "github.com/firebase/genkit/go/plugins/compat_oai/openai" "github.com/joho/godotenv" + "github.com/openai/openai-go/option" "go.temporal.io/sdk/client" tlog "go.temporal.io/sdk/log" "go.temporal.io/sdk/worker" @@ -35,10 +37,17 @@ func main() { logger := tlog.NewStructuredLogger(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel}))) g := genkit.Init(context.Background(), - genkit.WithPlugins(&oai.OpenAI{ - APIKey: os.Getenv("OPENAI_API_KEY"), - }), - ) + genkit.WithPlugins( + &oai.OpenAI{ + APIKey: os.Getenv("OPENAI_API_KEY"), + }, + &anthropic.Anthropic{ + Opts: []option.RequestOption{ + option.WithAPIKey(os.Getenv("ANTHROPIC_API_KEY")), + }, + }), + genkit.WithDefaultModel("openai/gpt-5.2"), + ) c, err := client.Dial(client.Options{Logger: logger}) if err != nil { diff --git a/go.mod b/go.mod index 74cf4a4..7f545c6 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.25.6 require ( github.com/air-verse/air v1.64.5 + github.com/firebase/genkit/go v1.4.0 github.com/go-openapi/errors v0.22.2 github.com/go-openapi/runtime v0.28.0 github.com/go-openapi/strfmt v0.23.0 @@ -11,7 +12,9 @@ require ( github.com/go-openapi/validate v0.24.0 github.com/go-swagger/go-swagger v0.33.1 github.com/golangci/golangci-lint v1.64.8 + github.com/google/uuid v1.6.0 github.com/joho/godotenv v1.5.1 + github.com/openai/openai-go v1.8.2 github.com/stretchr/testify v1.11.1 go.temporal.io/sdk v1.39.0 ) @@ -69,7 +72,6 @@ require ( github.com/fatih/color v1.18.0 // indirect github.com/fatih/structtag v1.2.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect - github.com/firebase/genkit/go v1.4.0 // indirect github.com/firefart/nonamedreturns v1.0.5 // indirect github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/fzipp/gocyclo v0.6.0 // indirect @@ -108,7 +110,6 @@ require ( github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect github.com/google/dotprompt/go v0.0.0-20251014011017-8d056e027254 // indirect github.com/google/go-cmp v0.7.0 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/gordonklaus/ineffassign v0.1.0 // indirect github.com/gorilla/handlers v1.5.2 // indirect github.com/gostaticanalysis/analysisutil v0.7.1 // indirect @@ -169,7 +170,6 @@ require ( github.com/olekukonko/errors v1.1.0 // indirect github.com/olekukonko/ll v0.0.9 // indirect github.com/olekukonko/tablewriter v1.0.9 // indirect - github.com/openai/openai-go v1.8.2 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect github.com/pelletier/go-toml/v2 v2.2.4 // indirect diff --git a/go.sum b/go.sum index 13b9143..d945360 100644 --- a/go.sum +++ b/go.sum @@ -846,8 +846,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs= go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/internal/ai/brainstormlabelqueries.go b/internal/ai/brainstormlabelqueries.go index db0ae08..3ac7188 100644 --- a/internal/ai/brainstormlabelqueries.go +++ b/internal/ai/brainstormlabelqueries.go @@ -18,7 +18,6 @@ func BrainstormLabelQueriesFlow(g *genkit.Genkit) *core.Flow[string, []string, s } result, _, err := genkit.GenerateData[alternativeQueries](ctx, g, - ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You help match record label names to their canonical entries in a music database. diff --git a/internal/ai/brainstormworkqueries.go b/internal/ai/brainstormworkqueries.go index 8107feb..be04109 100644 --- a/internal/ai/brainstormworkqueries.go +++ b/internal/ai/brainstormworkqueries.go @@ -23,7 +23,6 @@ func BrainstormWorkQueriesFlow(g *genkit.Genkit) *core.Flow[BrainstormWorkInput, } result, _, err := genkit.GenerateData[alternativeQueries](ctx, g, - ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You help match classical work titles to their canonical entries in a music database. diff --git a/internal/ai/grouptracksbywork.go b/internal/ai/grouptracksbywork.go index e8c7a09..67c089d 100644 --- a/internal/ai/grouptracksbywork.go +++ b/internal/ai/grouptracksbywork.go @@ -37,7 +37,6 @@ func GroupTracksByWorkFlow(g *genkit.Genkit) *core.Flow[AnalyzeTracksInput, *Ana } result, _, err := genkit.GenerateData[AnalyzeTracksResult](ctx, g, - ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You analyze track listings from album submissions and identify which musical works the tracks belong to. diff --git a/internal/ai/matchtrackpieces.go b/internal/ai/matchtrackpieces.go index eac6c01..99a7ff3 100644 --- a/internal/ai/matchtrackpieces.go +++ b/internal/ai/matchtrackpieces.go @@ -56,7 +56,6 @@ func MatchTrackPiecesFlow(g *genkit.Genkit) *core.Flow[MatchTrackPiecesInput, *M } result, _, err := genkit.GenerateData[MatchTrackPiecesResult](ctx, g, - ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You match album tracks to specific pieces within a musical work's structure. diff --git a/internal/ai/pickbestlabel.go b/internal/ai/pickbestlabel.go index 58190a6..2778baf 100644 --- a/internal/ai/pickbestlabel.go +++ b/internal/ai/pickbestlabel.go @@ -44,7 +44,6 @@ func PickBestLabelFlow(g *genkit.Genkit) *core.Flow[PickLabelInput, *PickLabelRe } result, _, err := genkit.GenerateData[PickLabelResult](ctx, g, - ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You help match record label names to their canonical entries in a music database. diff --git a/internal/ai/pickbestwork.go b/internal/ai/pickbestwork.go index c890ac2..d70d169 100644 --- a/internal/ai/pickbestwork.go +++ b/internal/ai/pickbestwork.go @@ -54,7 +54,6 @@ func PickBestWorkFlow(g *genkit.Genkit) *core.Flow[PickWorkInput, *PickWorkResul } result, _, err := genkit.GenerateData[PickWorkResult](ctx, g, - ai.WithModelName("openai/gpt-5.2"), ai.WithSystem(`You are a classical music metadata expert. You help match work titles to their canonical entries in a music database. From 8001b72cf7eefa53d0e3ac97b8ece865e4afd4d9 Mon Sep 17 00:00:00 2001 From: Myles Date: Tue, 24 Feb 2026 16:15:51 +0100 Subject: [PATCH 10/10] Misc refactors for work matching --- .../activities/reconcile/findworkcandidates.go | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/internal/temporal/activities/reconcile/findworkcandidates.go b/internal/temporal/activities/reconcile/findworkcandidates.go index 8eaad56..bcb09ab 100644 --- a/internal/temporal/activities/reconcile/findworkcandidates.go +++ b/internal/temporal/activities/reconcile/findworkcandidates.go @@ -3,6 +3,7 @@ package reconcile import ( "context" "fmt" + "slices" "strings" "go.temporal.io/sdk/activity" @@ -193,6 +194,12 @@ func (c *ReconcileRecordingConfig) tryMatchPieces( return nil, false, fmt.Errorf("get work %s: %w", cand.workID, err) } + // Workparts with a recording ID are non-canonical workparts created for ingesting one specific album. + // They should not be matched to new recordings/tracks. + work.Workparts = slices.DeleteFunc(work.Workparts, func(wp alpaca.WorkpartFull) bool { + return wp.RecordingID != nil + }) + pieces := flattenPieces(work) if len(pieces) == 0 { al.Append("no_pieces", fmt.Sprintf("work %s (%s) has no pieces", cand.workID, cand.workTitle)) @@ -218,13 +225,10 @@ func (c *ReconcileRecordingConfig) tryMatchPieces( } if matchResult.Confidence >= 0.7 { - al.Append("pieces_matched", formatPiecesMatched(work, matchResult, input.Tracks, cand)) + al.Append("pieces_matched", logPieceReconciliationStructure(work, matchResult, input.Tracks, cand)) var workParts []ResolvedWorkPart for _, wp := range work.Workparts { - if wp.RecordingID != nil { - continue - } pieceIDs := make([]string, len(wp.Pieces)) for j, p := range wp.Pieces { pieceIDs[j] = p.ID @@ -265,9 +269,6 @@ func stripWorkPrefix(trackTitle, workTitle string) string { func flattenPieces(work *alpaca.WorkFull) []ai.PieceSummary { var pieces []ai.PieceSummary for _, wp := range work.Workparts { - if wp.RecordingID != nil { - continue - } wpTitle := translationText(wp.Title) for _, p := range wp.Pieces { pieces = append(pieces, ai.PieceSummary{ @@ -315,7 +316,7 @@ func workTitle(w *models.WorkAdminWork) string { return "" } -func formatPiecesMatched( +func logPieceReconciliationStructure( work *alpaca.WorkFull, result *ai.MatchTrackPiecesResult, tracks []TrackForPiece,