From 4d8a21c2e52e600fabaf8c19fa33f4776dfc9b0b Mon Sep 17 00:00:00 2001 From: Ross Nelson Date: Fri, 10 Jan 2025 16:15:30 -0500 Subject: [PATCH] Add Nats Jetstream --- .DS_Store | Bin 6148 -> 6148 bytes README.md | 2 +- go.mod | 13 +- go.sum | 19 ++- messaging/natsjs/argo-event.go | 73 +++++++++++ messaging/natsjs/argo-event_test.go | 182 ++++++++++++++++++++++++++++ messaging/natsjs/client.go | 33 +++++ messaging/natsjs/consumer.go | 127 +++++++++++++++++++ messaging/natsjs/consumer_test.go | 131 ++++++++++++++++++++ messaging/natsjs/event.go | 16 +++ messaging/natsjs/event_test.go | 46 +++++++ messaging/natsjs/message.go | 52 ++++++++ messaging/natsjs/message_test.go | 95 +++++++++++++++ messaging/natsjs/stream.go | 91 ++++++++++++++ messaging/natsjs/stream_test.go | 83 +++++++++++++ mocks/nats/argo-event.go | 39 ++++++ mocks/nats/argo-event_test.go | 32 +++++ mocks/nats/msg.go | 96 +++++++++++++++ mocks/nats/msg_test.go | 72 +++++++++++ mocks/nats/server.go | 33 +++++ 20 files changed, 1230 insertions(+), 5 deletions(-) create mode 100644 messaging/natsjs/argo-event.go create mode 100644 messaging/natsjs/argo-event_test.go create mode 100644 messaging/natsjs/client.go create mode 100644 messaging/natsjs/consumer.go create mode 100644 messaging/natsjs/consumer_test.go create mode 100644 messaging/natsjs/event.go create mode 100644 messaging/natsjs/event_test.go create mode 100644 messaging/natsjs/message.go create mode 100644 messaging/natsjs/message_test.go create mode 100644 messaging/natsjs/stream.go create mode 100644 messaging/natsjs/stream_test.go create mode 100644 mocks/nats/argo-event.go create mode 100644 mocks/nats/argo-event_test.go create mode 100644 mocks/nats/msg.go create mode 100644 mocks/nats/msg_test.go create mode 100644 mocks/nats/server.go diff --git a/.DS_Store b/.DS_Store index 5008ddfcf53c02e82d7eee2e57c38e5672ef89f6..b80f2142e2877f1ad659307722e3057066eb0017 100644 GIT binary patch delta 300 zcmZoMXfc=|#>B`mF;Q%yo}wrt0|NsP3otMwGNdzPGUPF&CzVf3Tt3-Agr%O7A(tVQ zp%@5}B#R4@a`Kaa+IA!r+X zKR+i4#!gHM%S%}XhcjzE&(1cG=0 ziRx-2V?!MULv!O=9ffL3BLf`;6JxX5T22m8Wqs?Q`0SkAy!|GKFarQuMN9$! delta 69 zcmZoMXfc=|#>AjHu~2NHo+1YW5HK<@2yFhyD8{ylX%^#Vb`E|Hpgd6EJM(0I5k*d* SG(!SN-DDFU<;^i7E0_TqCl31n diff --git a/README.md b/README.md index 890ed7c..2fb5252 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # Simian Go -[![Coverage Status](https://coveralls.io/repos/github/simiancreative/simiango/badge.svg?branch=master)](https://coveralls.io/github/simiancreative/simiango?branch=master) +[![codecov](https://codecov.io/gh/simiancreative/simiango/graph/badge.svg?token=OYPECDAI3X)](https://codecov.io/gh/simiancreative/simiango) [![tests](https://github.com/simiancreative/simiango/workflows/CI/badge.svg)](https://github.com/simiancreative/simiango/actions) [![Godoc](http://img.shields.io/badge/godoc-reference-blue.svg?style=flat)](https://godoc.org/github.com/simiancreative/simiango) [![license](http://img.shields.io/badge/license-MIT-red.svg?style=flat)](https://github.com/simiancreative/simiango/blob/master/LICENSE) diff --git a/go.mod b/go.mod index c6f2294..29ec9e5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/simiancreative/simiango -go 1.21.7 +go 1.22 + +toolchain go1.22.10 require ( github.com/AlecAivazis/survey/v2 v2.3.5 @@ -28,6 +30,8 @@ require ( github.com/joho/godotenv v1.4.0 github.com/kr/pretty v0.3.0 github.com/mandrigin/gin-spa v0.0.0-20200212133200-790d0c0c7335 + github.com/nats-io/nats-server/v2 v2.10.24 + github.com/nats-io/nats.go v1.38.0 github.com/p768lwy3/gin-server-timing v0.0.0-20200316080543-ab69795cf847 github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.13.0 @@ -72,7 +76,7 @@ require ( github.com/jackc/pgio v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect - github.com/klauspost/compress v1.16.0 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/kr/text v0.2.0 // indirect github.com/leodido/go-urn v1.4.0 // indirect @@ -82,10 +86,14 @@ require ( github.com/mattn/go-runewidth v0.0.13 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b // indirect + github.com/minio/highwayhash v1.0.3 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/muesli/reflow v0.2.1-0.20210115123740-9e1d0d53df68 // indirect github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0 // indirect + github.com/nats-io/jwt/v2 v2.7.3 // indirect + github.com/nats-io/nkeys v0.4.9 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/onsi/gomega v1.27.1 // indirect github.com/pelletier/go-toml/v2 v2.2.1 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect @@ -103,6 +111,7 @@ require ( golang.org/x/sys v0.28.0 // indirect golang.org/x/term v0.27.0 // indirect golang.org/x/text v0.21.0 // indirect + golang.org/x/time v0.8.0 // indirect google.golang.org/protobuf v1.34.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 7e0678d..3cab38e 100644 --- a/go.sum +++ b/go.sum @@ -351,8 +351,8 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.15.7/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= -github.com/klauspost/compress v1.16.0 h1:iULayQNOReoYUe+1qtKOqw9CwJv3aNQu8ivo7lw1HU4= -github.com/klauspost/compress v1.16.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -414,6 +414,8 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b h1:j7+1HpAFS1zy5+Q4qx1fWh90gTKwiN4QCGoY9TWyyO4= github.com/mgutz/ansi v0.0.0-20170206155736-9520e82c474b/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE= +github.com/minio/highwayhash v1.0.3 h1:kbnuUMoHYyVl7szWjSxJnxw11k2U709jqFPPmIUyD6Q= +github.com/minio/highwayhash v1.0.3/go.mod h1:GGYsuwP/fPD6Y9hMiXuapVvlIUEhFhMTh0rxU3ik1LQ= github.com/mitchellh/mapstructure v0.0.0-20170523030023-d0303fe80992/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= @@ -429,6 +431,16 @@ github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0 h1:STjmj0uFfRryL github.com/muesli/termenv v0.11.1-0.20220204035834-5ac8409525e0/go.mod h1:Bd5NYQ7pd+SrtBSrSNoBBmXlcY8+Xj4BMJgh8qcZrvs= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= +github.com/nats-io/jwt/v2 v2.7.3 h1:6bNPK+FXgBeAqdj4cYQ0F8ViHRbi7woQLq4W29nUAzE= +github.com/nats-io/jwt/v2 v2.7.3/go.mod h1:GvkcbHhKquj3pkioy5put1wvPxs78UlZ7D/pY+BgZk4= +github.com/nats-io/nats-server/v2 v2.10.24 h1:KcqqQAD0ZZcG4yLxtvSFJY7CYKVYlnlWoAiVZ6i/IY4= +github.com/nats-io/nats-server/v2 v2.10.24/go.mod h1:olvKt8E5ZlnjyqBGbAXtxvSQKsPodISK5Eo/euIta4s= +github.com/nats-io/nats.go v1.38.0 h1:A7P+g7Wjp4/NWqDOOP/K6hfhr54DvdDQUznt5JFg9XA= +github.com/nats-io/nats.go v1.38.0/go.mod h1:IGUM++TwokGnXPs82/wCuiHS02/aKrdYUQkU8If6yjw= +github.com/nats-io/nkeys v0.4.9 h1:qe9Faq2Gxwi6RZnZMXfmGMZkg3afLLOtrU+gDZJ35b0= +github.com/nats-io/nkeys v0.4.9/go.mod h1:jcMqs+FLG+W5YO36OX6wFIFcmpdAns+w1Wm6D3I/evE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU= @@ -799,6 +811,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= @@ -828,6 +841,8 @@ golang.org/x/time v0.0.0-20170424234030-8be79e1e0910/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.8.0 h1:9i3RxcPv3PZnitoVGMPDKZSq1xW1gK1Xy3ArNOGZfEg= +golang.org/x/time v0.8.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= diff --git a/messaging/natsjs/argo-event.go b/messaging/natsjs/argo-event.go new file mode 100644 index 0000000..922f32c --- /dev/null +++ b/messaging/natsjs/argo-event.go @@ -0,0 +1,73 @@ +package natsjs + +import ( + "encoding/base64" + "encoding/json" + + "github.com/simiancreative/simiango/errors" +) + +func ArgoEventFromString(str string) (ArgoEvent, error) { + var e ArgoEvent + err := json.Unmarshal([]byte(str), &e) + return e, err +} + +func ArgoUnmarshalEvent(dest interface{}, str string) (err error) { + newEvent, err := ArgoEventFromString(str) + if err != nil { + return errors.Wrap(err, "failed to unmarshal event") + } + + str, err = newEvent.JSONBody() + if err != nil { + return errors.Wrap(err, "failed to extract JSON body") + } + + err = json.Unmarshal([]byte(str), &dest) + if err != nil { + return errors.Wrap(err, "failed to unmarshal event") + } + + return nil +} + +// EventContext represents the context of an event +type ArgoEventContext struct { + ID string `json:"id"` + Source string `json:"source"` + SpecVersion string `json:"specversion"` + Type string `json:"type"` + DataContentType string `json:"datacontenttype"` + Subject string `json:"subject"` + Time string `json:"time"` +} + +// Event represents an event received from an argo event source +type ArgoEvent struct { + Context ArgoEventContext `json:"context"` + Data string `json:"data"` +} + +type ArgoEventData struct { + Subject string `json:"subject"` + Body interface{} `json:"body"` +} + +func (e ArgoEvent) JSONBody() (string, error) { + var data ArgoEventData + + bytes, err := base64.StdEncoding.DecodeString(e.Data) + if err != nil { + return "", err + } + + err = json.Unmarshal(bytes, &data) + if err != nil { + return "", err + } + + bytes, err = json.Marshal(data.Body) + + return string(bytes), err +} diff --git a/messaging/natsjs/argo-event_test.go b/messaging/natsjs/argo-event_test.go new file mode 100644 index 0000000..7611258 --- /dev/null +++ b/messaging/natsjs/argo-event_test.go @@ -0,0 +1,182 @@ +package natsjs_test + +import ( + "encoding/base64" + "encoding/json" + "testing" + + "github.com/simiancreative/simiango/messaging/natsjs" + "github.com/stretchr/testify/assert" +) + +func TestArgoEventFromString(t *testing.T) { + testEvent := natsjs.ArgoEvent{ + Context: natsjs.ArgoEventContext{ + ID: "testID", + Source: "testSource", + SpecVersion: "testSpecVersion", + Type: "testType", + DataContentType: "testDataContentType", + Subject: "testSubject", + Time: "testTime", + }, + Data: "testData", + } + + bytes, _ := json.Marshal(testEvent) + str := string(bytes) + + event, err := natsjs.ArgoEventFromString(str) + + assert.NoError(t, err) + assert.Equal(t, testEvent.Context.ID, event.Context.ID) + + // Test with invalid JSON + _, err = natsjs.ArgoEventFromString("{invalid json}") + assert.Error(t, err) +} + +func TestJSONBody(t *testing.T) { + testData := natsjs.ArgoEventData{ + Subject: "testSubject", + Body: "testBody", + } + + bytes, _ := json.Marshal(testData) + str := base64.StdEncoding.EncodeToString(bytes) + + testEvent := natsjs.ArgoEvent{ + Context: natsjs.ArgoEventContext{ + ID: "testID", + Source: "testSource", + SpecVersion: "testSpecVersion", + Type: "testType", + DataContentType: "testDataContentType", + Subject: "testSubject", + Time: "testTime", + }, + Data: str, + } + + body, err := testEvent.JSONBody() + + assert.NoError(t, err) + assert.Equal(t, `"testBody"`, body) + + // Test with invalid base64 data + testEvent.Data = "invalid base64" + + _, err = testEvent.JSONBody() + assert.Error(t, err) +} + +func TestBadJSONBody(t *testing.T) { + str := base64.StdEncoding.EncodeToString([]byte("invalidJSON")) + + testEvent := natsjs.ArgoEvent{ + Context: natsjs.ArgoEventContext{ + ID: "testID", + Source: "testSource", + SpecVersion: "testSpecVersion", + Type: "testType", + DataContentType: "testDataContentType", + Subject: "testSubject", + Time: "testTime", + }, + Data: str, + } + + body, err := testEvent.JSONBody() + assert.Error(t, err) + assert.Equal(t, "", body) +} + +func TestArgoUnmarshalEvent(t *testing.T) { + // Sample JSON string representing an ArgoEvent + sampleJSON := `{ + "context": { + "id": "1234", + "source": "source", + "specversion": "1.0", + "type": "type", + "datacontenttype": "application/json", + "subject": "subject", + "time": "2023-10-01T00:00:00Z" + }, + "data": "eyJzdWJqZWN0IjoiZXhhbXBsZSIsImJvZHkiOnsia2V5IjoidmFsdWUifX0=" + }` + + // Destination variable + var dest map[string]interface{} + + // Call the function + err := natsjs.ArgoUnmarshalEvent(&dest, sampleJSON) + if err != nil { + t.Fatalf("Expected no error, got %v", err) + } + + // Verify the results + expectedBody := map[string]interface{}{"key": "value"} + if dest["key"] != expectedBody["key"] { + t.Errorf("Expected body %v, got %v", expectedBody, dest) + } +} + +func TestArgoUnmarshalEvent_Errors(t *testing.T) { + // Test case 1: Invalid JSON string + invalidJSON := `{"context": { "id": "1234" "source": "source" }}` // Missing comma + var dest1 map[string]interface{} + err := natsjs.ArgoUnmarshalEvent(&dest1, invalidJSON) + assert.Error(t, err) + + // Test case 2: Invalid base64 data + invalidBase64 := `{ + "context": { + "id": "1234", + "source": "source", + "specversion": "1.0", + "type": "type", + "datacontenttype": "application/json", + "subject": "subject", + "time": "2023-10-01T00:00:00Z" + }, + "data": "invalid_base64_data" + }` + var dest2 map[string]interface{} + err = natsjs.ArgoUnmarshalEvent(&dest2, invalidBase64) + assert.Error(t, err) + + // test case 3: invalid json body + invalidjsonbody := `{ + "context": { + "id": "1234", + "source": "source", + "specversion": "1.0", + "type": "type", + "datacontenttype": "application/json", + "subject": "subject", + "time": "2023-10-01t00:00:00z" + }, + "data": "W30K" // {"subject":"example","body":{"foo":"bar"}} + }` + var dest3 map[string]interface{} + err = natsjs.ArgoUnmarshalEvent(&dest3, invalidjsonbody) + assert.Error(t, err) + + // test case 3: invalid json body + invalidjsonMessage := `{ + "context": { + "id": "1234", + "source": "source", + "specversion": "1.0", + "type": "type", + "datacontenttype": "application/json", + "subject": "subject", + "time": "2023-10-01t00:00:00z" + }, + "data": "eyJzdWJqZWN0IjoiZXhhbXBsZSIsImJvZHkiOiJ7XSJ9Cg==" + }` + var dest4 map[string]interface{} + err = natsjs.ArgoUnmarshalEvent(&dest4, invalidjsonMessage) + assert.Error(t, err) +} diff --git a/messaging/natsjs/client.go b/messaging/natsjs/client.go new file mode 100644 index 0000000..d0f83b7 --- /dev/null +++ b/messaging/natsjs/client.go @@ -0,0 +1,33 @@ +package natsjs + +import ( + "context" + "time" + + "github.com/nats-io/nats.go/jetstream" +) + +type Client struct{} + +func (c *Client) InitStream(streamName string) *Client { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if connection == nil { + panic("Connection is not established") + } + + _, err := connection.js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{ + Name: streamName, + Subjects: []string{streamName + ".>"}, + }) + if err != nil { + panic(err) + } + + return c +} + +func (c *Client) NewMessage() *Message { + return &Message{} +} diff --git a/messaging/natsjs/consumer.go b/messaging/natsjs/consumer.go new file mode 100644 index 0000000..57f7178 --- /dev/null +++ b/messaging/natsjs/consumer.go @@ -0,0 +1,127 @@ +package natsjs + +import ( + "context" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/errors" + "github.com/simiancreative/simiango/logger" +) + +func NewConsumer(name, stream, subject string) *Consumer { + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + if connection == nil { + panic("Connection is not established") + } + + // retrieve consumer handle from a stream + cons, err := connection.js.CreateOrUpdateConsumer( + ctx, stream, jetstream.ConsumerConfig{ + FilterSubject: subject, + Durable: name, + Name: name, + }, + ) + if err != nil { + panic(err) + } + + return &Consumer{cons: cons} +} + +type Consumer struct { + cons jetstream.Consumer + // batchSize int +} + +// func (c *Consumer) BatchConfiguration(size int) *Consumer { +// c.batchSize = size +// +// return c +// } + +func (c Consumer) Consume(callback func(jetstream.Msg) error) (stop func(), err error) { + cc, err := c.cons.Consume(func(msg jetstream.Msg) { + err := callback(msg) + + if err != nil { + logger.Errorf("Error consuming message: %#v", errors.Unwrap(err)) + } + + logger.Warnf("Acking message: %s", msg.Data()) + msg.Ack() + }) + + stop = func() { + logger.Debugf("Stopping consumer") + cc.Stop() + } + + return stop, err +} + +// Some thinking to do before we implement a batch consumer. + +// The nats side is relatively easy. We need to decide how we want to handle the +// application logic and how we ack messages. + +// Do we want to ack messages in the callback or after the callback has been executed? +// +// 1. If we ack messages after the callback we need to make sure that the callback is +// idempotent on a per message basis. +// +// 2. If we ack messages inside the callback we need to make sure that the messages +// can be acked out of order in the case where some messages are acked but not +// others. + +// func (c Consumer) BatchConsume(callback func([]string) error) (stop func(), err error) { +// var shouldStop bool +// +// go func() { +// for { +// if shouldStop { +// break +// } +// +// batch, err := c.cons.Fetch(c.batchSize) +// if err != nil { +// log.Printf("Error fetching messages: %v", err) +// continue +// } +// +// msgs, data := collectMessages(batch) +// +// if len(msgs) == 0 { +// continue +// } +// +// err = callback(data) +// if err != nil { +// logger.Errorf("Error consuming message: %v", err) +// continue +// } +// +// for _, msg := range msgs { +// msg.Ack() +// } +// } +// }() +// +// stop = func() { +// logger.Debugf("Stopping consumer") +// shouldStop = true +// } +// +// return stop, err +// } +// +// func collectMessages(batch jetstream.MessageBatch) (msgs []jetstream.Msg, data []string) { +// for msg := range batch.Messages() { +// data = append(data, string(msg.Data())) +// msgs = append(msgs, msg) +// } +// +// return +// } diff --git a/messaging/natsjs/consumer_test.go b/messaging/natsjs/consumer_test.go new file mode 100644 index 0000000..9fffc59 --- /dev/null +++ b/messaging/natsjs/consumer_test.go @@ -0,0 +1,131 @@ +package natsjs_test + +import ( + "testing" + "time" + + "github.com/nats-io/nats.go/jetstream" + "github.com/simiancreative/simiango/errors" + "github.com/simiancreative/simiango/logger" + "github.com/simiancreative/simiango/messaging/natsjs" + "github.com/simiancreative/simiango/mocks/nats" + "github.com/stretchr/testify/assert" +) + +func testNewConsumer(t *testing.T) { + defer nats.MockServer()() + + assert.NotPanics(t, func() { + natsjs.SetTimeout(30) + natsjs.Connect() + + consumer := natsjs.NewConsumer("test", "test", "test") + assert.NotNil(t, consumer) + }) +} + +func testNewConsumerFailure(t *testing.T) { + assert.Panics(t, func() { + natsjs.SetTimeout(1) + natsjs.NewConsumer("test", "test", "test") + }) +} + +func testNewConsumerStop(t *testing.T) { + logger.Enable() + defer nats.MockServer()() + + natsjs.SetTimeout(30) + natsjs.Connect() + + stop, err := natsjs. + NewConsumer("test", "test", "test.>"). + Consume(func(_ jetstream.Msg) error { + return nil + }) + + assert.NoError(t, err) + assert.NotNil(t, stop) + assert.NotPanics(t, stop) +} + +func testNewConsumerConsume(t *testing.T) { + logger.Enable() + defer nats.MockServer()() + + result := make(chan string) + + natsjs.SetTimeout(30) + natsjs.Connect() + + stop, err := natsjs. + NewConsumer("test", "test", "test.>"). + Consume(func(msg jetstream.Msg) error { + logger.Debugf("Received message: %v", msg) + result <- string(msg.Data()) + return nil + }) + + assert.NoError(t, err) + assert.NotNil(t, stop) + + natsjs.New(). + NewMessage(). + SetSubject("test", "test.test"). + SetData("test data"). + Publish() + + select { + case value := <-result: + assert.Equal(t, value, `"test data"`) + case <-time.After(500 * time.Millisecond): + t.Errorf("Timeout waiting for async result") + } +} + +func testNewConsumerConsumeFailure(t *testing.T) { + hook := logger.Mock() + defer nats.MockServer()() + + natsjs.SetTimeout(30) + natsjs.Connect() + + result := make(chan string) + + callback := func(msg jetstream.Msg) error { + result <- string(msg.Data()) + return errors.New("test error") + } + + natsjs. + NewConsumer("test", "test", "test.>"). + Consume(callback) + + natsjs.New(). + NewMessage(). + SetSubject("test", "test.test"). + SetData("test data"). + Publish() + + select { + case <-result: + time.Sleep(100 * time.Millisecond) + case <-time.After(500 * time.Millisecond): + t.Errorf("Timeout waiting for async result") + } + + assert.Equal(t, `Acking message: "test data"`, hook.LastEntry().Message) +} + +func testNewConsumerNoConnection(t *testing.T) { + natsjs.Close() + + assert.Panics(t, func() { + natsjs. + NewConsumer("test", "test", "test.>"). + Consume(func(msg jetstream.Msg) error { + logger.Debugf("Received message: %v", msg) + return errors.New("test error") + }) + }) +} diff --git a/messaging/natsjs/event.go b/messaging/natsjs/event.go new file mode 100644 index 0000000..932a935 --- /dev/null +++ b/messaging/natsjs/event.go @@ -0,0 +1,16 @@ +package natsjs + +import ( + "encoding/json" + + "github.com/simiancreative/simiango/errors" +) + +func UnmarshalEvent(dest interface{}, str string) (err error) { + err = json.Unmarshal([]byte(str), &dest) + if err != nil { + return errors.Wrap(err, "failed to unmarshal event") + } + + return nil +} diff --git a/messaging/natsjs/event_test.go b/messaging/natsjs/event_test.go new file mode 100644 index 0000000..2c76fa1 --- /dev/null +++ b/messaging/natsjs/event_test.go @@ -0,0 +1,46 @@ +package natsjs_test + +import ( + "testing" + + "github.com/simiancreative/simiango/messaging/natsjs" +) + +type TestEvent struct { + Name string `json:"name"` +} + +func TestUnmarshalEvent(t *testing.T) { + tests := []struct { + name string + input string + wantErr bool + wantEvent TestEvent + }{ + { + name: "valid JSON", + input: `{"name": "test event"}`, + wantErr: false, + wantEvent: TestEvent{Name: "test event"}, + }, + { + name: "invalid JSON", + input: `{"name": "test event"`, + wantErr: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var event TestEvent + err := natsjs.UnmarshalEvent(&event, tt.input) + if (err != nil) != tt.wantErr { + t.Errorf("UnmarshalEvent() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !tt.wantErr && event != tt.wantEvent { + t.Errorf("UnmarshalEvent() = %v, want %v", event, tt.wantEvent) + } + }) + } +} diff --git a/messaging/natsjs/message.go b/messaging/natsjs/message.go new file mode 100644 index 0000000..8151e4f --- /dev/null +++ b/messaging/natsjs/message.go @@ -0,0 +1,52 @@ +package natsjs + +import ( + "context" + "encoding/json" + "strings" +) + +type Message struct { + Subject string + Data []byte +} + +// SetSubject sets the subject of the message. The subject is used to route the +// message to the correct stream. +// +// even though stream name is not required to relate a message to a stream, it +// forces our naming convention and makes it easier to find the stream that the +// message belongs to +func (m *Message) SetSubject(streamName, prefix string, suffix ...string) *Message { + suffix = append([]string{streamName, prefix}, suffix...) + m.Subject = strings.ToLower(strings.Join(suffix, ".")) + + return m +} + +func (m *Message) SetData(raw interface{}) *Message { + data, err := json.Marshal(raw) + if err != nil { + panic(err) + } + + m.Data = data + + return m +} + +func (m *Message) Publish() error { + if connection == nil { + panic("Connection is not established") + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + _, err := connection.js.Publish(ctx, m.Subject, m.Data) + if err != nil { + return err + } + + return nil +} diff --git a/messaging/natsjs/message_test.go b/messaging/natsjs/message_test.go new file mode 100644 index 0000000..cbb1cba --- /dev/null +++ b/messaging/natsjs/message_test.go @@ -0,0 +1,95 @@ +package natsjs_test + +import ( + "encoding/json" + "testing" + + "github.com/simiancreative/simiango/messaging/natsjs" + "github.com/simiancreative/simiango/mocks/nats" + "github.com/stretchr/testify/assert" +) + +func testMessageSetSubject(t *testing.T) { + msg := &natsjs.Message{} + msg.SetSubject("prefix", "suffix") + assert.Equal(t, "prefix.suffix", msg.Subject) +} + +func testMessageSetData(t *testing.T) { + msg := &natsjs.Message{} + data := map[string]string{"key": "value"} + msg.SetData(data) + + expectedData, _ := json.Marshal(data) + assert.Equal(t, expectedData, msg.Data) +} + +func testMessageSetDataError(t *testing.T) { + assert.Panics( + t, + func() { + msg := &natsjs.Message{} + data := make(chan int) + msg.SetData(data) + }, + "SetData should panic if data is not serializable", + ) +} + +func testMessagePublish(t *testing.T) { + defer nats.MockServer()() + natsjs.Connect() + natsjs.SetTimeout(2) + + err := natsjs.New(). + InitStream("test"). + NewMessage(). + SetSubject("test", "subject"). + SetData("test data"). + Publish() + + assert.Nil(t, err) +} + +func testMessagePublishError(t *testing.T) { + defer nats.MockServer()() + natsjs.Connect() + natsjs.SetTimeout(0) + + err := natsjs.New(). + NewMessage(). + SetSubject("test", "subject"). + SetData("test data"). + Publish() + + assert.Error(t, err) +} + +func testPublishNoConnection(t *testing.T) { + natsjs.Close() + + assert.PanicsWithValue( + t, + "Connection is not established", + func() { + natsjs.New(). + NewMessage(). + SetSubject("test", "subject"). + SetData("test data"). + Publish() + }, + "Publish should panic if connection is not set", + ) +} + +func testPublishNoData(t *testing.T) { + defer nats.MockServer()() + natsjs.Connect() + + err := natsjs.New(). + NewMessage(). + SetSubject("test", "subject"). + Publish() + + assert.Error(t, err) +} diff --git a/messaging/natsjs/stream.go b/messaging/natsjs/stream.go new file mode 100644 index 0000000..efeb517 --- /dev/null +++ b/messaging/natsjs/stream.go @@ -0,0 +1,91 @@ +package natsjs + +import ( + "crypto/tls" + "os" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +type conn struct { + nc *nats.Conn + js jetstream.JetStream +} + +var reconnect = true +var connection *conn +var timeout = 30 * time.Second + +func SetTimeout(t time.Duration) { + timeout = t * time.Second +} + +func SetReconnect(r bool) { + reconnect = r +} + +// Connect establishes a connection to the NATS server. +// +// USAGE: +// +// import "api/lib/stream" +// +// stream.New(). +// +// InitStream("default"). +// NewMessage(). +// SetSubject("notifications.email", params.Kind, params.Category). +// SetData(map[string]int{"pending_notifications_id": row.ID}). +// Publish() +func Connect() { + if connection != nil && connection.nc != nil && !connection.nc.IsClosed() { + return + } + + host := os.Getenv("NATS_HOST") + if host == "" { + panic("NATS_HOST is not set") + } + + opts := nats.Options{ + Url: host, + + AllowReconnect: reconnect, + RetryOnFailedConnect: true, + MaxReconnect: 10, + ReconnectWait: 10 * time.Second, + + // this is acceptable because the nats server is not exposed outside the cluster + // it is currently using a self signed certificate so we need to skip the verification + TLSConfig: &tls.Config{ + InsecureSkipVerify: true, + }, + } + + nc, err := opts.Connect() + if err != nil { + panic(err) + } + + js, err := jetstream.New(nc) + if err != nil { + panic(err) + } + + connection = &conn{nc, js} +} + +func Close() { + if connection == nil || connection.nc == nil { + return + } + + connection.nc.Close() + connection = nil +} + +func New() *Client { + return &Client{} +} diff --git a/messaging/natsjs/stream_test.go b/messaging/natsjs/stream_test.go new file mode 100644 index 0000000..00bfe37 --- /dev/null +++ b/messaging/natsjs/stream_test.go @@ -0,0 +1,83 @@ +package natsjs_test + +import ( + "os" + "reflect" + "runtime" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/simiancreative/simiango/messaging/natsjs" + "github.com/simiancreative/simiango/mocks/nats" +) + +func TestStream(t *testing.T) { + natsjs.SetReconnect(false) + + for _, test := range tests { + // gets the function name to apply to the test run + funcValue := reflect.ValueOf(test) + name := runtime.FuncForPC(funcValue.Pointer()).Name() + + // runs test sequentially so the mock server doesnt trip over itself + t.Run(name, test) + } +} + +var tests = []func(*testing.T){ + testConnect, + testConnectFailure, + testInitStreamNoConnection, + testMessageSetSubject, + testMessageSetData, + testMessageSetDataError, + testMessagePublish, + testMessagePublishError, + testPublishNoConnection, + testPublishNoData, + testNewConsumer, + testNewConsumerFailure, + testNewConsumerConsume, + testNewConsumerStop, + testNewConsumerNoConnection, + testNewConsumerConsumeFailure, +} + +func testConnectFailure(t *testing.T) { + natsjs.Close() + + assert.PanicsWithValue( + t, + "NATS_HOST is not set", + func() { + os.Unsetenv("NATS_HOST") + natsjs.Connect() + }, + "Connect should panic if NATS_HOST is not set", + ) +} + +func testConnect(t *testing.T) { + defer nats.MockServer()() + + assert.NotPanics( + t, + func() { + natsjs.Connect() + }, + "Connect should not panic if NATS_HOST is set", + ) +} + +func testInitStreamNoConnection(t *testing.T) { + assert.PanicsWithValue( + t, + "Connection is not established", + func() { + natsjs.Close() + natsjs.New().InitStream("test") + }, + "Connect should panic if connection is not set", + ) +} diff --git a/mocks/nats/argo-event.go b/mocks/nats/argo-event.go new file mode 100644 index 0000000..01b08ee --- /dev/null +++ b/mocks/nats/argo-event.go @@ -0,0 +1,39 @@ +package nats + +import ( + "crypto/md5" + "encoding/base64" + "encoding/json" + "fmt" + "time" + + "github.com/simiancreative/simiango/messaging/natsjs" +) + +func BuildArgoEvent(workflowName, action, msg string) string { + subject := fmt.Sprintf("%s-%s", workflowName, action) + + data := natsjs.ArgoEventData{ + Subject: subject, + } + + json.Unmarshal([]byte(msg), &data.Body) + dataStr, _ := json.Marshal(data) + + event := natsjs.ArgoEvent{ + Context: natsjs.ArgoEventContext{ + ID: fmt.Sprintf("%x", md5.Sum([]byte(subject))), + Source: "nats-event-source", + SpecVersion: "1.0", + Type: "nats", + DataContentType: "application/json", + Subject: subject, + Time: time.Now().Format(time.RFC3339), + }, + Data: base64.StdEncoding.EncodeToString(dataStr), + } + + jsonData, _ := json.Marshal(event) + + return string(jsonData) +} diff --git a/mocks/nats/argo-event_test.go b/mocks/nats/argo-event_test.go new file mode 100644 index 0000000..794d584 --- /dev/null +++ b/mocks/nats/argo-event_test.go @@ -0,0 +1,32 @@ +package nats_test + +import ( + "encoding/base64" + "encoding/json" + "testing" + + "github.com/simiancreative/simiango/messaging/natsjs" + "github.com/simiancreative/simiango/mocks/nats" + "github.com/stretchr/testify/assert" +) + +func TestBuildArgoEvent(t *testing.T) { + // Test the happy path + workflowName := "test-workflow" + action := "test-action" + msg := `{"key":"value"}` + + result := nats.BuildArgoEvent(workflowName, action, msg) + + var event natsjs.ArgoEvent + err := json.Unmarshal([]byte(result), &event) + assert.NoError(t, err) + + expectedSubject := workflowName + "-" + action + assert.Equal(t, expectedSubject, event.Context.Subject) + + var data natsjs.ArgoEventData + dataBytes, _ := base64.StdEncoding.DecodeString(event.Data) + json.Unmarshal(dataBytes, &data) + assert.Equal(t, data.Subject, expectedSubject) +} diff --git a/mocks/nats/msg.go b/mocks/nats/msg.go new file mode 100644 index 0000000..68bb510 --- /dev/null +++ b/mocks/nats/msg.go @@ -0,0 +1,96 @@ +package nats + +import ( + "context" + "sync" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/jetstream" +) + +var NewJetStreamMsg = func(subject string, data []byte) *jetStreamMsg { + return &jetStreamMsg{ + msg: &nats.Msg{ + Subject: subject, + Data: data, + }, + } +} + +type jetStreamMsg struct { + msg *nats.Msg + ackd bool + js *jetstream.JetStream + sync.Mutex +} + +func (m *jetStreamMsg) Metadata() (*jetstream.MsgMetadata, error) { + return nil, nil +} + +// Data returns the message body. +func (m *jetStreamMsg) Data() []byte { + return m.msg.Data +} + +// Headers returns a map of headers for a message. +func (m *jetStreamMsg) Headers() nats.Header { + return m.msg.Header +} + +// Subject returns a subject on which a message is published. +func (m *jetStreamMsg) Subject() string { + return m.msg.Subject +} + +// Reply returns a reply subject for a JetStream message. +func (m *jetStreamMsg) Reply() string { + return m.msg.Reply +} + +func (m *jetStreamMsg) Ack() error { + return nil +} + +func (m *jetStreamMsg) DoubleAck(_ context.Context) error { + return nil +} + +func (m *jetStreamMsg) Nak() error { + return nil +} + +func (m *jetStreamMsg) NakWithDelay(_ time.Duration) error { + return nil +} + +func (m *jetStreamMsg) InProgress() error { + return nil +} + +func (m *jetStreamMsg) Term() error { + return nil +} + +func (m *jetStreamMsg) TermWithReason(_ string) error { + return nil +} + +type ackOpts struct { + nakDelay time.Duration + termReason string +} + +func (m *jetStreamMsg) ackReply( + _ context.Context, + _ []byte, + _ bool, + _ ackOpts, +) error { + return nil +} + +func (m *jetStreamMsg) checkReply() error { + return nil +} diff --git a/mocks/nats/msg_test.go b/mocks/nats/msg_test.go new file mode 100644 index 0000000..663711f --- /dev/null +++ b/mocks/nats/msg_test.go @@ -0,0 +1,72 @@ +package nats + +import ( + "testing" + + "github.com/nats-io/nats.go" + "github.com/stretchr/testify/assert" +) + +func TestJetStreamMsg(t *testing.T) { + subject := "test.subject" + data := []byte("test data") + msg := NewJetStreamMsg(subject, data) + + t.Run("Metadata", func(t *testing.T) { + metadata, err := msg.Metadata() + assert.Nil(t, metadata) + assert.Nil(t, err) + }) + + t.Run("Data", func(t *testing.T) { + assert.Equal(t, data, msg.Data()) + }) + + t.Run("Headers", func(t *testing.T) { + assert.IsType(t, nats.Header{}, msg.Headers()) + }) + + t.Run("Subject", func(t *testing.T) { + assert.Equal(t, subject, msg.Subject()) + }) + + t.Run("Reply", func(t *testing.T) { + assert.Equal(t, "", msg.Reply()) + }) + + t.Run("Ack", func(t *testing.T) { + assert.Nil(t, msg.Ack()) + }) + + t.Run("DoubleAck", func(t *testing.T) { + assert.Nil(t, msg.DoubleAck(nil)) + }) + + t.Run("Nak", func(t *testing.T) { + assert.Nil(t, msg.Nak()) + }) + + t.Run("NakWithDelay", func(t *testing.T) { + assert.Nil(t, msg.NakWithDelay(0)) + }) + + t.Run("InProgress", func(t *testing.T) { + assert.Nil(t, msg.InProgress()) + }) + + t.Run("Term", func(t *testing.T) { + assert.Nil(t, msg.Term()) + }) + + t.Run("TermWithReason", func(t *testing.T) { + assert.Nil(t, msg.TermWithReason("")) + }) + + t.Run("ackReply", func(t *testing.T) { + assert.Nil(t, msg.ackReply(nil, nil, false, ackOpts{})) + }) + + t.Run("checkReply", func(t *testing.T) { + assert.Nil(t, msg.checkReply()) + }) +} diff --git a/mocks/nats/server.go b/mocks/nats/server.go new file mode 100644 index 0000000..ff08b26 --- /dev/null +++ b/mocks/nats/server.go @@ -0,0 +1,33 @@ +package nats + +import ( + "fmt" + "net" + "os" + + natsserver "github.com/nats-io/nats-server/v2/test" +) + +func MockServer() func() { + port, err := getPort() + if err != nil { + panic(err) + } + + os.Setenv("NATS_HOST", fmt.Sprintf("localhost:%v", port)) + + opts := natsserver.DefaultTestOptions + opts.Port = port + opts.JetStream = true + + return natsserver.RunServer(&opts).Shutdown +} + +func getPort() (int, error) { + addr, _ := net.ResolveTCPAddr("tcp", "localhost:0") + + listener, _ := net.ListenTCP("tcp", addr) + defer listener.Close() + + return listener.Addr().(*net.TCPAddr).Port, nil +}