Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ lint-fix: $(GOBIN)/golangci-lint
test:
go test $(if $(path),./$(path)/...,./...)

.PHONY: ingest
ingest:
go run ./cmd/exec-workflow ingest-album $(if $(skip-dup),--skip-duplicate-check) $(id)

.PHONY: format
format:
gofmt -s -w .
30 changes: 30 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,29 @@ Alpaca, and ingests matched albums via Alpaca's ingestion API.
- **Temporal Worker**: Executes album reconciliation workflows
- **HTTP API (TODO)**: Accepts workflow requests, exposes workflow state

### Workflow Structure

```mermaid
graph TD
A["**FetchLMSSubmission**
fetch submission from LMS"] --> B["**ConfirmAlbumNotIngested**
abort if album already exists in Alpaca"]
B --> C["**ReconcileAlbumMetadata**
match record label, parse release date & p-line"]
B --> D["**GroupTracksByWork**
LLM groups tracks into presumptive works based on track titles"]

D --> R["**ReconcileWork** ×N
one concurrent goroutine per track group;
finds work candidates, matches tracks to pieces"]

C --> G["**BuildIngestionPayload**
assemble Alpaca ingestion payload"]
R --> G
G --> H["return IngestionPayload"]
```


## External Dependencies

- [LMS (Label Metadata Service)](https://github.com/IDAGIO/idagio-label-metadata-explorer-service) - Source of preprocessed DDEX submissions
Expand Down Expand Up @@ -70,6 +93,13 @@ make test path=internal/temporal/workflows # Run tests for specific package
make worker # Start Temporal worker
```

### How to run a workflow?

```
make ingest id=<lms-submission-id> # Ingest an album
make ingest id=<lms-submission-id> skip-dup=1 # Skip the duplicate-album check
```

## API

TBD
Expand Down
22 changes: 14 additions & 8 deletions cmd/exec-workflow/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package main

import (
"context"
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"os"
Expand Down Expand Up @@ -31,23 +32,28 @@ func main() {

switch workflowName {
case "ingest-album":
if len(os.Args) < 3 {
log.Fatalln("Usage: exec-workflow ingest-album <submission-id>")
fs := flag.NewFlagSet("ingest-album", flag.ExitOnError)
skipDuplicateCheck := fs.Bool("skip-duplicate-check", false, "skip the check that prevents re-ingesting an existing album")
fs.Parse(os.Args[2:])
if fs.NArg() < 1 {
log.Fatalln("Usage: exec-workflow ingest-album [--skip-duplicate-check] id=<submission-id>")
}
submissionID := os.Args[2]
runIngestAlbum(c, submissionID)
runIngestAlbum(c, workflows.IngestAlbumInput{
SubmissionID: fs.Arg(0),
SkipDuplicateCheck: *skipDuplicateCheck,
})
default:
log.Fatalf("Unknown workflow: %s", workflowName)
}
}

func runIngestAlbum(c client.Client, submissionID string) {
func runIngestAlbum(c client.Client, input workflows.IngestAlbumInput) {
options := client.StartWorkflowOptions{
ID: fmt.Sprintf("ingest-album-%s", submissionID),
ID: fmt.Sprintf("ingest-album-%s", input.SubmissionID),
TaskQueue: "auto-ingester-task-queue",
}

we, err := c.ExecuteWorkflow(context.Background(), options, workflows.IngestAlbumWorkflow, submissionID)
we, err := c.ExecuteWorkflow(context.Background(), options, workflows.IngestAlbumWorkflow, input)
if err != nil {
log.Fatalln("Unable to execute workflow", err)
}
Expand Down
4 changes: 4 additions & 0 deletions cmd/genkit-dev/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,10 @@ func main() {

aiflows.BrainstormLabelQueriesFlow(g)
aiflows.PickBestLabelFlow(g)
aiflows.GroupTracksByWorkFlow(g)
aiflows.BrainstormWorkQueriesFlow(g)
aiflows.PickBestWorkFlow(g)
aiflows.MatchTrackPiecesFlow(g)

mux := http.NewServeMux()
for _, flow := range genkit.ListFlows(g) {
Expand Down
18 changes: 14 additions & 4 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@ import (
"os"

"github.com/firebase/genkit/go/genkit"
"github.com/firebase/genkit/go/plugins/compat_oai/anthropic"
oai "github.com/firebase/genkit/go/plugins/compat_oai/openai"
"github.com/joho/godotenv"
"github.com/openai/openai-go/option"
"go.temporal.io/sdk/client"
tlog "go.temporal.io/sdk/log"
"go.temporal.io/sdk/worker"
Expand All @@ -35,10 +37,17 @@ func main() {
logger := tlog.NewStructuredLogger(slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: logLevel})))

g := genkit.Init(context.Background(),
genkit.WithPlugins(&oai.OpenAI{
APIKey: os.Getenv("OPENAI_API_KEY"),
}),
)
genkit.WithPlugins(
&oai.OpenAI{
APIKey: os.Getenv("OPENAI_API_KEY"),
},
&anthropic.Anthropic{
Opts: []option.RequestOption{
option.WithAPIKey(os.Getenv("ANTHROPIC_API_KEY")),
},
}),
genkit.WithDefaultModel("openai/gpt-5.2"),
)

c, err := client.Dial(client.Options{Logger: logger})
if err != nil {
Expand All @@ -57,6 +66,7 @@ func main() {
w.RegisterActivity(&alpacaactivities.Config{Client: alpacaClient})

w.RegisterActivity(reconcileactivities.NewConfig(alpacaClient, g))
w.RegisterActivity(reconcileactivities.NewReconcileRecordingConfig(alpacaClient, g))

err = w.Run(worker.InterruptCh())
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ go 1.25.6

require (
github.com/air-verse/air v1.64.5
github.com/firebase/genkit/go v1.4.0
github.com/go-openapi/errors v0.22.2
github.com/go-openapi/runtime v0.28.0
github.com/go-openapi/strfmt v0.23.0
github.com/go-openapi/swag v0.23.1
github.com/go-openapi/validate v0.24.0
github.com/go-swagger/go-swagger v0.33.1
github.com/golangci/golangci-lint v1.64.8
github.com/google/uuid v1.6.0
github.com/joho/godotenv v1.5.1
github.com/openai/openai-go v1.8.2
github.com/stretchr/testify v1.11.1
go.temporal.io/sdk v1.39.0
)
Expand Down Expand Up @@ -69,7 +72,6 @@ require (
github.com/fatih/color v1.18.0 // indirect
github.com/fatih/structtag v1.2.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/firebase/genkit/go v1.4.0 // indirect
github.com/firefart/nonamedreturns v1.0.5 // indirect
github.com/fsnotify/fsnotify v1.9.0 // indirect
github.com/fzipp/gocyclo v0.6.0 // indirect
Expand Down Expand Up @@ -108,7 +110,6 @@ require (
github.com/golangci/unconvert v0.0.0-20240309020433-c5143eacb3ed // indirect
github.com/google/dotprompt/go v0.0.0-20251014011017-8d056e027254 // indirect
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/gordonklaus/ineffassign v0.1.0 // indirect
github.com/gorilla/handlers v1.5.2 // indirect
github.com/gostaticanalysis/analysisutil v0.7.1 // indirect
Expand Down Expand Up @@ -169,7 +170,6 @@ require (
github.com/olekukonko/errors v1.1.0 // indirect
github.com/olekukonko/ll v0.0.9 // indirect
github.com/olekukonko/tablewriter v1.0.9 // indirect
github.com/openai/openai-go v1.8.2 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.2.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -846,8 +846,8 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/automaxprocs v1.6.0 h1:O3y2/QNTOdbF+e/dpXNNW7Rx2hZ4sTIPyybbxyNqTUs=
go.uber.org/automaxprocs v1.6.0/go.mod h1:ifeIMSnPZuznNm6jmdzmU3/bfk01Fe2fotchwEFJ8r8=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=
Expand Down
45 changes: 45 additions & 0 deletions internal/ai/brainstormlabelqueries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package ai

import (
"context"
"errors"
"fmt"

"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/core"
"github.com/firebase/genkit/go/genkit"
)

func BrainstormLabelQueriesFlow(g *genkit.Genkit) *core.Flow[string, []string, struct{}] {
return genkit.DefineFlow(g, "brainstormLabelQueries",
func(ctx context.Context, labelName string) ([]string, error) {
if labelName == "" {
return nil, errors.New("label name must not be empty")
}

result, _, err := genkit.GenerateData[alternativeQueries](ctx, g,
ai.WithSystem(`You are a classical music metadata expert.
You help match record label names to their canonical entries in a music database.

The queries you suggest will be used as case-insensitive substring searches against a label name column.
Effective strategies:
- Shorter, core-name queries match more broadly than long ones
- Remove suffixes like "GmbH", "Ltd", "Inc", "Records", "Recordings", "Rec.", "Classics"
- Abbreviate or expand well-known names in both directions (e.g. "UMG" ↔ "Universal Music", "DG" ↔ "Deutsche Grammophon")
- Drop punctuation (hyphens, periods, ampersands)
- Parent company names can substitute for imprint names and vice versa

Given a label name, produce 3-5 alternative queries most likely to match the canonical label name.
Prefer the shortest effective form for each variation.`),
ai.WithPrompt(
"The label name %q was not found. Suggest 3-5 alternative search queries.",
labelName,
),
)
if err != nil {
return nil, fmt.Errorf("brainstorm label alternatives: %w", err)
}
return result.Queries, nil
},
)
}
53 changes: 53 additions & 0 deletions internal/ai/brainstormworkqueries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ai

import (
"context"
"errors"
"fmt"

"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/core"
"github.com/firebase/genkit/go/genkit"
)

type BrainstormWorkInput struct {
WorkTitle string `json:"work_title"`
ComposerName string `json:"composer_name"`
}

func BrainstormWorkQueriesFlow(g *genkit.Genkit) *core.Flow[BrainstormWorkInput, []string, struct{}] {
return genkit.DefineFlow(g, "brainstormWorkQueries",
func(ctx context.Context, input BrainstormWorkInput) ([]string, error) {
if input.WorkTitle == "" {
return nil, errors.New("work title must not be empty")
}

result, _, err := genkit.GenerateData[alternativeQueries](ctx, g,
ai.WithSystem(`You are a classical music metadata expert.
You help match classical work titles to their canonical entries in a music database.

The queries you suggest will be used as case-insensitive substring searches against a work title column, alongside a composer name filter.

Effective strategies:
- Opus/catalogue number variations: Op., BWV, K., KV, D., HWV, RV, S., Sz., etc.
- Language variants: German vs Italian vs French vs English titles (e.g. "Sinfonie" vs "Symphony" vs "Symphonie")
- Popular title vs formal title (e.g. "Moonlight Sonata" vs "Piano Sonata No. 14")
- Number formatting differences ("No. 5" vs "Nr. 5" vs "n° 5")
- Abbreviations and key variations ("G minor" vs "g-Moll" vs "sol mineur")
- Prefer the shortest effective form for each variation
- Avoid including movement/section information, e.g. return a query for "Piano Sonata No. 14" instead of "Piano Sonata No. 14, Op. 27 No. 2 'Moonlight' Allegro",
or "Carmen" instead of "Carmen La Fleur de L'Amour"

Given a work title and composer name, produce 3-5 alternative search queries most likely to match the canonical title.`),
ai.WithPrompt(
"The work %q by %q was not found. Suggest 3-5 alternative search queries.",
input.WorkTitle, input.ComposerName,
),
)
if err != nil {
return nil, fmt.Errorf("brainstorm work alternatives: %w", err)
}
return result.Queries, nil
},
)
}
64 changes: 64 additions & 0 deletions internal/ai/grouptracksbywork.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package ai

import (
"context"
"encoding/json"
"fmt"

"github.com/firebase/genkit/go/ai"
"github.com/firebase/genkit/go/core"
"github.com/firebase/genkit/go/genkit"
)

type AnalyzeTracksInput struct {
Tracks []TrackSummary `json:"tracks"`
}

type IdentifiedWork struct {
WorkTitle string `json:"work_title"`
ComposerName string `json:"composer_name"`
TrackIndices []int `json:"track_indices"`
}

type AnalyzeTracksResult struct {
Works []IdentifiedWork `json:"works"`
}

func GroupTracksByWorkFlow(g *genkit.Genkit) *core.Flow[AnalyzeTracksInput, *AnalyzeTracksResult, struct{}] {
return genkit.DefineFlow(g, "groupTracksByWork",
func(ctx context.Context, input AnalyzeTracksInput) (*AnalyzeTracksResult, error) {
if len(input.Tracks) == 0 {
return &AnalyzeTracksResult{}, nil
}

tracksJSON, err := json.Marshal(input.Tracks)
if err != nil {
return nil, fmt.Errorf("marshal tracks: %w", err)
}

result, _, err := genkit.GenerateData[AnalyzeTracksResult](ctx, g,
ai.WithSystem(`You are a classical music metadata expert.
You analyze track listings from album submissions and identify which musical works the tracks belong to.

Idagio uses a Work → WorkPart → Piece hierarchy:
- A Work is the top-level composition (e.g. "Symphony No. 40 in G minor, K. 550")
- WorkParts are movements or sections
- Pieces are the individual performable units that tracks map to

Your task is to group tracks into works, identifying the work title and composer for each group.

Important rules:
- Track titles are unpredictable: they may use "Work / Movement" patterns (e.g. "Symphonie n° 40, K 550 / Molto allegro"), or just the work name for single-movement pieces, or entirely opaque titles
- Composer names come from the input data, NOT from track titles
- Consecutive tracks by the same composer with movement-like titles likely belong to the same work
- Single-movement modern works (e.g. "Become Ocean") — the track title IS the work title
- Work titles should be the canonical name of the composition including catalogue/opus numbers if visible (e.g. "Symphony No. 40 in G minor, K. 550")`),
ai.WithPrompt("Analyze these tracks and group them into works:\n\n%s", string(tracksJSON)),
)
if err != nil {
return nil, fmt.Errorf("analyze track works: %w", err)
}
return result, nil
},
)
}
Loading