diff --git a/adapter/broker/nsq/nsq.go b/adapter/broker/nsq/nsq.go new file mode 100644 index 0000000..2f36763 --- /dev/null +++ b/adapter/broker/nsq/nsq.go @@ -0,0 +1,359 @@ +// Package nsq provides an NSQ broker +package nsq + +import ( + "context" + "math/rand" + "sync" + "time" + + "github.com/google/uuid" + "github.com/nextmicro/next/broker" + "github.com/nsqio/go-nsq" +) + +type nsqBroker struct { + lookupdAddrs []string + addrs []string + opts broker.Options + config *nsq.Config + + sync.Mutex + running bool + p []*nsq.Producer + c []*subscriber +} + +type publication struct { + topic string + m *broker.Message + nm *nsq.Message + opts broker.PublishOptions + err error +} + +type subscriber struct { + topic string + opts broker.SubscribeOptions + + c *nsq.Consumer + + // handler so we can resubcribe + h nsq.HandlerFunc + // concurrency + n int +} + +var ( + DefaultConcurrentHandlers = 1 +) + +func (n *nsqBroker) Init(opts ...broker.Option) error { + for _, o := range opts { + o(&n.opts) + } + + var addrs []string + + for _, addr := range n.opts.Addrs { + if len(addr) > 0 { + addrs = append(addrs, addr) + } + } + + if len(addrs) == 0 { + addrs = []string{"127.0.0.1:4150"} + } + + n.addrs = addrs + n.configure(n.opts.Context) + return nil +} + +func (n *nsqBroker) configure(ctx context.Context) { + if v, ok := ctx.Value(lookupdAddrsKey{}).([]string); ok { + n.lookupdAddrs = v + } + + if v, ok := ctx.Value(consumerOptsKey{}).([]string); ok { + cfgFlag := &nsq.ConfigFlag{Config: n.config} + for _, opt := range v { + cfgFlag.Set(opt) + } + } +} + +func (n *nsqBroker) Options() broker.Options { + return n.opts +} + +func (n *nsqBroker) Address() string { + return n.addrs[rand.Intn(len(n.addrs))] +} + +func (n *nsqBroker) Connect() error { + n.Lock() + defer n.Unlock() + + if n.running { + return nil + } + + producers := make([]*nsq.Producer, 0, len(n.addrs)) + + // create producers + for _, addr := range n.addrs { + p, err := nsq.NewProducer(addr, n.config) + if err != nil { + return err + } + if err = p.Ping(); err != nil { + return err + } + producers = append(producers, p) + } + + // create consumers + for _, c := range n.c { + channel := c.opts.Queue + if len(channel) == 0 { + channel = uuid.New().String() + "#ephemeral" + } + + cm, err := nsq.NewConsumer(c.topic, channel, n.config) + if err != nil { + return err + } + + cm.AddConcurrentHandlers(c.h, c.n) + + c.c = cm + + if len(n.lookupdAddrs) > 0 { + c.c.ConnectToNSQLookupds(n.lookupdAddrs) + } else { + err = c.c.ConnectToNSQDs(n.addrs) + if err != nil { + return err + } + } + } + + n.p = producers + n.running = true + return nil +} + +func (n *nsqBroker) Disconnect() error { + n.Lock() + defer n.Unlock() + + if !n.running { + return nil + } + + // stop the producers + for _, p := range n.p { + p.Stop() + } + + // stop the consumers + for _, c := range n.c { + c.c.Stop() + + if len(n.lookupdAddrs) > 0 { + // disconnect from all lookupd + for _, addr := range n.lookupdAddrs { + c.c.DisconnectFromNSQLookupd(addr) + } + } else { + // disconnect from all nsq brokers + for _, addr := range n.addrs { + c.c.DisconnectFromNSQD(addr) + } + } + } + + n.p = nil + n.running = false + return nil +} + +func (n *nsqBroker) Publish(ctx context.Context, topic string, message *broker.Message, opts ...broker.PublishOption) error { + p := n.p[rand.Intn(len(n.p))] + + options := broker.PublishOptions{} + for _, o := range opts { + o(&options) + } + + var ( + doneChan chan *nsq.ProducerTransaction + delay time.Duration + ) + if options.Context != nil { + if v, ok := options.Context.Value(asyncPublishKey{}).(chan *nsq.ProducerTransaction); ok { + doneChan = v + } + if v, ok := options.Context.Value(deferredPublishKey{}).(time.Duration); ok { + delay = v + } + } + + b, err := n.opts.Codec.Marshal(message) + if err != nil { + return err + } + + if doneChan != nil { + if delay > 0 { + return p.DeferredPublishAsync(topic, delay, b, doneChan) + } + return p.PublishAsync(topic, b, doneChan) + } else { + if delay > 0 { + return p.DeferredPublish(topic, delay, b) + } + return p.Publish(topic, b) + } +} + +func (n *nsqBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { + options := broker.SubscribeOptions{ + AutoAck: true, + } + + for _, o := range opts { + o(&options) + } + + concurrency, maxInFlight := DefaultConcurrentHandlers, DefaultConcurrentHandlers + if options.Context != nil { + if v, ok := options.Context.Value(concurrentHandlerKey{}).(int); ok { + maxInFlight, concurrency = v, v + } + if v, ok := options.Context.Value(maxInFlightKey{}).(int); ok { + maxInFlight = v + } + } + channel := options.Queue + if len(channel) == 0 { + channel = uuid.New().String() + "#ephemeral" + } + config := *n.config + config.MaxInFlight = maxInFlight + + c, err := nsq.NewConsumer(topic, channel, &config) + if err != nil { + return nil, err + } + + h := nsq.HandlerFunc(func(nm *nsq.Message) error { + if !options.AutoAck { + nm.DisableAutoResponse() + } + + var m broker.Message + + if err := n.opts.Codec.Unmarshal(nm.Body, &m); err != nil { + return err + } + + p := &publication{topic: topic, nm: nm, m: &m} + p.err = handler(p) + return p.err + }) + + c.AddConcurrentHandlers(h, concurrency) + + if len(n.lookupdAddrs) > 0 { + err = c.ConnectToNSQLookupds(n.lookupdAddrs) + } else { + err = c.ConnectToNSQDs(n.addrs) + } + if err != nil { + return nil, err + } + + sub := &subscriber{ + c: c, + opts: options, + topic: topic, + h: h, + n: concurrency, + } + + n.c = append(n.c, sub) + + return sub, nil +} + +func (n *nsqBroker) String() string { + return "nsq" +} + +func (p *publication) Topic() string { + return p.topic +} + +func (p *publication) Message() *broker.Message { + return p.m +} + +func (p *publication) Ack() error { + p.nm.Finish() + return nil +} + +func (p *publication) Error() error { + return p.err +} + +func (s *subscriber) Options() broker.SubscribeOptions { + return s.opts +} + +func (s *subscriber) Topic() string { + return s.topic +} + +func (s *subscriber) Unsubscribe() error { + s.c.Stop() + return nil +} + +func NewBroker(opts ...broker.Option) broker.Broker { + options := broker.Options{ + // Default context + Context: context.Background(), + } + + for _, o := range opts { + o(&options) + } + + var addrs []string + + for _, addr := range options.Addrs { + if len(addr) > 0 { + addrs = append(addrs, addr) + } + } + + if len(addrs) == 0 { + addrs = []string{"127.0.0.1:4150"} + } + + n := &nsqBroker{ + addrs: addrs, + opts: options, + config: nsq.NewConfig(), + } + n.configure(n.opts.Context) + // wrap in reverse + for i := len(options.Wrappers); i > 0; i-- { + n = options.Wrappers[i-1](n) + } + + return n +} diff --git a/adapter/broker/nsq/options.go b/adapter/broker/nsq/options.go new file mode 100644 index 0000000..5a5985a --- /dev/null +++ b/adapter/broker/nsq/options.go @@ -0,0 +1,64 @@ +package nsq + +import ( + "context" + "time" + + "github.com/nextmicro/next/broker" + nsq "github.com/nsqio/go-nsq" +) + +type concurrentHandlerKey struct{} +type maxInFlightKey struct{} +type asyncPublishKey struct{} +type deferredPublishKey struct{} +type lookupdAddrsKey struct{} +type consumerOptsKey struct{} + +func WithConcurrentHandlers(n int) broker.SubscribeOption { + return func(o *broker.SubscribeOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, concurrentHandlerKey{}, n) + } +} + +func WithMaxInFlight(n int) broker.SubscribeOption { + return func(o *broker.SubscribeOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, maxInFlightKey{}, n) + } +} + +func WithAsyncPublish(doneChan chan *nsq.ProducerTransaction) broker.PublishOption { + return func(o *broker.PublishOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, asyncPublishKey{}, doneChan) + } +} + +func WithDeferredPublish(delay time.Duration) broker.PublishOption { + return func(o *broker.PublishOptions) { + if o.Context == nil { + o.Context = context.Background() + } + o.Context = context.WithValue(o.Context, deferredPublishKey{}, delay) + } +} + +func WithLookupdAddrs(addrs []string) broker.Option { + return func(o *broker.Options) { + o.Context = context.WithValue(o.Context, lookupdAddrsKey{}, addrs) + } +} + +func WithConsumerOpts(consumerOpts []string) broker.Option { + return func(o *broker.Options) { + o.Context = context.WithValue(o.Context, consumerOptsKey{}, consumerOpts) + } +} diff --git a/adapter/broker/nsq/otelnsq/message.go b/adapter/broker/nsq/otelnsq/message.go new file mode 100644 index 0000000..90349b7 --- /dev/null +++ b/adapter/broker/nsq/otelnsq/message.go @@ -0,0 +1,115 @@ +package otelnsq + +import ( + "time" + + "github.com/IBM/sarama" + "go.opentelemetry.io/otel/propagation" +) + +var _ propagation.TextMapCarrier = (*ProducerMessageCarrier)(nil) +var _ propagation.TextMapCarrier = (*ConsumerMessageCarrier)(nil) + +// RecordHeader stores key and value for a record header +type RecordHeader struct { + Key []byte + Value []byte +} + +// ProducerMessage is the collection of elements passed to the Producer in order to send a message. +type ProducerMessage struct { + Topic string // The Kafka topic for this message. + Delay time.Duration // The delay to wait before sending the message. + Body []byte // The message contents. + // The headers are key-value pairs that are transparently passed + // by Kafka between producers and consumers. + Headers []RecordHeader +} + +// ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage. +type ProducerMessageCarrier struct { + msg *ProducerMessage +} + +// NewProducerMessageCarrier creates a new ProducerMessageCarrier. +func NewProducerMessageCarrier(msg *ProducerMessage) ProducerMessageCarrier { + return ProducerMessageCarrier{msg: msg} +} + +// Get retrieves a single value for a given key. +func (c ProducerMessageCarrier) Get(key string) string { + for _, h := range c.msg.Headers { + if string(h.Key) == key { + return string(h.Value) + } + } + return "" +} + +// Set sets a header. +func (c ProducerMessageCarrier) Set(key, val string) { + // Ensure uniqueness of keys + for i := 0; i < len(c.msg.Headers); i++ { + if string(c.msg.Headers[i].Key) == key { + c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) + i-- + } + } + c.msg.Headers = append(c.msg.Headers, RecordHeader{ + Key: []byte(key), + Value: []byte(val), + }) +} + +// Keys returns a slice of all key identifiers in the carrier. +func (c ProducerMessageCarrier) Keys() []string { + out := make([]string, len(c.msg.Headers)) + for i, h := range c.msg.Headers { + out[i] = string(h.Key) + } + return out +} + +// ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage. +type ConsumerMessageCarrier struct { + msg *sarama.ConsumerMessage +} + +// NewConsumerMessageCarrier creates a new ConsumerMessageCarrier. +func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier { + return ConsumerMessageCarrier{msg: msg} +} + +// Get retrieves a single value for a given key. +func (c ConsumerMessageCarrier) Get(key string) string { + for _, h := range c.msg.Headers { + if h != nil && string(h.Key) == key { + return string(h.Value) + } + } + return "" +} + +// Set sets a header. +func (c ConsumerMessageCarrier) Set(key, val string) { + // Ensure uniqueness of keys + for i := 0; i < len(c.msg.Headers); i++ { + if c.msg.Headers[i] != nil && string(c.msg.Headers[i].Key) == key { + c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...) + i-- + } + } + c.msg.Headers = append(c.msg.Headers, &sarama.RecordHeader{ + Key: []byte(key), + Value: []byte(val), + }) +} + +// Keys returns a slice of all key identifiers in the carrier. +func (c ConsumerMessageCarrier) Keys() []string { + out := make([]string, len(c.msg.Headers)) + for i, h := range c.msg.Headers { + out[i] = string(h.Key) + } + return out +} diff --git a/go.mod b/go.mod index de0c1b5..5c396bb 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/hashicorp/consul/api v1.26.1 github.com/nacos-group/nacos-sdk-go/v2 v2.2.5 github.com/nextmicro/logger v1.0.7 + github.com/nsqio/go-nsq v1.1.0 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.17.0 go.etcd.io/etcd/client/v3 v3.5.11 diff --git a/go.sum b/go.sum index 2e4a5a8..207fb29 100644 --- a/go.sum +++ b/go.sum @@ -149,6 +149,7 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -302,6 +303,8 @@ github.com/nextmicro/gokit/trace v1.0.11/go.mod h1:CWM26HGU5jVdsx6mbH2gsVfGSB/3T github.com/nextmicro/logger v1.0.7 h1:pkWVdlkEyM8Gd2E4jBZDEokRym+SJYW7NZyqS+vQU1w= github.com/nextmicro/logger v1.0.7/go.mod h1:hRmEO3C1TcFGY/Bh0jYC2i54ZU7ZW0WXCo9tcrcyuw8= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= +github.com/nsqio/go-nsq v1.1.0 h1:PQg+xxiUjA7V+TLdXw7nVrJ5Jbl3sN86EhGCQj4+FYE= +github.com/nsqio/go-nsq v1.1.0/go.mod h1:vKq36oyeVXgsS5Q8YEO7WghqidAVXQlcFxzQbQTuDEY= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b h1:FfH+VrHHk6Lxt9HdVS0PXzSXFyS2NbZKXv33FYPol0A= github.com/opentracing/opentracing-go v1.2.1-0.20220228012449-10b1cf09e00b/go.mod h1:AC62GU6hc0BrNm+9RK9VSiwa/EUe1bkIeFORAMcHvJU= github.com/openzipkin/zipkin-go v0.4.2 h1:zjqfqHjUpPmB3c1GlCvvgsM1G4LkvqQbBDueDOCg/jA= diff --git a/internal/matcher/middleware_test.go b/internal/matcher/middleware_test.go index 9c948ae..8b71f10 100644 --- a/internal/matcher/middleware_test.go +++ b/internal/matcher/middleware_test.go @@ -60,3 +60,4 @@ func TestMatcher(t *testing.T) { t.Fatal("not equal") } } +