-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathgoro.go
More file actions
160 lines (135 loc) · 4.05 KB
/
goro.go
File metadata and controls
160 lines (135 loc) · 4.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Package goro is pure Go client library for dealing with Event Store (versions 3.2.0 and later).
// It includes a high-level API for reading and writing events. Usage examples for the high-level
// APIs are provided inline with their full documentation.
package goro
import (
"context"
"encoding/json"
"errors"
"net/http"
"time"
"github.com/dghubble/sling"
uuid "github.com/satori/go.uuid"
)
// Author represents the Author of an Event
type Author struct {
Name string `json:"name"`
}
// Event represents an Event in Event Store
// the data and Metadata must be json encoded
type Event struct {
At time.Time `json:"updated,omitempty"`
Author Author `json:"author,omitempty"`
Stream string `json:"streamId,omitempty"`
Type string `json:"eventType"`
PositionStream string `json:"positionStreamId,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
Metadata json.RawMessage `json:"metadata,omitempty"`
ID uuid.UUID `json:"eventID"`
Version int64 `json:"eventNumber"`
Position int64 `json:"positionEventNumber,omitempty"`
}
// CreateEvent initializes a new Event with an event type, some data, metadata, and a version you
// specify. It then creates a random uuid and sets the time it was created at.
func CreateEvent(eventType string, data, metadata json.RawMessage, version int64) Event {
return Event{
ID: uuid.Must(uuid.NewV4()),
Type: eventType,
Data: data,
Metadata: metadata,
Version: version,
At: time.Now(),
}
}
// Events is an array of events that implements the sort.Interface interface.
type Events []Event
// Len implements sort.Interface
func (e Events) Len() int {
return len(e)
}
// Swap implements sort.Interface
func (e Events) Swap(a, b int) {
e[b], e[a] = e[a], e[b]
}
// Less implements sort.Interface
func (e Events) Less(a, b int) bool {
return e[a].Version < e[b].Version
}
// StreamMessage contains an Event or an error
type StreamMessage struct {
Event Event
Acknowledger Acknowledger
Error error
}
// Ack acknowledges an Event or fails
func (m StreamMessage) Ack() error {
if m.Acknowledger != nil {
return m.Acknowledger.Ack()
}
return errors.New("no Acknowledger set")
}
// Nack rejects`` an Event or fails
func (m StreamMessage) Nack(action Action) error {
if m.Acknowledger != nil {
return m.Acknowledger.Nack(action)
}
return errors.New("no Acknowledger set")
}
// Action represents the action to take when Nacking an Event
type Action string
// Action enum
const (
ActionPark Action = "park"
ActionRetry = "retry"
ActionSkip = "skip"
ActionStop = "stop"
)
// Acknowledger can acknowledge or Not-Acknowledge an Event in a Persistent Subscription
type Acknowledger interface {
Ack() error
Nack(action Action) error
}
// ExpectedVersions
const (
ExpectedVersionAny int64 = -2
ExpectedVersionNone int64 = -1
ExpectedVersionEmpty int64 = 0
)
// Subscriber streams events
type Subscriber interface {
Subscribe(ctx context.Context) <-chan StreamMessage
}
// Writer writes events to a stream
type Writer interface {
Write(ctx context.Context, expectedVersion int64, events ...Event) error
}
// Reader reads a couple of Events from a stream
type Reader interface {
Read(ctx context.Context, start int64, count int) (Events, error)
}
// Slinger is something that can return a sling object
type Slinger interface {
Sling() *sling.Sling
}
// SlingerFunc is something that can return a sling object
type SlingerFunc func() *sling.Sling
// Sling implements the Slinger interface
func (f SlingerFunc) Sling() *sling.Sling {
return f()
}
func relevantError(statusCode int) error {
switch statusCode {
case http.StatusNotFound:
return ErrStreamNotFound
case http.StatusUnauthorized:
return ErrUnauthorized
case http.StatusInternalServerError:
return ErrInternalError
case http.StatusBadRequest:
return ErrInvalidContentType
case http.StatusNotAcceptable:
return ErrInvalidContentType
default:
return nil
}
}