From 49b08ab77cc688dbb092563bb038827403c921ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Lagerstr=C3=B6m?= Date: Wed, 15 Oct 2025 09:40:13 +0200 Subject: [PATCH 1/2] add Celery task ID to the context To make it possible for the consumer of the message to see the task ID of the given task. This is necessary when interacting with flower for example. --- celery.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/celery.go b/celery.go index 5b06447..6e31659 100644 --- a/celery.go +++ b/celery.go @@ -286,6 +286,7 @@ type contextKey int const ( // ContextKeyTaskName is a context key to access task names. ContextKeyTaskName contextKey = iota + ContextKeyTaskId ) // executeTask calls the task function with args and kwargs from the message. @@ -304,6 +305,7 @@ func (a *App) executeTask(ctx context.Context, m *protocol.Task) (err error) { } ctx = context.WithValue(ctx, ContextKeyTaskName, m.Name) + ctx = context.WithValue(ctx, ContextKeyTaskId, m.ID) p := NewTaskParam(m.Args, m.Kwargs) return task(ctx, p) } From dd094a06879dbcc731ac2c44c70565268ece2c6d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Alexander=20Lagerstr=C3=B6m?= Date: Wed, 15 Oct 2025 10:17:11 +0200 Subject: [PATCH 2/2] replaces github.com/marselester with /lagerstrom instead --- README.md | 4 ++-- bench-old.txt | 4 ++-- celery.go | 4 ++-- celery_test.go | 6 +++--- config.go | 2 +- examples/go.mod | 6 +++--- examples/rabbitmq/consumer/main.go | 4 ++-- examples/rabbitmq/producer/main.go | 4 ++-- examples/redis/consumer/main.go | 2 +- examples/redis/goredis/main.go | 4 ++-- examples/redis/metrics/main.go | 2 +- examples/redis/producer/main.go | 2 +- examples/redis/redis/main.go | 4 ++-- examples/redis/retry/main.go | 2 +- go.mod | 2 +- goredis/broker.go | 2 +- rabbitmq/broker.go | 2 +- redis/broker.go | 2 +- 18 files changed, 29 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 6dc5bf8..3a43776 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ # Gopher Celery 🥬 -[![Documentation](https://godoc.org/github.com/marselester/gopher-celery?status.svg)](https://pkg.go.dev/github.com/marselester/gopher-celery) -[![Go Report Card](https://goreportcard.com/badge/github.com/marselester/gopher-celery)](https://goreportcard.com/report/github.com/marselester/gopher-celery) +[![Documentation](https://godoc.org/github.com/lagerstrom/gopher-celery?status.svg)](https://pkg.go.dev/github.com/lagerstrom/gopher-celery) +[![Go Report Card](https://goreportcard.com/badge/github.com/lagerstrom/gopher-celery)](https://goreportcard.com/report/github.com/lagerstrom/gopher-celery) The objective of this project is to provide the very basic mechanism to efficiently produce and consume Celery tasks on Go side. diff --git a/bench-old.txt b/bench-old.txt index 32f209a..425b5e2 100644 --- a/bench-old.txt +++ b/bench-old.txt @@ -1,6 +1,6 @@ goos: darwin goarch: amd64 -pkg: github.com/marselester/gopher-celery/internal/protocol +pkg: github.com/lagerstrom/gopher-celery/internal/protocol cpu: Intel(R) Core(TM) i5-10600 CPU @ 3.30GHz BenchmarkJSONSerializerEncode_v2NoParams-12 405000379 2.960 ns/op 0 B/op 0 allocs/op BenchmarkJSONSerializerEncode_v2NoParams-12 403379344 2.970 ns/op 0 B/op 0 allocs/op @@ -83,4 +83,4 @@ BenchmarkJSONSerializerEncode_v1ArgsKwargs-12 646405 1768 ns/op BenchmarkJSONSerializerEncode_v1ArgsKwargs-12 660436 1771 ns/op 1001 B/op 10 allocs/op BenchmarkJSONSerializerEncode_v1ArgsKwargs-12 668602 1773 ns/op 1001 B/op 10 allocs/op PASS -ok github.com/marselester/gopher-celery/internal/protocol 115.912s +ok github.com/lagerstrom/gopher-celery/internal/protocol 115.912s diff --git a/celery.go b/celery.go index 6e31659..1be4bb2 100644 --- a/celery.go +++ b/celery.go @@ -13,8 +13,8 @@ import ( "github.com/google/uuid" "golang.org/x/sync/errgroup" - "github.com/marselester/gopher-celery/protocol" - "github.com/marselester/gopher-celery/redis" + "github.com/lagerstrom/gopher-celery/protocol" + "github.com/lagerstrom/gopher-celery/redis" ) // TaskF represents a Celery task implemented by the client. diff --git a/celery_test.go b/celery_test.go index 9215f4a..0693bcb 100644 --- a/celery_test.go +++ b/celery_test.go @@ -11,9 +11,9 @@ import ( "github.com/go-kit/log" - "github.com/marselester/gopher-celery/goredis" - "github.com/marselester/gopher-celery/protocol" - "github.com/marselester/gopher-celery/rabbitmq" + "github.com/lagerstrom/gopher-celery/goredis" + "github.com/lagerstrom/gopher-celery/protocol" + "github.com/lagerstrom/gopher-celery/rabbitmq" ) func TestExecuteTaskPanic(t *testing.T) { diff --git a/config.go b/config.go index 2e7a2cf..b9b3a98 100644 --- a/config.go +++ b/config.go @@ -3,7 +3,7 @@ package celery import ( "github.com/go-kit/log" - "github.com/marselester/gopher-celery/protocol" + "github.com/lagerstrom/gopher-celery/protocol" ) // DefaultMaxWorkers is the default upper limit of goroutines diff --git a/examples/go.mod b/examples/go.mod index 0b0368b..02226e1 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -1,8 +1,8 @@ -module github.com/marselester/gopher-celery/examples +module github.com/lagerstrom/gopher-celery/examples go 1.21 -replace github.com/marselester/gopher-celery => ../ +replace github.com/lagerstrom/gopher-celery => ../ require ( github.com/go-kit/log v0.2.1 @@ -11,7 +11,7 @@ require ( github.com/oklog/run v1.1.0 github.com/prometheus/client_golang v1.20.5 github.com/redis/go-redis/v9 v9.7.0 - github.com/marselester/gopher-celery v0.0.0-00010101000000-000000000000 + github.com/lagerstrom/gopher-celery v0.0.0-00010101000000-000000000000 ) require ( diff --git a/examples/rabbitmq/consumer/main.go b/examples/rabbitmq/consumer/main.go index ed2cb2a..1b4dcf0 100644 --- a/examples/rabbitmq/consumer/main.go +++ b/examples/rabbitmq/consumer/main.go @@ -8,8 +8,8 @@ import ( "os/signal" "github.com/go-kit/log" - celery "github.com/marselester/gopher-celery" - celeryrabbitmq "github.com/marselester/gopher-celery/rabbitmq" + celery "github.com/lagerstrom/gopher-celery" + celeryrabbitmq "github.com/lagerstrom/gopher-celery/rabbitmq" ) func main() { diff --git a/examples/rabbitmq/producer/main.go b/examples/rabbitmq/producer/main.go index 1d8216c..cd562a9 100644 --- a/examples/rabbitmq/producer/main.go +++ b/examples/rabbitmq/producer/main.go @@ -5,8 +5,8 @@ import ( "os" "github.com/go-kit/log" - celery "github.com/marselester/gopher-celery" - celeryrabbitmq "github.com/marselester/gopher-celery/rabbitmq" + celery "github.com/lagerstrom/gopher-celery" + celeryrabbitmq "github.com/lagerstrom/gopher-celery/rabbitmq" ) func main() { diff --git a/examples/redis/consumer/main.go b/examples/redis/consumer/main.go index cf34eee..26632c4 100644 --- a/examples/redis/consumer/main.go +++ b/examples/redis/consumer/main.go @@ -8,7 +8,7 @@ import ( "os/signal" "github.com/go-kit/log" - celery "github.com/marselester/gopher-celery" + celery "github.com/lagerstrom/gopher-celery" ) func main() { diff --git a/examples/redis/goredis/main.go b/examples/redis/goredis/main.go index cf126f3..2f11e90 100644 --- a/examples/redis/goredis/main.go +++ b/examples/redis/goredis/main.go @@ -9,8 +9,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - celery "github.com/marselester/gopher-celery" - celeryredis "github.com/marselester/gopher-celery/goredis" + celery "github.com/lagerstrom/gopher-celery" + celeryredis "github.com/lagerstrom/gopher-celery/goredis" "github.com/redis/go-redis/v9" ) diff --git a/examples/redis/metrics/main.go b/examples/redis/metrics/main.go index bf461f2..716d1f5 100644 --- a/examples/redis/metrics/main.go +++ b/examples/redis/metrics/main.go @@ -10,7 +10,7 @@ import ( "time" "github.com/go-kit/log" - celery "github.com/marselester/gopher-celery" + celery "github.com/lagerstrom/gopher-celery" "github.com/oklog/run" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promhttp" diff --git a/examples/redis/producer/main.go b/examples/redis/producer/main.go index dd76980..e8b0f19 100644 --- a/examples/redis/producer/main.go +++ b/examples/redis/producer/main.go @@ -5,7 +5,7 @@ import ( "os" "github.com/go-kit/log" - celery "github.com/marselester/gopher-celery" + celery "github.com/lagerstrom/gopher-celery" ) func main() { diff --git a/examples/redis/redis/main.go b/examples/redis/redis/main.go index 1599dbf..0053174 100644 --- a/examples/redis/redis/main.go +++ b/examples/redis/redis/main.go @@ -11,8 +11,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gomodule/redigo/redis" - celery "github.com/marselester/gopher-celery" - celeryredis "github.com/marselester/gopher-celery/redis" + celery "github.com/lagerstrom/gopher-celery" + celeryredis "github.com/lagerstrom/gopher-celery/redis" ) func main() { diff --git a/examples/redis/retry/main.go b/examples/redis/retry/main.go index 2d87c27..c6d0d77 100644 --- a/examples/redis/retry/main.go +++ b/examples/redis/retry/main.go @@ -11,7 +11,7 @@ import ( "github.com/go-kit/log" "github.com/marselester/backoff" - celery "github.com/marselester/gopher-celery" + celery "github.com/lagerstrom/gopher-celery" ) func main() { diff --git a/go.mod b/go.mod index 8fe680f..6338e75 100644 --- a/go.mod +++ b/go.mod @@ -1,4 +1,4 @@ -module github.com/marselester/gopher-celery +module github.com/lagerstrom/gopher-celery go 1.19 diff --git a/goredis/broker.go b/goredis/broker.go index 39f6da7..1a9d154 100644 --- a/goredis/broker.go +++ b/goredis/broker.go @@ -9,7 +9,7 @@ import ( "github.com/redis/go-redis/v9" - "github.com/marselester/gopher-celery/internal/broker" + "github.com/lagerstrom/gopher-celery/internal/broker" ) // DefaultReceiveTimeout defines how many seconds the broker's Receive command diff --git a/rabbitmq/broker.go b/rabbitmq/broker.go index 6aad314..4c470a2 100644 --- a/rabbitmq/broker.go +++ b/rabbitmq/broker.go @@ -9,7 +9,7 @@ import ( amqp "github.com/rabbitmq/amqp091-go" - "github.com/marselester/gopher-celery/internal/broker" + "github.com/lagerstrom/gopher-celery/internal/broker" ) // DefaultAmqpUri defines the default AMQP URI which is used to connect to RabbitMQ. diff --git a/redis/broker.go b/redis/broker.go index 747fbe9..e004b3a 100644 --- a/redis/broker.go +++ b/redis/broker.go @@ -8,7 +8,7 @@ import ( "github.com/gomodule/redigo/redis" - "github.com/marselester/gopher-celery/internal/broker" + "github.com/lagerstrom/gopher-celery/internal/broker" ) // DefaultReceiveTimeout defines how many seconds the broker's Receive command