diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 2d50b2b..2902211 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: '1.24' cache: true - name: Run golangci-lint @@ -40,7 +40,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v5 with: - go-version: '1.23' + go-version: '1.24' cache: true - name: Run tests @@ -62,7 +62,7 @@ jobs: strategy: matrix: os: [ubuntu-latest, macos-latest, windows-latest] - go-version: ['1.23'] + go-version: ['1.24'] steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/.golangci.yml b/.golangci.yml index e805ef0..de31658 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -3,16 +3,17 @@ run: timeout: 5m - go: '1.23' + go: '1.24' linters-settings: errcheck: check-type-assertions: true - check-blank: true + check-blank: false govet: - check-shadowing: true enable-all: true + disable: + - fieldalignment gofmt: simplify: true @@ -92,7 +93,7 @@ linters: - funlen # Too noisy for early development - gocognit # Similar to gocyclo, redundant - godox # Allow TODO/FIXME comments - - gomnd # Magic number detection - too strict for initial dev + - mnd # Magic number detection - too strict for initial dev - testpackage # Separate test packages not always needed issues: diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..9120cfc --- /dev/null +++ b/Dockerfile @@ -0,0 +1,29 @@ +FROM golang:1.24-alpine AS builder + +RUN apk add --no-cache git + +WORKDIR /src +COPY go.mod go.sum ./ +RUN go mod download +COPY . . + +ARG VERSION=dev +ARG COMMIT=unknown +ARG BUILD_TIME=unknown + +RUN CGO_ENABLED=0 go build \ + -ldflags "-X main.Version=${VERSION} -X main.Commit=${COMMIT} -X main.BuildTime=${BUILD_TIME}" \ + -o /queuekitd ./cmd/queuekitd + +RUN CGO_ENABLED=0 go build \ + -ldflags "-X main.Version=${VERSION} -X main.Commit=${COMMIT} -X main.BuildTime=${BUILD_TIME}" \ + -o /queuekit ./cmd/queuekit + +FROM gcr.io/distroless/static-debian12:nonroot + +COPY --from=builder /queuekitd /queuekitd +COPY --from=builder /queuekit /queuekit + +EXPOSE 8080 + +ENTRYPOINT ["/queuekitd"] diff --git a/PLAN.md b/PLAN.md index 92704e8..c2f8a0b 100644 --- a/PLAN.md +++ b/PLAN.md @@ -41,46 +41,48 @@ - [x] Basic logging and instrumentation hooks (`log/slog`, per-event structured logs) - [x] Fix retry bug: backends now re-schedule non-final failures as `pending` with backoff delay -## Phase 3 – HTTP API +## Phase 3 – HTTP API ✅ -- [ ] Choose router (chi/echo) -- [ ] Implement endpoints: - - [ ] `POST /v1/jobs` – enqueue - - [ ] `GET /v1/queues` – list queues - - [ ] `GET /v1/queues/{name}/jobs` – list jobs - - [ ] `POST /v1/jobs/{id}/retry` - - [ ] `POST /v1/jobs/{id}/cancel` -- [ ] API key authentication -- [ ] JSON schema & validation -- [ ] Integration tests against Postgres/Redis backends +- [x] Choose router (`chi/v5`) +- [x] Implement endpoints: + - [x] `POST /v1/jobs` – enqueue + - [x] `GET /v1/queues` – list queues + - [x] `GET /v1/queues/{name}/jobs` – list jobs + - [x] `GET /v1/jobs/{id}` – get job + - [x] `POST /v1/jobs/{id}/retry` + - [x] `POST /v1/jobs/{id}/cancel` + - [x] `DELETE /v1/jobs/{id}` – delete job +- [x] API key authentication (Bearer token middleware) +- [x] JSON validation (leverages `job.Validate()`) +- [x] Integration tests (19 tests with mock backend) -## Phase 4 – Dashboard +## Phase 4 – Dashboard ✅ -- [ ] Setup `html/template` with basic layout -- [ ] Views: - - [ ] Queues overview (throughput, latency, failures) - - [ ] Queue details (jobs, statuses, pagination) - - [ ] Job details panel (payload, error, retry) - - [ ] Schedules view -- [ ] Minimal CSS (no framework) -- [ ] Server-side sorting/filtering (no SPA) +- [x] Setup `html/template` with embedded layout +- [x] Views: + - [x] Queues overview (counts, health scores) + - [x] Queue details (jobs, status filter tabs, pagination) + - [x] Job details panel (payload, error, retry button) +- [x] Minimal CSS (no framework, embedded in layout) +- [x] Server-side sorting/filtering (no SPA) -## Phase 5 – CLI (`queuekit`) +## Phase 5 – CLI (`queuekit`) ✅ -- [ ] Set up CLI using `cobra` or stdlib `flag` -- [ ] Commands: - - [ ] `queuekit enqueue --queue --payload ` - - [ ] `queuekit inspect queues` - - [ ] `queuekit inspect jobs --queue ` - - [ ] `queuekit retry ` -- [ ] Global config via YAML (`~/.config/queuekit/config.yaml`) +- [x] Set up CLI using `cobra` +- [x] Commands: + - [x] `queuekit enqueue --queue --payload ` + - [x] `queuekit inspect queues` + - [x] `queuekit inspect jobs --queue ` + - [x] `queuekit retry ` + - [x] `queuekit cancel ` +- [x] Global config via YAML (`~/.config/queuekit/config.yaml`) with viper -## Phase 6 – Packaging & Examples +## Phase 6 – Packaging & Examples ✅ -- [ ] Dockerfile for `queuekitd` -- [ ] `docker-compose.yml` with Postgres + Redis -- [ ] Example integrations: - - [ ] Simple Go service using the client - - [ ] Cron-style recurring jobs -- [ ] Final README polish and screenshots +- [x] Multi-stage Dockerfile for `queuekitd` (distroless final image) +- [x] `docker-compose.yml` with Postgres + Redis +- [x] Example integrations: + - [x] Simple Go service (`examples/simple/`) + - [x] Cron-style recurring jobs (`examples/cron/`) +- [x] README polish with API reference, CLI usage, architecture diagram diff --git a/README.md b/README.md index 37b0c17..40f455c 100644 --- a/README.md +++ b/README.md @@ -1,219 +1,256 @@ -1️⃣ QueueKit (Go Job Queue & Scheduler) -README.md # QueueKit -> A minimal, self-hosted job queue and scheduler written in Go. +> A minimal, self-hosted job queue and scheduler written in Go. > Think Sidekiq/BullMQ, but "boring Go" with HTML templates and Postgres/Redis under the hood. -QueueKit is a background job system for services that need retries, backoff, idempotency, and scheduled work -without dragging in a huge stack or JS-heavy dashboards. +QueueKit is a background job system for services that need retries, backoff, and scheduled work without dragging in a huge stack or JS-heavy dashboards. -- 🧵 **Concurrency-first**: built with Go's goroutines and channels. -- 📬 **Job queues + scheduled jobs**: fire-and-forget work and cron-style recurring jobs. -- 🔁 **Retries & backoff**: configurable strategies, dead-letter queue. -- 🗃️ **Postgres / Redis**: pluggable backend for durability and speed. -- 📊 **Tiny dashboard**: Go `html/template` + minimal CSS, no SPA. -- 🛠️ **CLI tooling**: `queuekit` for enqueueing, inspecting, and managing jobs. +- **Concurrency-first**: built with Go's goroutines and channels. +- **Job queues + scheduled jobs**: fire-and-forget work and cron-style recurring jobs. +- **Retries & backoff**: configurable fixed/exponential strategies, dead-letter queue. +- **Postgres / Redis**: pluggable backend for durability and speed. +- **Dashboard**: Go `html/template` + minimal CSS, no SPA. +- **CLI tooling**: `queuekit` for enqueueing, inspecting, and managing jobs. --- -## Architecture +## Quick Start -High-level components: +### With Docker Compose -- **API server (`queuekitd`)** - - HTTP API to enqueue jobs, inspect queues, and manage schedules. - - Exposes Prometheus metrics (optional). -- **Worker pool** - - Runs inside the same process or separate worker processes. - - Pulls jobs from backend, executes registered handlers. -- **Backend** - - **Postgres**: durable job storage, job history, DLQ. - - **Redis**: fast queue/lock operations. -- **Dashboard** - - Simple HTML views for queues, workers, failures, schedules. +```bash +docker compose up --build +``` -```text -Client → HTTP API → Backend (Postgres/Redis) → Workers → Dashboard +This starts Postgres, Redis, and the `queuekitd` server on port 8080. -Tech Stack +- **API**: http://localhost:8080/v1/queues +- **Dashboard**: http://localhost:8080/dashboard -Language: Go (1.22+) +### Without Docker -Backend: Postgres + Redis +```bash +# Prerequisites: Postgres and Redis running locally -HTTP: net/http + chi/echo/gorilla (TBD) +export QUEUEKIT_BACKEND=postgres +export QUEUEKIT_POSTGRES_DSN="postgres://user:pass@localhost:5432/queuekit?sslmode=disable" +export QUEUEKIT_API_KEY="your-secret-key" -Templates: html/template +go run ./cmd/queuekitd +``` -CLI: cobra or stdlib flag +--- -Quickstart (planned UX) -# run server + workers -queuekitd serve --config ./queuekit.yaml +## API Reference + +All `/v1/*` endpoints require an `Authorization: Bearer ` header. + +| Method | Path | Description | +|----------|-------------------------------|--------------------------| +| `POST` | `/v1/jobs` | Enqueue a new job | +| `GET` | `/v1/jobs/{id}` | Get job details | +| `DELETE` | `/v1/jobs/{id}` | Delete a job | +| `POST` | `/v1/jobs/{id}/retry` | Retry a failed/dead job | +| `POST` | `/v1/jobs/{id}/cancel` | Cancel a pending job | +| `GET` | `/v1/queues` | List all queues | +| `GET` | `/v1/queues/{name}/jobs` | List jobs in a queue | + +### Enqueue a Job + +```bash +curl -X POST http://localhost:8080/v1/jobs \ + -H "Authorization: Bearer your-secret-key" \ + -H "Content-Type: application/json" \ + -d '{ + "type": "send_email", + "queue": "emails", + "payload": {"to": "user@example.com", "subject": "Hello"}, + "priority": 20, + "max_attempts": 5 + }' +``` + +### List Queues + +```bash +curl http://localhost:8080/v1/queues \ + -H "Authorization: Bearer your-secret-key" +``` + +### List Jobs (with filtering) + +```bash +curl "http://localhost:8080/v1/queues/emails/jobs?status=pending&limit=10" \ + -H "Authorization: Bearer your-secret-key" +``` -# enqueue a job from CLI -queuekit enqueue email.send \ - --payload '{"to":"user@example.com","subject":"Hi"}' \ - --queue critical +--- -# inspect queues -queuekit inspect queues -queuekit inspect jobs --queue critical +## Dashboard +The dashboard is available at `/dashboard` and provides: -Then open the dashboard: +- **Queues overview**: pending, running, completed, failed, and dead counts with health scores +- **Queue detail**: filterable job list with status tabs and pagination +- **Job detail**: full payload view, error details, and retry button -http://localhost:8080 +--- -Job Definition (example) +## CLI -In your Go service: +```bash +go install ./cmd/queuekit -import "github.com/yourname/queuekit/client" +# Or use the built binary +./queuekit --help +``` -func main() { - c := client.New("http://localhost:8080", client.WithAPIKey("secret")) - - _ = c.Enqueue(context.Background(), client.Job{ - Queue: "emails", - Type: "email.send", - Payload: map[string]any{ - "to": "user@example.com", - "subject": "Welcome", - }, - }) -} +### Configuration +Create `~/.config/queuekit/config.yaml`: -Worker registration: +```yaml +server: http://localhost:8080 +api_key: your-secret-key +``` -import "github.com/yourname/queuekit/worker" +Or pass flags directly: -func main() { - w := worker.New(worker.Config{ /* ... */ }) +```bash +queuekit --server http://localhost:8080 --api-key your-secret-key inspect queues +``` - w.Handle("email.send", func(ctx context.Context, job worker.Job) error { - // send email here - return nil - }) +### Commands - w.Run() -} +```bash +# Enqueue a job +queuekit enqueue send_email --queue emails --payload '{"to":"user@example.com"}' -Status +# Inspect queues +queuekit inspect queues -This is a portfolio / learning project intended to showcase: +# Inspect jobs in a queue +queuekit inspect jobs --queue emails --status pending --limit 20 -Production-ready Go code organization. +# Retry a failed job +queuekit retry -Concurrency, reliability, and observability patterns. +# Cancel a pending job +queuekit cancel +``` -Minimalist but usable dashboards without any frontend frameworks. +--- + +## Using as a Library + +### Register handlers and run workers -Not yet production hardened. Use at your own risk. +```go +package main -Roadmap (high level) +import ( + "context" + "encoding/json" + "fmt" + "log/slog" - Core job model and storage + "github.com/reckziegelwilliam/queuekit/internal/backend/postgres" + "github.com/reckziegelwilliam/queuekit/internal/queue" + "github.com/reckziegelwilliam/queuekit/internal/worker" +) - Redis + Postgres backends +func main() { + ctx := context.Background() + be, _ := postgres.NewFromDSN(ctx, "postgres://...") + defer be.Close() - Worker pool with backoff & DLQ + registry := worker.NewRegistry() + registry.Register("send_email", func(ctx context.Context, job *queue.Job) error { + fmt.Println("Sending email for job", job.ID) + return nil + }) - HTTP API + auth + // Enqueue a job + job := queue.NewJob("send_email", "emails", json.RawMessage(`{"to":"user@example.com"}`)) + be.Enqueue(ctx, job) - HTML dashboard + // Start workers + pool := worker.NewPool(be, registry, []worker.QueueConfig{ + {Name: "emails", Concurrency: 4}, + }, worker.WithPoolLogger(slog.Default())) - CLI (queuekit / queuekitd) + pool.Start(ctx) + // ... wait for shutdown signal ... + pool.Stop() +} +``` - Docker & example deployment +See [`examples/simple/`](examples/simple/) and [`examples/cron/`](examples/cron/) for complete working examples. +--- -### `PLAN.md` +## Architecture -```md -# QueueKit – Implementation Plan +``` +┌──────────┐ ┌────────────────────────────────────────┐ +│ queuekit │────▶│ queuekitd │ +│ (CLI) │HTTP │ ┌──────────┐ ┌──────────────────┐ │ +└──────────┘ │ │ HTTP API │ │ Dashboard │ │ + │ │ /v1/* │ │ /dashboard/* │ │ + │ └────┬─────┘ └────────┬──────────┘ │ + │ │ │ │ + │ ▼ ▼ │ + │ ┌──────────────────────────────┐ │ + │ │ backend.Backend │ │ + │ ├──────────────┬───────────────┤ │ + │ │ Postgres │ Redis │ │ + │ └──────────────┴───────────────┘ │ + │ │ │ + │ ▼ │ + │ ┌──────────────────┐ │ + │ │ Worker Pool │ │ + │ │ (goroutines) │ │ + │ └──────────────────┘ │ + └────────────────────────────────────────┘ +``` -## Phase 0 – Repo & Skeleton +--- -- [ ] Initialize Go module: `github.com//queuekit` -- [ ] Create basic structure: - - [ ] `cmd/queuekitd` – server/worker binary - - [ ] `cmd/queuekit` – CLI tool - - [ ] `internal/queue` – core domain types - - [ ] `internal/backend` – Postgres/Redis adapters - - [ ] `internal/worker` – worker pool - - [ ] `internal/httpapi` – HTTP handlers - - [ ] `internal/dashboard` – templates and handlers -- [ ] Add `Makefile` / `taskfile` for common commands -- [ ] Set up Go linters and CI (GitHub Actions) +## Configuration -## Phase 1 – Core Domain & Storage +`queuekitd` is configured via environment variables: -- [ ] Define job model: - - [ ] `Job` (id, type, queue, payload, status, attempts, scheduled_at, etc.) - - [ ] `Queue` model and statuses -- [ ] Implement backend interfaces: - - [ ] `Backend` interface (enqueue, reserve, ack, nack, moveToDLQ, listQueues, listJobs) - - [ ] Postgres implementation (including migrations) - - [ ] Redis implementation (fast queue operations, locks) -- [ ] Unit tests for backend behavior +| Variable | Default | Description | +|------------------------|----------------|----------------------------------| +| `QUEUEKIT_LISTEN_ADDR` | `:8080` | HTTP listen address | +| `QUEUEKIT_API_KEY` | | API key for `/v1/*` auth | +| `QUEUEKIT_BACKEND` | `postgres` | Backend type: `postgres`/`redis` | +| `QUEUEKIT_POSTGRES_DSN`| | Postgres connection string | +| `QUEUEKIT_REDIS_ADDR` | `localhost:6379`| Redis address | -## Phase 2 – Worker Pool & Execution +--- -- [ ] Implement worker pool: - - [ ] Multiple workers per queue - - [ ] Graceful shutdown - - [ ] Heartbeats / worker status -- [ ] Implement retry & backoff strategies: - - [ ] Fixed backoff - - [ ] Exponential backoff - - [ ] Max attempts → DLQ -- [ ] Worker handler registration: - - [ ] Map job type → handler func - - [ ] Context with job metadata -- [ ] Basic logging and instrumentation hooks +## Development -## Phase 3 – HTTP API +```bash +# Run tests +make test -- [ ] Choose router (chi/echo) -- [ ] Implement endpoints: - - [ ] `POST /v1/jobs` – enqueue - - [ ] `GET /v1/queues` – list queues - - [ ] `GET /v1/queues/{name}/jobs` – list jobs - - [ ] `POST /v1/jobs/{id}/retry` - - [ ] `POST /v1/jobs/{id}/cancel` -- [ ] API key authentication -- [ ] JSON schema & validation -- [ ] Integration tests against Postgres/Redis backends +# Run linter +make lint -## Phase 4 – Dashboard +# Build binaries +make build -- [ ] Setup `html/template` with basic layout -- [ ] Views: - - [ ] Queues overview (throughput, latency, failures) - - [ ] Queue details (jobs, statuses, pagination) - - [ ] Job details panel (payload, error, retry) - - [ ] Schedules view -- [ ] Minimal CSS (no framework) -- [ ] Server-side sorting/filtering (no SPA) +# Run server locally +make run +``` -## Phase 5 – CLI (`queuekit`) +--- -- [ ] Set up CLI using `cobra` or stdlib `flag` -- [ ] Commands: - - [ ] `queuekit enqueue --queue --payload ` - - [ ] `queuekit inspect queues` - - [ ] `queuekit inspect jobs --queue ` - - [ ] `queuekit retry ` -- [ ] Global config via YAML (`~/.config/queuekit/config.yaml`) +## Project Status -## Phase 6 – Packaging & Examples +This is a portfolio / learning project showcasing production-ready Go patterns: +concurrency, reliability, and observability, with minimalist but usable dashboards. -- [ ] Dockerfile for `queuekitd` -- [ ] `docker-compose.yml` with Postgres + Redis -- [ ] Example integrations: - - [ ] Simple Go service using the client - - [ ] Cron-style recurring jobs -- [ ] Final README polish and screenshots +See [PLAN.md](PLAN.md) for the full implementation roadmap. diff --git a/cmd/queuekit/main.go b/cmd/queuekit/main.go index a51c000..729d02a 100644 --- a/cmd/queuekit/main.go +++ b/cmd/queuekit/main.go @@ -1,37 +1,10 @@ +// Command queuekit is the CLI for managing QueueKit jobs and queues. package main import ( - "flag" - "fmt" - "os" -) - -var ( - // Version information (injected at build time) - Version = "dev" - Commit = "unknown" - BuildTime = "unknown" + "github.com/reckziegelwilliam/queuekit/internal/cli" ) func main() { - versionFlag := flag.Bool("version", false, "Print version information") - flag.Parse() - - if *versionFlag { - fmt.Printf("queuekit version %s (commit: %s, built: %s)\n", Version, Commit, BuildTime) - os.Exit(0) - } - - fmt.Println("📋 QueueKit CLI") - fmt.Println("===============") - fmt.Printf("Version: %s\n", Version) - fmt.Printf("Commit: %s\n", Commit) - fmt.Printf("Built: %s\n\n", BuildTime) - fmt.Println("This is a placeholder. CLI functionality will be implemented in Phase 5.") - fmt.Println("Use --version to see version information.") - fmt.Println("\nFuture commands:") - fmt.Println(" queuekit enqueue --queue --payload ") - fmt.Println(" queuekit inspect queues") - fmt.Println(" queuekit inspect jobs --queue ") - fmt.Println(" queuekit retry ") + cli.Execute() } diff --git a/cmd/queuekitd/main.go b/cmd/queuekitd/main.go index fefc476..ec68ea0 100644 --- a/cmd/queuekitd/main.go +++ b/cmd/queuekitd/main.go @@ -1,13 +1,29 @@ +// Command queuekitd runs the QueueKit server and worker pool. package main import ( + "context" "flag" "fmt" + "log/slog" + "net/http" "os" + "os/signal" + "syscall" + "time" + + goredis "github.com/redis/go-redis/v9" + + "github.com/reckziegelwilliam/queuekit/internal/backend" + "github.com/reckziegelwilliam/queuekit/internal/backend/postgres" + redisbe "github.com/reckziegelwilliam/queuekit/internal/backend/redis" + "github.com/reckziegelwilliam/queuekit/internal/config" + "github.com/reckziegelwilliam/queuekit/internal/dashboard" + "github.com/reckziegelwilliam/queuekit/internal/httpapi" + "github.com/reckziegelwilliam/queuekit/internal/worker" ) var ( - // Version information (injected at build time) Version = "dev" Commit = "unknown" BuildTime = "unknown" @@ -22,11 +38,84 @@ func main() { os.Exit(0) } - fmt.Println("🚀 QueueKit Server (queuekitd)") - fmt.Println("================================") - fmt.Printf("Version: %s\n", Version) - fmt.Printf("Commit: %s\n", Commit) - fmt.Printf("Built: %s\n\n", BuildTime) - fmt.Println("This is a placeholder. Server functionality will be implemented in Phase 3.") - fmt.Println("Use --version to see version information.") + logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo})) + slog.SetDefault(logger) + + cfg, err := config.Load() + if err != nil { + logger.Error("failed to load config", "error", err) + os.Exit(1) + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + be, err := createBackend(ctx, cfg) + if err != nil { + logger.Error("failed to create backend", "error", err) + stop() // run before os.Exit; defers are skipped by os.Exit + os.Exit(1) //nolint:gocritic // exitAfterDefer: explicit stop() above + } + defer func() { _ = be.Close() }() + + registry := worker.NewRegistry() + pool := worker.NewPool(be, registry, nil, worker.WithPoolLogger(logger)) + + srv := httpapi.NewServer(be, cfg.APIKey, logger) + dash := dashboard.New(be, logger) + srv.MountDashboard(dash) + + httpServer := &http.Server{ + Addr: cfg.ListenAddr, + Handler: srv, + ReadHeaderTimeout: 10 * time.Second, + } + + go func() { + logger.Info("starting HTTP server", "addr", cfg.ListenAddr, "backend", cfg.Backend) + if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Error("http server error", "error", err) + os.Exit(1) //nolint:gocritic // exitAfterDefer: fatal from goroutine; process exit + } + }() + + if err := pool.Start(ctx); err != nil { + logger.Error("failed to start worker pool", "error", err) + stop() + _ = be.Close() + os.Exit(1) //nolint:gocritic // exitAfterDefer: explicit stop/Close above + } + + <-ctx.Done() + logger.Info("shutting down...") + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + pool.Stop() + + if err := httpServer.Shutdown(shutdownCtx); err != nil { + logger.Error("http server shutdown error", "error", err) + } + + logger.Info("shutdown complete") + + // Keep the pool and registry accessible for future use (e.g. dashboard) + _ = pool + _ = registry +} + +func createBackend(ctx context.Context, cfg *config.Config) (backend.Backend, error) { + switch cfg.Backend { + case "postgres": + return postgres.NewFromDSN(ctx, cfg.PostgresDSN) + case "redis": + opts, err := goredis.ParseURL("redis://" + cfg.RedisAddr) + if err != nil { + opts = &goredis.Options{Addr: cfg.RedisAddr} + } + return redisbe.NewFromOptions(opts) + default: + return nil, fmt.Errorf("unknown backend: %s", cfg.Backend) + } } diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..99cb971 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,45 @@ +services: + postgres: + image: postgres:16-alpine + environment: + POSTGRES_USER: queuekit + POSTGRES_PASSWORD: queuekit + POSTGRES_DB: queuekit + ports: + - "5432:5432" + volumes: + - pgdata:/var/lib/postgresql/data + healthcheck: + test: ["CMD-SHELL", "pg_isready -U queuekit"] + interval: 5s + timeout: 3s + retries: 5 + + redis: + image: redis:7-alpine + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 5s + timeout: 3s + retries: 5 + + queuekitd: + build: . + ports: + - "8080:8080" + environment: + QUEUEKIT_LISTEN_ADDR: ":8080" + QUEUEKIT_BACKEND: postgres + QUEUEKIT_POSTGRES_DSN: "postgres://queuekit:queuekit@postgres:5432/queuekit?sslmode=disable" + QUEUEKIT_REDIS_ADDR: "redis:6379" + QUEUEKIT_API_KEY: "changeme" + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + +volumes: + pgdata: diff --git a/examples/cron/main.go b/examples/cron/main.go new file mode 100644 index 0000000..7548454 --- /dev/null +++ b/examples/cron/main.go @@ -0,0 +1,81 @@ +// Command cron demonstrates recurring job scheduling with QueueKit. +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/reckziegelwilliam/queuekit/internal/backend/postgres" + "github.com/reckziegelwilliam/queuekit/internal/queue" + "github.com/reckziegelwilliam/queuekit/internal/worker" +) + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + dsn := os.Getenv("POSTGRES_DSN") + if dsn == "" { + dsn = "postgres://queuekit:queuekit@localhost:5432/queuekit?sslmode=disable" + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + be, err := postgres.NewFromDSN(ctx, dsn) + if err != nil { + logger.Error("failed to connect", "error", err) + os.Exit(1) + } + defer func() { _ = be.Close() }() + + registry := worker.NewRegistry() + registry.Register("cleanup", func(ctx context.Context, job *queue.Job) error { + logger.Info("running periodic cleanup", "job_id", job.ID, "time", time.Now().Format(time.RFC3339)) + return nil + }) + + // Enqueue a cleanup job every 30 seconds + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + enqueue := func() { + payload := json.RawMessage(fmt.Sprintf(`{"scheduled_at":"%s"}`, time.Now().Format(time.RFC3339))) + job := queue.NewJob("cleanup", "cron", payload) + if err := be.Enqueue(ctx, job); err != nil { + logger.Error("enqueue cron job failed", "error", err) + return + } + logger.Info("enqueued cron job", "id", job.ID) + } + + enqueue() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + enqueue() + } + } + }() + + pool := worker.NewPool(be, registry, []worker.QueueConfig{ + {Name: "cron", Concurrency: 1}, + }, worker.WithPoolLogger(logger)) + + if err := pool.Start(ctx); err != nil { + logger.Error("pool start failed", "error", err) + os.Exit(1) + } + + <-ctx.Done() + pool.Stop() + logger.Info("done") +} diff --git a/examples/simple/main.go b/examples/simple/main.go new file mode 100644 index 0000000..8437303 --- /dev/null +++ b/examples/simple/main.go @@ -0,0 +1,74 @@ +// Command simple demonstrates basic QueueKit usage with a worker pool. +package main + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + + "github.com/reckziegelwilliam/queuekit/internal/backend/postgres" + "github.com/reckziegelwilliam/queuekit/internal/queue" + "github.com/reckziegelwilliam/queuekit/internal/worker" +) + +func main() { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + + dsn := os.Getenv("POSTGRES_DSN") + if dsn == "" { + dsn = "postgres://queuekit:queuekit@localhost:5432/queuekit?sslmode=disable" + } + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + be, err := postgres.NewFromDSN(ctx, dsn) + if err != nil { + logger.Error("failed to connect", "error", err) + os.Exit(1) + } + defer func() { _ = be.Close() }() + + if err := postgres.RunMigrations(ctx, nil); err != nil { + logger.Warn("migration skipped (run manually)", "error", err) + } + + // Register a handler for "greet" jobs + registry := worker.NewRegistry() + registry.Register("greet", func(ctx context.Context, job *queue.Job) error { + var payload struct { + Name string `json:"name"` + } + if err := json.Unmarshal(job.Payload, &payload); err != nil { + return fmt.Errorf("unmarshal payload: %w", err) + } + logger.Info("Hello!", "name", payload.Name, "job_id", job.ID) + return nil + }) + + // Enqueue a sample job + job := queue.NewJob("greet", "default", json.RawMessage(`{"name":"World"}`)) + if err := be.Enqueue(ctx, job); err != nil { + logger.Error("enqueue failed", "error", err) + os.Exit(1) + } + logger.Info("enqueued job", "id", job.ID) + + // Start a worker pool + pool := worker.NewPool(be, registry, []worker.QueueConfig{ + {Name: "default", Concurrency: 2}, + }, worker.WithPoolLogger(logger)) + + if err := pool.Start(ctx); err != nil { + logger.Error("pool start failed", "error", err) + os.Exit(1) + } + + <-ctx.Done() + pool.Stop() + logger.Info("done") +} diff --git a/go.mod b/go.mod index b7cf1fe..209f305 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,13 @@ module github.com/reckziegelwilliam/queuekit go 1.24.0 require ( + github.com/go-chi/chi/v5 v5.2.5 github.com/golang-migrate/migrate/v4 v4.19.1 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.6 github.com/redis/go-redis/v9 v9.17.2 + github.com/spf13/cobra v1.10.2 + github.com/spf13/viper v1.21.0 github.com/stretchr/testify v1.11.1 ) @@ -14,15 +17,26 @@ require ( github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect + github.com/go-viper/mapstructure/v2 v2.4.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect - github.com/kr/text v0.2.0 // indirect github.com/lib/pq v1.10.9 // indirect + github.com/pelletier/go-toml/v2 v2.2.4 // indirect github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect + github.com/sagikazarmark/locafero v0.11.0 // indirect + github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 // indirect + github.com/spf13/afero v1.15.0 // indirect + github.com/spf13/cast v1.10.0 // indirect + github.com/spf13/pflag v1.0.10 // indirect + github.com/subosito/gotenv v1.6.0 // indirect + go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.45.0 // indirect golang.org/x/sync v0.18.0 // indirect + golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.31.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 8f9ea47..d71dc9a 100644 --- a/go.sum +++ b/go.sum @@ -12,7 +12,7 @@ github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= -github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -30,16 +30,28 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/go-chi/chi/v5 v5.2.5 h1:Eg4myHZBjyvJmAFjFvWgrqDTXFyOzjj7YIm3L3mu6Ug= +github.com/go-chi/chi/v5 v5.2.5/go.mod h1:X7Gx4mteadT3eDOMTsXzmI4/rwUpOwBHLpAfupzFJP0= github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= +github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs= +github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang-migrate/migrate/v4 v4.19.1 h1:OCyb44lFuQfYXYLx1SCxPZQGU7mcaZ7gH9yH4jSFbBA= github.com/golang-migrate/migrate/v4 v4.19.1/go.mod h1:CTcgfjxhaUtsLipnLoQRWCrjYXycRz/g5+RWDuYgPrE= +github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= @@ -48,8 +60,8 @@ github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= -github.com/kr/pretty v0.3.0 h1:WgNl7dwNpEZ6jJ9k1snq4pZsg7DOEN8hP9Xw0Tsjwk0= -github.com/kr/pretty v0.3.0/go.mod h1:640gp4NfQd8pI5XOwp5fnNeVWj67G7CFk/SaSQn7NBk= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= @@ -64,6 +76,8 @@ github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8 github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= +github.com/pelletier/go-toml/v2 v2.2.4 h1:mye9XuhQ6gvn5h28+VilKrrPoQVanw5PMw/TB0t5Ec4= +github.com/pelletier/go-toml/v2 v2.2.4/go.mod h1:2gIqNv+qfxSVS7cM2xJQKtLSTLUE9V8t9Stt+h56mCY= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -73,11 +87,29 @@ github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4Vi github.com/redis/go-redis/v9 v9.17.2/go.mod h1:u410H11HMLoB+TP67dz8rL9s6QW2j76l0//kSOd3370= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/sagikazarmark/locafero v0.11.0 h1:1iurJgmM9G3PA/I+wWYIOw/5SyBtxapeHDcg+AAIFXc= +github.com/sagikazarmark/locafero v0.11.0/go.mod h1:nVIGvgyzw595SUSUE6tvCp3YYTeHs15MvlmU87WwIik= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8 h1:+jumHNA0Wrelhe64i8F6HNlS8pkoyMv5sreGx2Ry5Rw= +github.com/sourcegraph/conc v0.3.1-0.20240121214520-5f936abd7ae8/go.mod h1:3n1Cwaq1E1/1lhQhtRK2ts/ZwZEhjcQeJQ1RuC6Q/8U= +github.com/spf13/afero v1.15.0 h1:b/YBCLWAJdFWJTN9cLhiXXcD7mzKn9Dm86dNnfyQw1I= +github.com/spf13/afero v1.15.0/go.mod h1:NC2ByUVxtQs4b3sIUphxK0NioZnmxgyCrfzeuq8lxMg= +github.com/spf13/cast v1.10.0 h1:h2x0u2shc1QuLHfxi+cTJvs30+ZAHOGRic8uyGTDWxY= +github.com/spf13/cast v1.10.0/go.mod h1:jNfB8QC9IA6ZuY2ZjDp0KtFO2LZZlg4S/7bzP6qqeHo= +github.com/spf13/cobra v1.10.2 h1:DMTTonx5m65Ic0GOoRY2c16WCbHxOOw6xxezuLaBpcU= +github.com/spf13/cobra v1.10.2/go.mod h1:7C1pvHqHw5A4vrJfjNwvOdzYu0Gml16OCs2GRiTUUS4= +github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= +github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= +github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= +github.com/subosito/gotenv v1.6.0/go.mod h1:Dk4QP5c2W3ibzajGcXpNraDfq2IrhjMIvMSWPKKo0FU= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.61.0 h1:F7Jx+6hwnZ41NSFTO5q4LYDtJRXBf2PD0rNBkeB/lus= @@ -88,6 +120,8 @@ go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/Wgbsd go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= +go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= +go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg= golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= diff --git a/internal/backend/backend.go b/internal/backend/backend.go index 160d768..f5b5fc8 100644 --- a/internal/backend/backend.go +++ b/internal/backend/backend.go @@ -1,3 +1,4 @@ +// Package backend defines the storage interface for job queue backends. package backend import ( diff --git a/internal/backend/postgres/migrations.go b/internal/backend/postgres/migrations.go index ce9eb98..ff62ce1 100644 --- a/internal/backend/postgres/migrations.go +++ b/internal/backend/postgres/migrations.go @@ -38,7 +38,7 @@ func RunMigrations(ctx context.Context, pool *pgxpool.Pool) error { if err != nil { return fmt.Errorf("failed to create migrator: %w", err) } - defer m.Close() + defer m.Close() //nolint:errcheck // best-effort cleanup // Run migrations if err := m.Up(); err != nil && err != migrate.ErrNoChange { diff --git a/internal/backend/postgres/postgres.go b/internal/backend/postgres/postgres.go index 5f181cb..9cc8cb9 100644 --- a/internal/backend/postgres/postgres.go +++ b/internal/backend/postgres/postgres.go @@ -1,8 +1,8 @@ +// Package postgres implements the Backend interface using PostgreSQL. package postgres import ( "context" - "encoding/json" "errors" "fmt" "time" @@ -77,7 +77,7 @@ func (p *PostgresBackend) Reserve(ctx context.Context, queueName string) (*queue if err != nil { return nil, fmt.Errorf("failed to begin transaction: %w", err) } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op // Find and lock the next available job query := ` @@ -160,7 +160,7 @@ func (p *PostgresBackend) Nack(ctx context.Context, jobID string, jobErr error, if err != nil { return fmt.Errorf("failed to begin transaction: %w", err) } - defer tx.Rollback(ctx) + defer tx.Rollback(ctx) //nolint:errcheck // rollback after commit is a no-op // Get current job state var attempts, maxAttempts int @@ -383,22 +383,3 @@ func (p *PostgresBackend) Close() error { p.pool.Close() return nil } - -// Helper function to scan job payload -func scanJob(rows pgx.Row) (*queue.Job, error) { - job := &queue.Job{} - var payloadBytes []byte - - err := rows.Scan( - &job.ID, &job.Type, &job.Queue, &payloadBytes, &job.Status, &job.Priority, - &job.Attempts, &job.MaxAttempts, &job.ScheduledAt, &job.CreatedAt, - &job.UpdatedAt, &job.CompletedAt, &job.FailedAt, &job.LastError, - ) - - if err != nil { - return nil, err - } - - job.Payload = json.RawMessage(payloadBytes) - return job, nil -} diff --git a/internal/backend/postgres/postgres_test.go b/internal/backend/postgres/postgres_test.go index 37e35b3..83389f9 100644 --- a/internal/backend/postgres/postgres_test.go +++ b/internal/backend/postgres/postgres_test.go @@ -204,7 +204,7 @@ func TestPostgresBackend_Nack(t *testing.T) { err = backend.Nack(ctx, retrying.ID, testErr, 0) require.NoError(t, err) - dead, err := backend.GetJob(ctx, failed.ID) + dead, err := backend.GetJob(ctx, retrying.ID) require.NoError(t, err) assert.Equal(t, queue.StatusDead, dead.Status) assert.Equal(t, 2, dead.Attempts) diff --git a/internal/backend/redis/redis.go b/internal/backend/redis/redis.go index 5bc07e1..e9a614a 100644 --- a/internal/backend/redis/redis.go +++ b/internal/backend/redis/redis.go @@ -1,3 +1,4 @@ +// Package redis implements the Backend interface using Redis. package redis import ( @@ -190,8 +191,14 @@ func (r *RedisBackend) Nack(ctx context.Context, jobID string, jobErr error, ret return fmt.Errorf("job not found: %s", jobID) } - attempts, _ := strconv.Atoi(jobData["attempts"]) - maxAttempts, _ := strconv.Atoi(jobData["max_attempts"]) + attempts, err := strconv.Atoi(jobData["attempts"]) + if err != nil { + return fmt.Errorf("invalid attempts value: %w", err) + } + maxAttempts, err := strconv.Atoi(jobData["max_attempts"]) + if err != nil { + return fmt.Errorf("invalid max_attempts value: %w", err) + } queueName := jobData["queue"] lastError := "" @@ -290,27 +297,29 @@ func (r *RedisBackend) ListQueues(ctx context.Context) ([]queue.Queue, error) { func (r *RedisBackend) ListJobs(ctx context.Context, queueName, status string, limit, offset int) ([]*queue.Job, error) { var jobIDs []string - if status != "" { + switch { + case status != "": // Get job IDs from status set members, err := r.client.SMembers(ctx, statusKey(queueName, status)).Result() if err != nil { return nil, fmt.Errorf("failed to get job IDs from status set: %w", err) } jobIDs = members - } else if queueName != "" { + case queueName != "": // Get job IDs from queue sorted set members, err := r.client.ZRange(ctx, queueKey(queueName), 0, -1).Result() if err != nil { return nil, fmt.Errorf("failed to get job IDs from queue: %w", err) } jobIDs = members - } else { + default: // Get all job keys keys, err := r.client.Keys(ctx, "job:*").Result() if err != nil { return nil, fmt.Errorf("failed to list job keys: %w", err) } // Extract job IDs from keys + jobIDs = make([]string, 0, len(keys)) for _, key := range keys { jobIDs = append(jobIDs, key[4:]) // Remove "job:" prefix } diff --git a/internal/backend/redis/redis_test.go b/internal/backend/redis/redis_test.go index 8933e17..69d6189 100644 --- a/internal/backend/redis/redis_test.go +++ b/internal/backend/redis/redis_test.go @@ -202,7 +202,7 @@ func TestRedisBackend_Nack(t *testing.T) { err = backend.Nack(ctx, retrying.ID, testErr, 0) require.NoError(t, err) - dead, err := backend.GetJob(ctx, failed.ID) + dead, err := backend.GetJob(ctx, retrying.ID) require.NoError(t, err) assert.Equal(t, queue.StatusDead, dead.Status) assert.Equal(t, 2, dead.Attempts) diff --git a/internal/cli/client.go b/internal/cli/client.go new file mode 100644 index 0000000..8aaad14 --- /dev/null +++ b/internal/cli/client.go @@ -0,0 +1,71 @@ +package cli + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +type Client struct { + baseURL string + apiKey string + httpClient *http.Client +} + +func NewClient(baseURL, apiKey string) *Client { + return &Client{ + baseURL: baseURL, + apiKey: apiKey, + httpClient: &http.Client{ + Timeout: 30 * time.Second, + }, + } +} + +func (c *Client) do(method, path string, body any) (*http.Response, error) { + var buf io.Reader + if body != nil { + data, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + buf = bytes.NewReader(data) + } + + req, err := http.NewRequest(method, c.baseURL+path, buf) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + if c.apiKey != "" { + req.Header.Set("Authorization", "Bearer "+c.apiKey) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("request failed: %w", err) + } + return resp, nil +} + +func (c *Client) decodeOrError(resp *http.Response, v any) error { + defer resp.Body.Close() //nolint:errcheck // best-effort cleanup + + if resp.StatusCode >= 400 { + var errBody struct { + Error string `json:"error"` + } + if err := json.NewDecoder(resp.Body).Decode(&errBody); err == nil && errBody.Error != "" { + return fmt.Errorf("server error (%d): %s", resp.StatusCode, errBody.Error) + } + return fmt.Errorf("server error: %s", resp.Status) + } + + if v != nil { + return json.NewDecoder(resp.Body).Decode(v) + } + return nil +} diff --git a/internal/cli/cmd_cancel.go b/internal/cli/cmd_cancel.go new file mode 100644 index 0000000..9a0a915 --- /dev/null +++ b/internal/cli/cmd_cancel.go @@ -0,0 +1,31 @@ +package cli + +import ( + "fmt" + "net/http" + + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(&cobra.Command{ + Use: "cancel ", + Short: "Cancel a pending job", + Args: cobra.ExactArgs(1), + RunE: runCancel, + }) +} + +func runCancel(cmd *cobra.Command, args []string) error { + resp, err := client.do(http.MethodPost, "/v1/jobs/"+args[0]+"/cancel", nil) + if err != nil { + return err + } + + if err := client.decodeOrError(resp, nil); err != nil { + return err + } + + fmt.Println("Job canceled.") + return nil +} diff --git a/internal/cli/cmd_enqueue.go b/internal/cli/cmd_enqueue.go new file mode 100644 index 0000000..77c6651 --- /dev/null +++ b/internal/cli/cmd_enqueue.go @@ -0,0 +1,59 @@ +package cli + +import ( + "encoding/json" + "fmt" + "net/http" + + "github.com/spf13/cobra" +) + +func init() { + cmd := &cobra.Command{ + Use: "enqueue ", + Short: "Enqueue a new job", + Args: cobra.ExactArgs(1), + RunE: runEnqueue, + } + + cmd.Flags().String("queue", "default", "target queue name") + cmd.Flags().String("payload", "{}", "JSON payload") + cmd.Flags().Int("priority", 10, "job priority (0=low, 10=normal, 20=high, 30=critical)") + cmd.Flags().Int("max-attempts", 3, "maximum retry attempts") + + rootCmd.AddCommand(cmd) +} + +func runEnqueue(cmd *cobra.Command, args []string) error { + queueName, _ := cmd.Flags().GetString("queue") + payloadStr, _ := cmd.Flags().GetString("payload") + priority, _ := cmd.Flags().GetInt("priority") + maxAttempts, _ := cmd.Flags().GetInt("max-attempts") + + if !json.Valid([]byte(payloadStr)) { + return fmt.Errorf("invalid JSON payload: %s", payloadStr) + } + + body := map[string]any{ + "type": args[0], + "queue": queueName, + "payload": json.RawMessage(payloadStr), + "priority": priority, + "max_attempts": maxAttempts, + } + + resp, err := client.do(http.MethodPost, "/v1/jobs", body) + if err != nil { + return err + } + + var job struct { + ID string `json:"id"` + } + if err := client.decodeOrError(resp, &job); err != nil { + return err + } + + fmt.Printf("Job enqueued: %s\n", job.ID) + return nil +} diff --git a/internal/cli/cmd_inspect.go b/internal/cli/cmd_inspect.go new file mode 100644 index 0000000..444fd75 --- /dev/null +++ b/internal/cli/cmd_inspect.go @@ -0,0 +1,120 @@ +package cli + +import ( + "fmt" + "net/http" + "net/url" + "os" + "strconv" + "text/tabwriter" + + "github.com/spf13/cobra" +) + +func init() { + inspectCmd := &cobra.Command{ + Use: "inspect", + Short: "Inspect queues and jobs", + } + + queuesCmd := &cobra.Command{ + Use: "queues", + Short: "List all queues with statistics", + RunE: runInspectQueues, + } + + jobsCmd := &cobra.Command{ + Use: "jobs", + Short: "List jobs in a queue", + RunE: runInspectJobs, + } + jobsCmd.Flags().String("queue", "", "queue name (required)") + jobsCmd.Flags().String("status", "", "filter by status") + jobsCmd.Flags().Int("limit", 20, "number of jobs to show") + _ = jobsCmd.MarkFlagRequired("queue") + + inspectCmd.AddCommand(queuesCmd, jobsCmd) + rootCmd.AddCommand(inspectCmd) +} + +func runInspectQueues(cmd *cobra.Command, args []string) error { + resp, err := client.do(http.MethodGet, "/v1/queues", nil) + if err != nil { + return err + } + + var queues []struct { + Name string `json:"name"` + Size int64 `json:"size"` + ProcessingCount int64 `json:"processing_count"` + CompletedCount int64 `json:"completed_count"` + FailedCount int64 `json:"failed_count"` + DeadCount int64 `json:"dead_count"` + } + if err := client.decodeOrError(resp, &queues); err != nil { + return err + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + _, _ = fmt.Fprintln(w, "NAME\tPENDING\tRUNNING\tCOMPLETED\tFAILED\tDEAD") + for _, q := range queues { + _, _ = fmt.Fprintf(w, "%s\t%d\t%d\t%d\t%d\t%d\n", + q.Name, q.Size, q.ProcessingCount, q.CompletedCount, q.FailedCount, q.DeadCount) + } + _ = w.Flush() + return nil +} + +func runInspectJobs(cmd *cobra.Command, args []string) error { + queueName, _ := cmd.Flags().GetString("queue") + status, _ := cmd.Flags().GetString("status") + limit, _ := cmd.Flags().GetInt("limit") + + params := url.Values{} + if status != "" { + params.Set("status", status) + } + params.Set("limit", strconv.Itoa(limit)) + + path := "/v1/queues/" + url.PathEscape(queueName) + "/jobs" + if len(params) > 0 { + path += "?" + params.Encode() + } + + resp, err := client.do(http.MethodGet, path, nil) + if err != nil { + return err + } + + var jobs []struct { + ID string `json:"id"` + Type string `json:"type"` + Status string `json:"status"` + Attempts int `json:"attempts"` + CreatedAt string `json:"created_at"` + LastError string `json:"last_error"` + } + if err := client.decodeOrError(resp, &jobs); err != nil { + return err + } + + w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) + _, _ = fmt.Fprintln(w, "ID\tTYPE\tSTATUS\tATTEMPTS\tCREATED\tERROR") + for _, j := range jobs { + id := j.ID + if len(id) > 8 { + id = id[:8] + } + errMsg := j.LastError + if len(errMsg) > 40 { + errMsg = errMsg[:40] + "..." + } + if errMsg == "" { + errMsg = "-" + } + _, _ = fmt.Fprintf(w, "%s\t%s\t%s\t%d\t%s\t%s\n", + id, j.Type, j.Status, j.Attempts, j.CreatedAt, errMsg) + } + _ = w.Flush() + return nil +} diff --git a/internal/cli/cmd_retry.go b/internal/cli/cmd_retry.go new file mode 100644 index 0000000..22f1a1e --- /dev/null +++ b/internal/cli/cmd_retry.go @@ -0,0 +1,34 @@ +package cli + +import ( + "fmt" + "net/http" + + "github.com/spf13/cobra" +) + +func init() { + rootCmd.AddCommand(&cobra.Command{ + Use: "retry ", + Short: "Retry a failed or dead job", + Args: cobra.ExactArgs(1), + RunE: runRetry, + }) +} + +func runRetry(cmd *cobra.Command, args []string) error { + resp, err := client.do(http.MethodPost, "/v1/jobs/"+args[0]+"/retry", nil) + if err != nil { + return err + } + + var job struct { + ID string `json:"id"` + } + if err := client.decodeOrError(resp, &job); err != nil { + return err + } + + fmt.Printf("Job retried. New job ID: %s\n", job.ID) + return nil +} diff --git a/internal/cli/root.go b/internal/cli/root.go new file mode 100644 index 0000000..97aea55 --- /dev/null +++ b/internal/cli/root.go @@ -0,0 +1,67 @@ +// Package cli implements the queuekit command-line interface. +package cli + +import ( + "fmt" + "os" + "path/filepath" + + "github.com/spf13/cobra" + "github.com/spf13/viper" +) + +var ( + cfgFile string + client *Client +) + +var rootCmd = &cobra.Command{ + Use: "queuekit", + Short: "QueueKit CLI — manage jobs and queues", + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + server := viper.GetString("server") + apiKey := viper.GetString("api_key") + if server == "" { + server = "http://localhost:8080" + } + client = NewClient(server, apiKey) + return nil + }, +} + +func init() { + cobra.OnInitialize(initConfig) + + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default ~/.config/queuekit/config.yaml)") + rootCmd.PersistentFlags().String("server", "", "server URL (default http://localhost:8080)") + rootCmd.PersistentFlags().String("api-key", "", "API key for authentication") + + _ = viper.BindPFlag("server", rootCmd.PersistentFlags().Lookup("server")) + _ = viper.BindPFlag("api_key", rootCmd.PersistentFlags().Lookup("api-key")) +} + +func initConfig() { + if cfgFile != "" { + viper.SetConfigFile(cfgFile) + } else { + home, err := os.UserHomeDir() + if err != nil { + return + } + configDir := filepath.Join(home, ".config", "queuekit") + viper.AddConfigPath(configDir) + viper.SetConfigName("config") + viper.SetConfigType("yaml") + } + + viper.SetEnvPrefix("QUEUEKIT") + viper.AutomaticEnv() + _ = viper.ReadInConfig() +} + +func Execute() { + if err := rootCmd.Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..74f8fc3 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,45 @@ +// Package config handles environment-based configuration for queuekitd. +package config + +import ( + "fmt" + "os" +) + +type Config struct { + ListenAddr string + APIKey string + Backend string // "postgres" or "redis" + PostgresDSN string + RedisAddr string +} + +func Load() (*Config, error) { + c := &Config{ + ListenAddr: envOrDefault("QUEUEKIT_LISTEN_ADDR", ":8080"), + APIKey: os.Getenv("QUEUEKIT_API_KEY"), + Backend: envOrDefault("QUEUEKIT_BACKEND", "postgres"), + PostgresDSN: os.Getenv("QUEUEKIT_POSTGRES_DSN"), + RedisAddr: envOrDefault("QUEUEKIT_REDIS_ADDR", "localhost:6379"), + } + + switch c.Backend { + case "postgres": + if c.PostgresDSN == "" { + return nil, fmt.Errorf("QUEUEKIT_POSTGRES_DSN is required when backend is postgres") + } + case "redis": + // RedisAddr has a default, so no hard requirement + default: + return nil, fmt.Errorf("unsupported backend %q: must be postgres or redis", c.Backend) + } + + return c, nil +} + +func envOrDefault(key, fallback string) string { + if v := os.Getenv(key); v != "" { + return v + } + return fallback +} diff --git a/internal/dashboard/.gitkeep b/internal/dashboard/.gitkeep deleted file mode 100644 index 2f80107..0000000 --- a/internal/dashboard/.gitkeep +++ /dev/null @@ -1,2 +0,0 @@ -# Templates and dashboard handlers will go here - diff --git a/internal/dashboard/dashboard.go b/internal/dashboard/dashboard.go new file mode 100644 index 0000000..d68a0b8 --- /dev/null +++ b/internal/dashboard/dashboard.go @@ -0,0 +1,197 @@ +// Package dashboard provides server-rendered HTML views for queue management. +package dashboard + +import ( + "bytes" + "embed" + "encoding/json" + "html/template" + "log/slog" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + + "github.com/reckziegelwilliam/queuekit/internal/backend" + "github.com/reckziegelwilliam/queuekit/internal/queue" +) + +//go:embed templates/*.html +var templateFS embed.FS + +type Dashboard struct { + backend backend.Backend + logger *slog.Logger + tmpl *template.Template + router chi.Router +} + +func New(b backend.Backend, logger *slog.Logger) *Dashboard { + d := &Dashboard{ + backend: b, + logger: logger, + tmpl: template.Must(template.ParseFS(templateFS, "templates/*.html")), + router: chi.NewRouter(), + } + d.routes() + return d +} + +func (d *Dashboard) ServeHTTP(w http.ResponseWriter, r *http.Request) { + d.router.ServeHTTP(w, r) +} + +func (d *Dashboard) routes() { + d.router.Get("/", d.handleQueues) + d.router.Get("/queues/{name}", d.handleQueueDetail) + d.router.Get("/jobs/{id}", d.handleJobDetail) + d.router.Post("/jobs/{id}/retry", d.handleRetryJob) +} + +// --- page data types --------------------------------------------------------- + +type queuesPage struct { + Title string + Queues []queue.Queue +} + +type queueDetailPage struct { + Title string + QueueName string + Status string + Jobs []*queue.Job + Limit int + Offset int + PrevOffset int + NextOffset int + HasNext bool +} + +type jobDetailPage struct { + Title string + Job *queue.Job + PayloadPretty string +} + +// --- handlers ---------------------------------------------------------------- + +func (d *Dashboard) handleQueues(w http.ResponseWriter, r *http.Request) { + queues, err := d.backend.ListQueues(r.Context()) + if err != nil { + d.renderError(w, "Failed to list queues", err) + return + } + + d.render(w, "queues.html", queuesPage{ + Title: "Queues", + Queues: queues, + }) +} + +func (d *Dashboard) handleQueueDetail(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "name") + status := r.URL.Query().Get("status") + + limit := 50 + if v := r.URL.Query().Get("limit"); v != "" { + if parsed, err := strconv.Atoi(v); err == nil && parsed > 0 { + limit = parsed + } + } + + offset := 0 + if v := r.URL.Query().Get("offset"); v != "" { + if parsed, err := strconv.Atoi(v); err == nil && parsed >= 0 { + offset = parsed + } + } + + jobs, err := d.backend.ListJobs(r.Context(), name, status, limit+1, offset) + if err != nil { + d.renderError(w, "Failed to list jobs", err) + return + } + + hasNext := len(jobs) > limit + if hasNext { + jobs = jobs[:limit] + } + + prevOffset := offset - limit + if prevOffset < 0 { + prevOffset = 0 + } + + d.render(w, "queue_detail.html", queueDetailPage{ + Title: "Queue: " + name, + QueueName: name, + Status: status, + Jobs: jobs, + Limit: limit, + Offset: offset, + PrevOffset: prevOffset, + NextOffset: offset + limit, + HasNext: hasNext, + }) +} + +func (d *Dashboard) handleJobDetail(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + job, err := d.backend.GetJob(r.Context(), id) + if err != nil { + http.Error(w, "Job not found", http.StatusNotFound) + return + } + + var pretty bytes.Buffer + if err := json.Indent(&pretty, job.Payload, "", " "); err != nil { + pretty.Write(job.Payload) + } + + d.render(w, "job_detail.html", jobDetailPage{ + Title: "Job: " + id[:8], + Job: job, + PayloadPretty: pretty.String(), + }) +} + +func (d *Dashboard) handleRetryJob(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + job, err := d.backend.GetJob(r.Context(), id) + if err != nil { + http.Error(w, "Job not found", http.StatusNotFound) + return + } + + if err := d.backend.DeleteJob(r.Context(), id); err != nil { + d.renderError(w, "Failed to delete old job for retry", err) + return + } + + retried := queue.NewJob(job.Type, job.Queue, job.Payload) + retried.Priority = job.Priority + retried.MaxAttempts = job.MaxAttempts + + if err := d.backend.Enqueue(r.Context(), retried); err != nil { + d.renderError(w, "Failed to enqueue retried job", err) + return + } + + http.Redirect(w, r, "/dashboard/jobs/"+retried.ID, http.StatusSeeOther) +} + +// --- render helpers ---------------------------------------------------------- + +func (d *Dashboard) render(w http.ResponseWriter, name string, data any) { + w.Header().Set("Content-Type", "text/html; charset=utf-8") + if err := d.tmpl.ExecuteTemplate(w, "layout.html", data); err != nil { + d.logger.Error("template render failed", "template", name, "error", err) + } +} + +func (d *Dashboard) renderError(w http.ResponseWriter, msg string, err error) { + d.logger.Error(msg, "error", err) + http.Error(w, msg, http.StatusInternalServerError) +} diff --git a/internal/dashboard/templates/job_detail.html b/internal/dashboard/templates/job_detail.html new file mode 100644 index 0000000..c512877 --- /dev/null +++ b/internal/dashboard/templates/job_detail.html @@ -0,0 +1,32 @@ +{{define "content"}} +

Job: {{slice .Job.ID 0 8}}…

+ +
+

Details

+
+
ID
{{.Job.ID}}
+
Type
{{.Job.Type}}
+
Queue
{{.Job.Queue}}
+
Status
{{.Job.Status}}
+
Priority
{{.Job.Priority}}
+
Attempts
{{.Job.Attempts}} / {{.Job.MaxAttempts}}
+
Scheduled
{{.Job.ScheduledAt.Format "2006-01-02 15:04:05 UTC"}}
+
Created
{{.Job.CreatedAt.Format "2006-01-02 15:04:05 UTC"}}
+
Updated
{{.Job.UpdatedAt.Format "2006-01-02 15:04:05 UTC"}}
+ {{if .Job.CompletedAt}}
Completed
{{.Job.CompletedAt.Format "2006-01-02 15:04:05 UTC"}}
{{end}} + {{if .Job.FailedAt}}
Failed
{{.Job.FailedAt.Format "2006-01-02 15:04:05 UTC"}}
{{end}} + {{if .Job.LastError}}
Last Error
{{.Job.LastError}}
{{end}} +
+
+ +
+

Payload

+
{{.PayloadPretty}}
+
+ +{{if or (eq .Job.Status "failed") (eq .Job.Status "dead")}} +
+ +
+{{end}} +{{end}} diff --git a/internal/dashboard/templates/layout.html b/internal/dashboard/templates/layout.html new file mode 100644 index 0000000..462fc73 --- /dev/null +++ b/internal/dashboard/templates/layout.html @@ -0,0 +1,68 @@ + + + + + + {{.Title}} — QueueKit + + + + +
+ {{template "content" .}} +
+ + diff --git a/internal/dashboard/templates/queue_detail.html b/internal/dashboard/templates/queue_detail.html new file mode 100644 index 0000000..648b00f --- /dev/null +++ b/internal/dashboard/templates/queue_detail.html @@ -0,0 +1,54 @@ +{{define "content"}} +

Queue: {{.QueueName}}

+ + + +{{if .Jobs}} + + + + + + + + + + + + + {{range .Jobs}} + + + + + + + + + {{end}} + +
IDTypeStatusAttemptsCreatedError
{{slice .ID 0 8}}…{{.Type}}{{.Status}}{{.Attempts}}/{{.MaxAttempts}}{{.CreatedAt.Format "2006-01-02 15:04:05"}}{{if .LastError}}{{.LastError}}{{else}}—{{end}}
+ + +{{else}} +

No jobs found.

+{{end}} +{{end}} diff --git a/internal/dashboard/templates/queues.html b/internal/dashboard/templates/queues.html new file mode 100644 index 0000000..5e24211 --- /dev/null +++ b/internal/dashboard/templates/queues.html @@ -0,0 +1,35 @@ +{{define "content"}} +

Queues

+{{if .Queues}} + + + + + + + + + + + + + + + {{range .Queues}} + + + + + + + + + + + {{end}} + +
NamePendingRunningCompletedFailedDeadTotalHealth
{{.Name}}{{.Size}}{{.ProcessingCount}}{{.CompletedCount}}{{.FailedCount}}{{.DeadCount}}{{.TotalJobs}}{{printf "%.0f" .HealthScore}}%
+{{else}} +

No queues found. Enqueue a job to get started.

+{{end}} +{{end}} diff --git a/internal/httpapi/.gitkeep b/internal/httpapi/.gitkeep deleted file mode 100644 index efe4867..0000000 --- a/internal/httpapi/.gitkeep +++ /dev/null @@ -1,2 +0,0 @@ -# HTTP handlers will go here - diff --git a/internal/httpapi/handlers.go b/internal/httpapi/handlers.go new file mode 100644 index 0000000..cae2afc --- /dev/null +++ b/internal/httpapi/handlers.go @@ -0,0 +1,208 @@ +package httpapi + +import ( + "encoding/json" + "net/http" + "strconv" + "strings" + "time" + + "github.com/go-chi/chi/v5" + + "github.com/reckziegelwilliam/queuekit/internal/queue" +) + +// --- request / response types ------------------------------------------------ + +type enqueueRequest struct { + Type string `json:"type"` + Queue string `json:"queue"` + Payload json.RawMessage `json:"payload"` + Priority *int `json:"priority,omitempty"` + MaxAttempts *int `json:"max_attempts,omitempty"` + ScheduledAt *time.Time `json:"scheduled_at,omitempty"` +} + +// --- helpers ----------------------------------------------------------------- + +func writeJSON(w http.ResponseWriter, status int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(v) +} + +func writeError(w http.ResponseWriter, status int, msg string) { + writeJSON(w, status, map[string]string{"error": msg}) +} + +// --- handlers ---------------------------------------------------------------- + +func (s *Server) handleEnqueue(w http.ResponseWriter, r *http.Request) { + var req enqueueRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + writeError(w, http.StatusBadRequest, "invalid JSON body") + return + } + + job := queue.NewJob(req.Type, req.Queue, req.Payload) + + if req.Priority != nil { + job.Priority = *req.Priority + } + if req.MaxAttempts != nil { + job.MaxAttempts = *req.MaxAttempts + } + if req.ScheduledAt != nil { + job.ScheduledAt = *req.ScheduledAt + } + + if err := job.Validate(); err != nil { + writeError(w, http.StatusUnprocessableEntity, err.Error()) + return + } + + if err := s.backend.Enqueue(r.Context(), job); err != nil { + s.logger.Error("enqueue failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to enqueue job") + return + } + + writeJSON(w, http.StatusCreated, job) +} + +func (s *Server) handleGetJob(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + job, err := s.backend.GetJob(r.Context(), id) + if err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, "job not found") + return + } + s.logger.Error("get job failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to get job") + return + } + + writeJSON(w, http.StatusOK, job) +} + +func (s *Server) handleRetryJob(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + job, err := s.backend.GetJob(r.Context(), id) + if err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, "job not found") + return + } + s.logger.Error("get job for retry failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to get job") + return + } + + if err := s.backend.DeleteJob(r.Context(), job.ID); err != nil { + s.logger.Error("delete old job for retry failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to retry job") + return + } + + retried := queue.NewJob(job.Type, job.Queue, job.Payload) + retried.Priority = job.Priority + retried.MaxAttempts = job.MaxAttempts + + if err := s.backend.Enqueue(r.Context(), retried); err != nil { + s.logger.Error("enqueue retry failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to retry job") + return + } + + writeJSON(w, http.StatusOK, retried) +} + +func (s *Server) handleCancelJob(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + job, err := s.backend.GetJob(r.Context(), id) + if err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, "job not found") + return + } + s.logger.Error("get job for cancel failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to get job") + return + } + + if job.Status != queue.StatusPending { + writeError(w, http.StatusConflict, "only pending jobs can be canceled") + return + } + + if err := s.backend.DeleteJob(r.Context(), id); err != nil { + s.logger.Error("cancel job failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to cancel job") + return + } + + writeJSON(w, http.StatusOK, map[string]string{"status": "canceled"}) +} + +func (s *Server) handleDeleteJob(w http.ResponseWriter, r *http.Request) { + id := chi.URLParam(r, "id") + + if err := s.backend.DeleteJob(r.Context(), id); err != nil { + if strings.Contains(err.Error(), "not found") { + writeError(w, http.StatusNotFound, "job not found") + return + } + s.logger.Error("delete job failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to delete job") + return + } + + w.WriteHeader(http.StatusNoContent) +} + +func (s *Server) handleListQueues(w http.ResponseWriter, r *http.Request) { + queues, err := s.backend.ListQueues(r.Context()) + if err != nil { + s.logger.Error("list queues failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to list queues") + return + } + + writeJSON(w, http.StatusOK, queues) +} + +func (s *Server) handleListJobs(w http.ResponseWriter, r *http.Request) { + name := chi.URLParam(r, "name") + status := r.URL.Query().Get("status") + + limit := 50 + if v := r.URL.Query().Get("limit"); v != "" { + if parsed, err := strconv.Atoi(v); err == nil && parsed > 0 { + limit = parsed + } + } + + offset := 0 + if v := r.URL.Query().Get("offset"); v != "" { + if parsed, err := strconv.Atoi(v); err == nil && parsed >= 0 { + offset = parsed + } + } + + jobs, err := s.backend.ListJobs(r.Context(), name, status, limit, offset) + if err != nil { + s.logger.Error("list jobs failed", "error", err) + writeError(w, http.StatusInternalServerError, "failed to list jobs") + return + } + + if jobs == nil { + jobs = []*queue.Job{} + } + + writeJSON(w, http.StatusOK, jobs) +} diff --git a/internal/httpapi/handlers_test.go b/internal/httpapi/handlers_test.go new file mode 100644 index 0000000..9209643 --- /dev/null +++ b/internal/httpapi/handlers_test.go @@ -0,0 +1,455 @@ +// Package httpapi_test contains integration tests for the HTTP API handlers. +package httpapi_test + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "log/slog" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/reckziegelwilliam/queuekit/internal/backend" + "github.com/reckziegelwilliam/queuekit/internal/httpapi" + "github.com/reckziegelwilliam/queuekit/internal/queue" +) + +// --- mock backend ------------------------------------------------------------ + +var _ backend.Backend = (*mockBackend)(nil) + +type mockBackend struct { + mu sync.Mutex + jobs map[string]*queue.Job +} + +func newMockBackend() *mockBackend { + return &mockBackend{jobs: make(map[string]*queue.Job)} +} + +func (m *mockBackend) Enqueue(_ context.Context, job *queue.Job) error { + m.mu.Lock() + defer m.mu.Unlock() + m.jobs[job.ID] = job + return nil +} + +func (m *mockBackend) Reserve(_ context.Context, queueName string) (*queue.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + for _, j := range m.jobs { + if j.Queue == queueName && j.Status == queue.StatusPending { + j.Status = queue.StatusRunning + return j, nil + } + } + return nil, nil +} + +func (m *mockBackend) Ack(_ context.Context, jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + j, ok := m.jobs[jobID] + if !ok { + return fmt.Errorf("job not found: %s", jobID) + } + j.Status = queue.StatusCompleted + return nil +} + +func (m *mockBackend) Nack(_ context.Context, jobID string, jobErr error, _ time.Duration) error { + m.mu.Lock() + defer m.mu.Unlock() + j, ok := m.jobs[jobID] + if !ok { + return fmt.Errorf("job not found: %s", jobID) + } + j.Attempts++ + if jobErr != nil { + j.LastError = jobErr.Error() + } + if j.Attempts >= j.MaxAttempts { + j.Status = queue.StatusDead + } else { + j.Status = queue.StatusPending + } + return nil +} + +func (m *mockBackend) MoveToDLQ(_ context.Context, jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + j, ok := m.jobs[jobID] + if !ok { + return fmt.Errorf("job not found: %s", jobID) + } + j.Status = queue.StatusDead + return nil +} + +func (m *mockBackend) ListQueues(_ context.Context) ([]queue.Queue, error) { + m.mu.Lock() + defer m.mu.Unlock() + stats := make(map[string]*queue.Queue) + for _, j := range m.jobs { + q, ok := stats[j.Queue] + if !ok { + q = &queue.Queue{Name: j.Queue} + stats[j.Queue] = q + } + switch j.Status { + case queue.StatusPending: + q.Size++ + case queue.StatusRunning: + q.ProcessingCount++ + case queue.StatusCompleted: + q.CompletedCount++ + case queue.StatusFailed: + q.FailedCount++ + case queue.StatusDead: + q.DeadCount++ + } + } + result := make([]queue.Queue, 0, len(stats)) + for _, q := range stats { + result = append(result, *q) + } + return result, nil +} + +func (m *mockBackend) ListJobs(_ context.Context, queueName, status string, limit, offset int) ([]*queue.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + result := make([]*queue.Job, 0, len(m.jobs)) + for _, j := range m.jobs { + if queueName != "" && j.Queue != queueName { + continue + } + if status != "" && j.Status != status { + continue + } + result = append(result, j) + } + if offset > len(result) { + return nil, nil + } + result = result[offset:] + if limit > 0 && limit < len(result) { + result = result[:limit] + } + return result, nil +} + +func (m *mockBackend) GetJob(_ context.Context, jobID string) (*queue.Job, error) { + m.mu.Lock() + defer m.mu.Unlock() + j, ok := m.jobs[jobID] + if !ok { + return nil, fmt.Errorf("job not found: %s", jobID) + } + return j, nil +} + +func (m *mockBackend) DeleteJob(_ context.Context, jobID string) error { + m.mu.Lock() + defer m.mu.Unlock() + if _, ok := m.jobs[jobID]; !ok { + return fmt.Errorf("job not found: %s", jobID) + } + delete(m.jobs, jobID) + return nil +} + +func (m *mockBackend) Close() error { return nil } + +// --- test helpers ------------------------------------------------------------ + +const testAPIKey = "test-secret-key" + +func newTestServer(b backend.Backend) *httpapi.Server { + logger := slog.Default() + return httpapi.NewServer(b, testAPIKey, logger) +} + +func authHeader() http.Header { + h := http.Header{} + h.Set("Authorization", "Bearer "+testAPIKey) + return h +} + +func doRequest(srv http.Handler, method, path string, body any, headers http.Header) *httptest.ResponseRecorder { + var buf bytes.Buffer + if body != nil { + json.NewEncoder(&buf).Encode(body) + } + req := httptest.NewRequest(method, path, &buf) + req.Header.Set("Content-Type", "application/json") + for k, vals := range headers { + for _, v := range vals { + req.Header.Set(k, v) + } + } + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + return w +} + +// --- auth tests -------------------------------------------------------------- + +func TestAuthRequired(t *testing.T) { + srv := newTestServer(newMockBackend()) + + w := doRequest(srv, http.MethodGet, "/v1/queues", nil, nil) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} + +func TestAuthInvalidKey(t *testing.T) { + srv := newTestServer(newMockBackend()) + + h := http.Header{} + h.Set("Authorization", "Bearer wrong-key") + w := doRequest(srv, http.MethodGet, "/v1/queues", nil, h) + assert.Equal(t, http.StatusUnauthorized, w.Code) +} + +func TestAuthValidKey(t *testing.T) { + srv := newTestServer(newMockBackend()) + + w := doRequest(srv, http.MethodGet, "/v1/queues", nil, authHeader()) + assert.Equal(t, http.StatusOK, w.Code) +} + +// --- enqueue tests ----------------------------------------------------------- + +func TestEnqueue(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + body := map[string]any{ + "type": "send_email", + "queue": "emails", + "payload": map[string]string{"to": "test@example.com"}, + } + + w := doRequest(srv, http.MethodPost, "/v1/jobs", body, authHeader()) + require.Equal(t, http.StatusCreated, w.Code) + + var job queue.Job + require.NoError(t, json.NewDecoder(w.Body).Decode(&job)) + assert.Equal(t, "send_email", job.Type) + assert.Equal(t, "emails", job.Queue) + assert.Equal(t, queue.StatusPending, job.Status) + assert.Equal(t, queue.PriorityNormal, job.Priority) + assert.Equal(t, 3, job.MaxAttempts) +} + +func TestEnqueueWithOptionalFields(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + body := map[string]any{ + "type": "send_email", + "queue": "emails", + "payload": map[string]string{"to": "test@example.com"}, + "priority": 20, + "max_attempts": 5, + } + + w := doRequest(srv, http.MethodPost, "/v1/jobs", body, authHeader()) + require.Equal(t, http.StatusCreated, w.Code) + + var job queue.Job + require.NoError(t, json.NewDecoder(w.Body).Decode(&job)) + assert.Equal(t, 20, job.Priority) + assert.Equal(t, 5, job.MaxAttempts) +} + +func TestEnqueueValidationError(t *testing.T) { + srv := newTestServer(newMockBackend()) + + body := map[string]any{ + "type": "", + "queue": "emails", + } + + w := doRequest(srv, http.MethodPost, "/v1/jobs", body, authHeader()) + assert.Equal(t, http.StatusUnprocessableEntity, w.Code) +} + +func TestEnqueueBadJSON(t *testing.T) { + srv := newTestServer(newMockBackend()) + + req := httptest.NewRequest(http.MethodPost, "/v1/jobs", bytes.NewBufferString("{bad")) + req.Header.Set("Authorization", "Bearer "+testAPIKey) + w := httptest.NewRecorder() + srv.ServeHTTP(w, req) + assert.Equal(t, http.StatusBadRequest, w.Code) +} + +// --- get job tests ----------------------------------------------------------- + +func TestGetJob(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + job := queue.NewJob("test", "default", json.RawMessage(`{"key":"val"}`)) + mb.Enqueue(context.Background(), job) + + w := doRequest(srv, http.MethodGet, "/v1/jobs/"+job.ID, nil, authHeader()) + require.Equal(t, http.StatusOK, w.Code) + + var got queue.Job + require.NoError(t, json.NewDecoder(w.Body).Decode(&got)) + assert.Equal(t, job.ID, got.ID) +} + +func TestGetJobNotFound(t *testing.T) { + srv := newTestServer(newMockBackend()) + + w := doRequest(srv, http.MethodGet, "/v1/jobs/nonexistent", nil, authHeader()) + assert.Equal(t, http.StatusNotFound, w.Code) +} + +// --- retry tests ------------------------------------------------------------- + +func TestRetryJob(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + job := queue.NewJob("test", "default", json.RawMessage(`{"key":"val"}`)) + job.Status = queue.StatusDead + mb.Enqueue(context.Background(), job) + + w := doRequest(srv, http.MethodPost, "/v1/jobs/"+job.ID+"/retry", nil, authHeader()) + require.Equal(t, http.StatusOK, w.Code) + + var retried queue.Job + require.NoError(t, json.NewDecoder(w.Body).Decode(&retried)) + assert.Equal(t, queue.StatusPending, retried.Status) + assert.NotEqual(t, job.ID, retried.ID) + assert.Equal(t, "test", retried.Type) +} + +func TestRetryJobNotFound(t *testing.T) { + srv := newTestServer(newMockBackend()) + + w := doRequest(srv, http.MethodPost, "/v1/jobs/nonexistent/retry", nil, authHeader()) + assert.Equal(t, http.StatusNotFound, w.Code) +} + +// --- cancel tests ------------------------------------------------------------ + +func TestCancelPendingJob(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + job := queue.NewJob("test", "default", json.RawMessage(`{"key":"val"}`)) + mb.Enqueue(context.Background(), job) + + w := doRequest(srv, http.MethodPost, "/v1/jobs/"+job.ID+"/cancel", nil, authHeader()) + assert.Equal(t, http.StatusOK, w.Code) + + _, err := mb.GetJob(context.Background(), job.ID) + assert.Error(t, err) +} + +func TestCancelRunningJobConflict(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + job := queue.NewJob("test", "default", json.RawMessage(`{"key":"val"}`)) + job.Status = queue.StatusRunning + mb.Enqueue(context.Background(), job) + + w := doRequest(srv, http.MethodPost, "/v1/jobs/"+job.ID+"/cancel", nil, authHeader()) + assert.Equal(t, http.StatusConflict, w.Code) +} + +// --- delete tests ------------------------------------------------------------ + +func TestDeleteJob(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + job := queue.NewJob("test", "default", json.RawMessage(`{"key":"val"}`)) + mb.Enqueue(context.Background(), job) + + w := doRequest(srv, http.MethodDelete, "/v1/jobs/"+job.ID, nil, authHeader()) + assert.Equal(t, http.StatusNoContent, w.Code) +} + +func TestDeleteJobNotFound(t *testing.T) { + srv := newTestServer(newMockBackend()) + + w := doRequest(srv, http.MethodDelete, "/v1/jobs/nonexistent", nil, authHeader()) + assert.Equal(t, http.StatusNotFound, w.Code) +} + +// --- list queues tests ------------------------------------------------------- + +func TestListQueues(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + mb.Enqueue(context.Background(), queue.NewJob("a", "q1", json.RawMessage(`{}`))) + mb.Enqueue(context.Background(), queue.NewJob("b", "q2", json.RawMessage(`{}`))) + + w := doRequest(srv, http.MethodGet, "/v1/queues", nil, authHeader()) + require.Equal(t, http.StatusOK, w.Code) + + var queues []queue.Queue + require.NoError(t, json.NewDecoder(w.Body).Decode(&queues)) + assert.Len(t, queues, 2) +} + +// --- list jobs tests --------------------------------------------------------- + +func TestListJobs(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + mb.Enqueue(context.Background(), queue.NewJob("a", "emails", json.RawMessage(`{}`))) + mb.Enqueue(context.Background(), queue.NewJob("b", "emails", json.RawMessage(`{}`))) + mb.Enqueue(context.Background(), queue.NewJob("c", "other", json.RawMessage(`{}`))) + + w := doRequest(srv, http.MethodGet, "/v1/queues/emails/jobs", nil, authHeader()) + require.Equal(t, http.StatusOK, w.Code) + + var jobs []*queue.Job + require.NoError(t, json.NewDecoder(w.Body).Decode(&jobs)) + assert.Len(t, jobs, 2) +} + +func TestListJobsWithStatusFilter(t *testing.T) { + mb := newMockBackend() + srv := newTestServer(mb) + + j1 := queue.NewJob("a", "emails", json.RawMessage(`{}`)) + j2 := queue.NewJob("b", "emails", json.RawMessage(`{}`)) + j2.Status = queue.StatusCompleted + mb.Enqueue(context.Background(), j1) + mb.Enqueue(context.Background(), j2) + + w := doRequest(srv, http.MethodGet, "/v1/queues/emails/jobs?status=pending", nil, authHeader()) + require.Equal(t, http.StatusOK, w.Code) + + var jobs []*queue.Job + require.NoError(t, json.NewDecoder(w.Body).Decode(&jobs)) + assert.Len(t, jobs, 1) + assert.Equal(t, queue.StatusPending, jobs[0].Status) +} + +func TestListJobsEmptyReturnsEmptyArray(t *testing.T) { + srv := newTestServer(newMockBackend()) + + w := doRequest(srv, http.MethodGet, "/v1/queues/nothing/jobs", nil, authHeader()) + require.Equal(t, http.StatusOK, w.Code) + assert.Equal(t, "[]\n", w.Body.String()) +} diff --git a/internal/httpapi/middleware.go b/internal/httpapi/middleware.go new file mode 100644 index 0000000..e911001 --- /dev/null +++ b/internal/httpapi/middleware.go @@ -0,0 +1,29 @@ +package httpapi + +import ( + "net/http" + "strings" +) + +func (s *Server) apiKeyAuth(next http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if s.apiKey == "" { + next.ServeHTTP(w, r) + return + } + + auth := r.Header.Get("Authorization") + if auth == "" { + writeError(w, http.StatusUnauthorized, "missing Authorization header") + return + } + + token := strings.TrimPrefix(auth, "Bearer ") + if token == auth || token != s.apiKey { + writeError(w, http.StatusUnauthorized, "invalid API key") + return + } + + next.ServeHTTP(w, r) + }) +} diff --git a/internal/httpapi/server.go b/internal/httpapi/server.go new file mode 100644 index 0000000..be6d263 --- /dev/null +++ b/internal/httpapi/server.go @@ -0,0 +1,58 @@ +// Package httpapi provides the REST API server and handlers. +package httpapi + +import ( + "log/slog" + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + + "github.com/reckziegelwilliam/queuekit/internal/backend" +) + +type Server struct { + router chi.Router + backend backend.Backend + logger *slog.Logger + apiKey string +} + +func NewServer(b backend.Backend, apiKey string, logger *slog.Logger) *Server { + s := &Server{ + router: chi.NewRouter(), + backend: b, + logger: logger, + apiKey: apiKey, + } + s.routes() + return s +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.router.ServeHTTP(w, r) +} + +func (s *Server) routes() { + s.router.Use(middleware.Recoverer) + s.router.Use(middleware.RequestID) + s.router.Use(middleware.RealIP) + + s.router.Route("/v1", func(r chi.Router) { + r.Use(s.apiKeyAuth) + + r.Post("/jobs", s.handleEnqueue) + r.Get("/jobs/{id}", s.handleGetJob) + r.Post("/jobs/{id}/retry", s.handleRetryJob) + r.Post("/jobs/{id}/cancel", s.handleCancelJob) + r.Delete("/jobs/{id}", s.handleDeleteJob) + + r.Get("/queues", s.handleListQueues) + r.Get("/queues/{name}/jobs", s.handleListJobs) + }) +} + +// MountDashboard attaches dashboard routes under /dashboard. +func (s *Server) MountDashboard(h http.Handler) { + s.router.Mount("/dashboard", h) +} diff --git a/internal/queue/.gitkeep b/internal/queue/.gitkeep deleted file mode 100644 index b13d2a6..0000000 --- a/internal/queue/.gitkeep +++ /dev/null @@ -1,2 +0,0 @@ -# Core domain types will go here - diff --git a/internal/queue/job.go b/internal/queue/job.go index f432c05..1706c50 100644 --- a/internal/queue/job.go +++ b/internal/queue/job.go @@ -1,3 +1,4 @@ +// Package queue defines the core domain models for jobs and queues. package queue import ( diff --git a/internal/worker/backoff.go b/internal/worker/backoff.go index 1bf679d..76da105 100644 --- a/internal/worker/backoff.go +++ b/internal/worker/backoff.go @@ -1,3 +1,4 @@ +// Package worker implements the worker pool, job execution, and retry strategies. package worker import ( diff --git a/internal/worker/handler.go b/internal/worker/handler.go index b0b1dea..2071227 100644 --- a/internal/worker/handler.go +++ b/internal/worker/handler.go @@ -8,7 +8,7 @@ import ( ) // Handler is a function that processes a single job. -// It receives a context (which may be cancelled on shutdown) and the job to process. +// It receives a context (which may be canceled on shutdown) and the job to process. // Return nil to indicate success, or a non-nil error to trigger a retry/DLQ. type Handler func(ctx context.Context, job *queue.Job) error diff --git a/internal/worker/pool.go b/internal/worker/pool.go index 8d35798..a23c22b 100644 --- a/internal/worker/pool.go +++ b/internal/worker/pool.go @@ -65,7 +65,7 @@ func NewPool(b backend.Backend, r *Registry, queues []QueueConfig, opts ...PoolO } // Start launches all workers as goroutines. It returns immediately; workers run -// in the background until Stop is called or the parent context is cancelled. +// in the background until Stop is called or the parent context is canceled. // // Start returns an error if the pool is already running. func (p *Pool) Start(ctx context.Context) error { diff --git a/internal/worker/worker.go b/internal/worker/worker.go index 1f6ff0f..0379378 100644 --- a/internal/worker/worker.go +++ b/internal/worker/worker.go @@ -100,7 +100,7 @@ func (w *Worker) State() WorkerState { } } -// Run starts the worker's polling loop. It blocks until ctx is cancelled. +// Run starts the worker's polling loop. It blocks until ctx is canceled. func (w *Worker) Run(ctx context.Context) { w.setState(StatusIdle, "") w.logger.Info("worker started", "worker_id", w.id, "queue", w.queueName) diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go index 33ad0dc..da238e4 100644 --- a/internal/worker/worker_test.go +++ b/internal/worker/worker_test.go @@ -200,15 +200,15 @@ func TestRegistry_Overwrite(t *testing.T) { // Worker tests // --------------------------------------------------------------------------- -func newTestJob(jobType, queueName string) *queue.Job { - return queue.NewJob(jobType, queueName, json.RawMessage(`{"test":true}`)) +func newTestJob(jobType string) *queue.Job { + return queue.NewJob(jobType, "default", json.RawMessage(`{"test":true}`)) } func TestWorker_SuccessfulJob(t *testing.T) { b := newMockBackend() r := NewRegistry() - job := newTestJob("email.send", "default") + job := newTestJob("email.send") require.NoError(t, b.Enqueue(context.Background(), job)) executed := make(chan string, 1) @@ -246,7 +246,7 @@ func TestWorker_FailedJobNacked(t *testing.T) { b := newMockBackend() r := NewRegistry() - job := newTestJob("risky.task", "default") + job := newTestJob("risky.task") job.MaxAttempts = 3 require.NoError(t, b.Enqueue(context.Background(), job)) @@ -277,7 +277,7 @@ func TestWorker_NoHandlerNacks(t *testing.T) { b := newMockBackend() r := NewRegistry() // no handlers registered - job := newTestJob("unknown.type", "default") + job := newTestJob("unknown.type") require.NoError(t, b.Enqueue(context.Background(), job)) w := NewWorker("w1", "default", b, r, @@ -308,7 +308,7 @@ func TestWorker_StateTransitions(t *testing.T) { return nil }) - job := newTestJob("blocking.job", "default") + job := newTestJob("blocking.job") require.NoError(t, b.Enqueue(context.Background(), job)) w := NewWorker("w1", "default", b, r, @@ -393,7 +393,7 @@ func TestPool_ProcessesJobs(t *testing.T) { }) for i := 0; i < numJobs; i++ { - job := newTestJob("test.job", "default") + job := newTestJob("test.job") require.NoError(t, b.Enqueue(context.Background(), job)) }