diff --git a/messaging/natsjsdlq/natsjsdlq.go b/messaging/natsjsdlq/natsjsdlq.go new file mode 100644 index 0000000..ab4f469 --- /dev/null +++ b/messaging/natsjsdlq/natsjsdlq.go @@ -0,0 +1,148 @@ +package natsjsdlq + +import ( + "fmt" + "time" + + "github.com/nats-io/nats.go" +) + +type JetStreamContext interface { + AddStream(cfg *nats.StreamConfig, opts ...nats.JSOpt) (*nats.StreamInfo, error) + PublishMsg(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) +} + +type Msg interface { + Metadata() (*nats.MsgMetadata, error) +} + +// Config holds DLQ configuration +type Config struct { + // StreamName for the DLQ + StreamName string + + // Subject to publish dead letters to + Subject string + + // MaxDeliveries before message is considered dead + MaxDeliveries int + + // Storage type for the DLQ stream + Storage nats.StorageType + + // Optional handler for DLQ errors + ErrorHandler func(error) +} + +type Dependencies struct { + JetStream JetStreamContext +} + +// Handler manages dead letter queue operations +type Handler struct { + config Config + js JetStreamContext +} + +// NewHandler creates a new DLQ handler +func NewHandler(deps Dependencies, config Config) (*Handler, error) { + if err := validateConfig(deps, config); err != nil { + return nil, fmt.Errorf("invalid DLQ configuration: %w", err) + } + + handler := &Handler{ + config: config, + js: deps.JetStream, + } + + if err := handler.setup(); err != nil { + return nil, err + } + + return handler, nil +} + +func validateConfig(deps Dependencies, config Config) error { + if deps.JetStream == nil { + return fmt.Errorf("JetStream context is required") + } + + if config.StreamName == "" { + return fmt.Errorf("stream name is required") + } + + if config.Subject == "" { + return fmt.Errorf("subject is required") + } + + if config.MaxDeliveries <= 0 { + return fmt.Errorf("max deliveries must be greater than 0") + } + + if config.Storage == 0 { + config.Storage = nats.FileStorage + } + + return nil +} + +// setup ensures the DLQ stream exists +func (h *Handler) setup() error { + streamConfig := &nats.StreamConfig{ + Name: h.config.StreamName, + Subjects: []string{h.config.Subject}, + Storage: h.config.Storage, + Retention: nats.WorkQueuePolicy, + } + + _, err := h.js.AddStream(streamConfig) + if err != nil && err != nats.ErrStreamNameAlreadyInUse { + return fmt.Errorf("failed to create DLQ stream: %w", err) + } + + return nil +} + +// PublishMessage sends a message to the DLQ +func (h *Handler) PublishMessage(msg *nats.Msg, reason string) error { + // Clone original message headers + headers := nats.Header{} + if msg.Header != nil { + for k, v := range msg.Header { + headers[k] = v + } + } + + // Add DLQ metadata + headers.Set("DLQ-Reason", reason) + headers.Set("DLQ-Timestamp", time.Now().UTC().Format(time.RFC3339)) + headers.Set("Original-Subject", msg.Subject) + if msg.Header != nil { + headers.Set("Original-Message-ID", msg.Header.Get("Nats-Msg-Id")) + } + + dlqMsg := nats.NewMsg(h.config.Subject) + dlqMsg.Header = headers + dlqMsg.Data = msg.Data + + // Publish to DLQ + _, err := h.js.PublishMsg(dlqMsg) + if err != nil && h.config.ErrorHandler != nil { + h.config.ErrorHandler(fmt.Errorf("failed to publish to DLQ: %w", err)) + } + + return err +} + +// ShouldDLQ determines if a message should be sent to DLQ based on delivery count +func (h *Handler) ShouldDLQ(msg Msg) bool { + metadata, err := msg.Metadata() + if err != nil { + if h.config.ErrorHandler != nil { + h.config.ErrorHandler(fmt.Errorf("failed to get message metadata: %w", err)) + } + return false + } + + return metadata.NumDelivered >= uint64(h.config.MaxDeliveries) +} diff --git a/messaging/natsjsdlq/natsjsdlq_test.go b/messaging/natsjsdlq/natsjsdlq_test.go new file mode 100644 index 0000000..71ce596 --- /dev/null +++ b/messaging/natsjsdlq/natsjsdlq_test.go @@ -0,0 +1,282 @@ +package natsjsdlq_test + +import ( + "errors" + "testing" + + "github.com/nats-io/nats.go" + "github.com/simiancreative/simiango/messaging/natsjsdlq" + "github.com/stretchr/testify/assert" +) + +// MockJetStreamContext is a test double for nats.JetStreamContext +type MockJetStreamContext struct { + AddStreamFunc func(*nats.StreamConfig, ...nats.JSOpt) (*nats.StreamInfo, error) + PublishMsgFunc func(*nats.Msg, ...nats.PubOpt) (*nats.PubAck, error) + publishCalls []*nats.Msg +} + +func (m *MockJetStreamContext) AddStream( + cfg *nats.StreamConfig, + opts ...nats.JSOpt, +) (*nats.StreamInfo, error) { + if m.AddStreamFunc != nil { + return m.AddStreamFunc(cfg, opts...) + } + return &nats.StreamInfo{Config: *cfg}, nil +} + +func (m *MockJetStreamContext) PublishMsg( + msg *nats.Msg, + opts ...nats.PubOpt, +) (*nats.PubAck, error) { + m.publishCalls = append(m.publishCalls, msg) + if m.PublishMsgFunc != nil { + return m.PublishMsgFunc(msg, opts...) + } + return &nats.PubAck{}, nil +} + +func TestNewHandler(t *testing.T) { + tests := []struct { + name string + deps natsjsdlq.Dependencies + config natsjsdlq.Config + wantErr bool + errMsg string + }{ + { + name: "valid configuration", + deps: natsjsdlq.Dependencies{ + JetStream: &MockJetStreamContext{}, + }, + config: natsjsdlq.Config{ + StreamName: "test_dlq", + Subject: "test.dlq", + MaxDeliveries: 3, + Storage: nats.FileStorage, + }, + wantErr: false, + }, + { + name: "missing jetstream", + deps: natsjsdlq.Dependencies{ + JetStream: nil, + }, + config: natsjsdlq.Config{ + StreamName: "test_dlq", + Subject: "test.dlq", + }, + wantErr: true, + errMsg: "JetStream context is required", + }, + { + name: "missing required config", + deps: natsjsdlq.Dependencies{ + JetStream: &MockJetStreamContext{}, + }, + config: natsjsdlq.Config{}, + wantErr: true, + errMsg: "stream name is required", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + handler, err := natsjsdlq.NewHandler(tt.deps, tt.config) + if tt.wantErr { + assert.Error(t, err) + assert.Contains(t, err.Error(), tt.errMsg) + assert.Nil(t, handler) + } else { + assert.NoError(t, err) + assert.NotNil(t, handler) + } + }) + } +} + +func TestHandlerPublishMessage(t *testing.T) { + tests := []struct { + name string + msg *nats.Msg + reason string + publishErr error + wantErr bool + validateHeader func(*testing.T, nats.Header) + }{ + { + name: "successful publish", + msg: &nats.Msg{ + Subject: "original.subject", + Data: []byte("test data"), + Header: nats.Header{"Original-Key": []string{"value"}}, + }, + reason: "test failure", + validateHeader: func(t *testing.T, h nats.Header) { + assert.Equal(t, "test failure", h.Get("DLQ-Reason")) + assert.Equal(t, "original.subject", h.Get("Original-Subject")) + assert.Equal(t, "value", h.Get("Original-Key")) + assert.NotEmpty(t, h.Get("DLQ-Timestamp")) + }, + }, + { + name: "publish with no headers", + msg: &nats.Msg{ + Subject: "original.subject", + Data: []byte("test data"), + }, + reason: "test failure", + validateHeader: func(t *testing.T, h nats.Header) { + assert.Equal(t, "test failure", h.Get("DLQ-Reason")) + assert.Equal(t, "original.subject", h.Get("Original-Subject")) + }, + }, + { + name: "publish error", + msg: &nats.Msg{ + Subject: "original.subject", + Data: []byte("test data"), + }, + reason: "test failure", + publishErr: errors.New("publish failed"), + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mock := &MockJetStreamContext{ + PublishMsgFunc: func(msg *nats.Msg, opts ...nats.PubOpt) (*nats.PubAck, error) { + if tt.publishErr != nil { + return nil, tt.publishErr + } + return &nats.PubAck{}, nil + }, + } + + var errorCaught error + handler, err := natsjsdlq.NewHandler( + natsjsdlq.Dependencies{JetStream: mock}, + natsjsdlq.Config{ + StreamName: "test_dlq", + Subject: "test.dlq", + MaxDeliveries: 3, + ErrorHandler: func(err error) { + errorCaught = err + }, + }, + ) + assert.NoError(t, err) + + err = handler.PublishMessage(tt.msg, tt.reason) + if tt.wantErr { + assert.Error(t, err) + assert.NotNil(t, errorCaught) + } else { + assert.NoError(t, err) + assert.Nil(t, errorCaught) + assert.Len(t, mock.publishCalls, 1) + if tt.validateHeader != nil { + tt.validateHeader(t, mock.publishCalls[0].Header) + } + } + }) + } +} + +type MockMsg struct { + *nats.Msg + metadata *nats.MsgMetadata + metadataError error +} + +func (m *MockMsg) Metadata() (*nats.MsgMetadata, error) { + if m.metadataError != nil { + return nil, m.metadataError + } + return m.metadata, nil +} + +func TestHandlerShouldDLQ(t *testing.T) { + tests := []struct { + name string + msg func() *MockMsg + maxDeliveries int + want bool + wantErr bool + }{ + { + name: "should dlq when deliveries exceeded", + msg: func() *MockMsg { + return &MockMsg{ + Msg: &nats.Msg{ + Subject: "test.subject", + Data: []byte("test data"), + }, + metadata: &nats.MsgMetadata{ + NumDelivered: 4, + }, + } + }, + maxDeliveries: 3, + want: true, + }, + { + name: "should not dlq when under max deliveries", + msg: func() *MockMsg { + return &MockMsg{ + Msg: &nats.Msg{ + Subject: "test.subject", + Data: []byte("test data"), + }, + metadata: &nats.MsgMetadata{ + NumDelivered: 2, + }, + } + }, + maxDeliveries: 3, + want: false, + }, + { + name: "should not dlq on metadata error", + msg: func() *MockMsg { + return &MockMsg{ + Msg: &nats.Msg{ + Subject: "test.subject", + Data: []byte("test data"), + }, + metadataError: errors.New("metadata error"), + } + }, + maxDeliveries: 3, + want: false, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Create a message with mock metadata function + msg := tt.msg() + + var errorCaught error + handler, err := natsjsdlq.NewHandler(natsjsdlq.Dependencies{ + JetStream: &MockJetStreamContext{}, + }, natsjsdlq.Config{ + StreamName: "test_dlq", + Subject: "test.dlq", + MaxDeliveries: tt.maxDeliveries, + ErrorHandler: func(err error) { + errorCaught = err + }, + }) + assert.NoError(t, err) + + got := handler.ShouldDLQ(msg) + assert.Equal(t, tt.want, got) + + assert.Equal(t, tt.wantErr, errorCaught != nil) + }) + } +}