forked from jet/equinox
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFavorites.fsx
More file actions
178 lines (140 loc) · 6.61 KB
/
Favorites.fsx
File metadata and controls
178 lines (140 loc) · 6.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
#if LOCAL
// Compile Tutorial.fsproj by either a) right-clicking or b) typing
// dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter
#I "bin/Debug/net6.0/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
#r "Equinox.dll"
#r "Equinox.Core.dll"
#r "Equinox.MemoryStore.dll"
#r "FSharp.UMX.dll"
#r "FSCodec.dll"
#else
#r "nuget:Equinox, *-*"
#r "nuget:Equinox.MemoryStore, *-*"
#r "nuget:Serilog.Sinks.Console"
#endif
(*
* EVENTS
*)
(* Define the events that will be saved in the stream *)
// Note using strings and DateTimes etc as Event payloads is not supported for .Cosmos or .EventStore using the UnionCodec support
// i.e. typically records are used for the Event Payloads even in cases where you feel you'll only ever have a single primitive value
/// Events recorded in a Favorites stream; each case carries the sku it relates to.
type Event =
/// The sku was added to the favorites.
| Added of string
/// The sku was removed from the favorites.
| Removed of string
// No IUnionContract or Codec required as we're using a custom encoder in this example
// interface TypeShape.UnionContract.IUnionContract
/// The folded state: the favorited skus, with the most recently added sku first (see `evolve`).
type State = string list
/// The state before any events have been applied: no favorites.
let initial : State = []
/// Applies a single event to the current favorites, yielding the next state:
/// `Added` prepends the sku; `Removed` drops every occurrence of it.
let evolve state event =
    match event with
    | Added added -> added :: state
    | Removed removed -> state |> List.filter (fun sku -> sku <> removed)
/// Builds a state by applying `evolve` to each event of `xs` in order, starting from `s`.
let fold s xs = xs |> Seq.fold evolve s
(* With the basic Events and `fold` defined, we have enough to build the state from the Events:- *)
// With no events applied, the state is simply `initial`
let initialState = initial
//val initialState : string list = []
// Each Added event prepends its sku, so the most recently added sku ends up first
let favesCba = fold initialState (List.map Added [ "a"; "b"; "c" ])
//val favesCba : string list = ["c"; "b"; "a"]
(*
* COMMANDS
*)
(* Now we can build a State from the Events, we can interpret a Command in terms of how we'd represent that in the stream *)
/// Requests that `interpret` translates into zero or more Events, based on the current State.
type Command =
/// Request to add the given sku to the favorites.
| Add of string
/// Request to remove the given sku from the favorites.
| Remove of string
/// Decides which events (if any) a command should produce given the current state.
/// Yields an empty list when the command would have no effect:
/// adding a sku that is already present, or removing one that is absent.
let interpret command state =
    let isFavorite sku = List.contains sku state
    match command with
    | Add sku when isFavorite sku -> []
    | Add sku -> [ Added sku ]
    | Remove sku when isFavorite sku -> [ Removed sku ]
    | Remove _ -> []
(* Note we don't yield events if they won't have a relevant effect - the interpret function makes the processing idempotent:
if a retry of a command happens, it should not make a difference *)
// Removing a sku that is present yields a single Removed event
let removeBEffect = favesCba |> interpret (Remove "b")
//val removeBEffect : Event list = [Removed "b"]
// Folding that event into the prior state gives the state after the removal
let favesCa = fold favesCba removeBEffect
// val favesCa : string list = ["c"; "a"]
// Repeating the same command against the updated state yields no events
let _removeBAgainEffect = favesCa |> interpret (Remove "b")
//val _removeBAgainEffect : Event list = []
(*
* STREAM API
*)
(* Equinox.Decider provides low level functions against an IStream given
a) a log to send metrics and store roundtrip info to
b) a maximum number of attempts to make if we clash with a conflicting write *)
// Example of wrapping Decider to encapsulate stream access patterns (see DOCUMENTATION.md for reasons why this is not advised in real apps)
/// Wraps a Decider for a single Favorites stream, exposing command execution and a state read.
type Handler(decider : Equinox.Decider<Event, State>) =

    /// Runs `interpret` for the command against the stream's state via `decider.Transact`.
    member _.Execute(command) : Async<unit> =
        let decide = interpret command
        decider.Transact(decide)

    /// Queries the stream's state and returns it unchanged.
    member _.Read : Async<string list> =
        decider.Query(fun state -> state)
(* When we Execute a command, Equinox.Decider will use `fold` and `interpret` to Decide whether Events need to be written
Normally, we'll let failures percolate via exceptions, but not return a result (i.e. we don't say "your command caused 1 event") *)
// For now, write logs to the Console (in practice we'd connect it to a concrete log sink)
open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
// Related streams are termed a Category; each client will have its own Stream.
let Category = "Favorites"
// `id` is the identity function, so the client id string is passed to `gen` unmodified
let clientAFavoritesStreamId = Equinox.StreamId.gen id "ClientA"
// For test purposes, we use the in-memory store
let store = Equinox.MemoryStore.VolatileStore()
// MemoryStore (as with most Event Stores) provides a way to observe events that have been persisted to a stream
// For demo purposes we emit those to the log (which emits to the console)
/// Logs the stream name together with the EventType of each event in a committed batch.
let logEvents sn (events: FsCodec.ITimelineEvent<_>[]) =
    let streamName = FsCodec.StreamName.toString sn
    let eventTypes = seq { for e in events -> e.EventType }
    log.Information("Committed to {sn}, events: {@events}", streamName, eventTypes)
// Route every commit the store reports to logEvents; the result of Subscribe is discarded
let _ = store.Committed.Subscribe(fun struct (streamName, committed) -> logEvents streamName committed)
/// Hand-rolled codec mapping each Event to/from an event type name plus a boxed string payload.
let codec =
    // For this example, we hand-code; normally one uses one of the FsCodec auto codecs, which codegen something similar
    let encode event =
        match event with
        | Added sku -> struct ("Add", box sku)
        | Removed sku -> struct ("Remove", box sku)
    // Only string payloads under a known event type name decode; anything else yields ValueNone
    let tryDecode name (payload : obj) : Event voption =
        match payload with
        | :? string as sku ->
            match name with
            | "Add" -> ValueSome (Added sku)
            | "Remove" -> ValueSome (Removed sku)
            | _ -> ValueNone
        | _ -> ValueNone
    FsCodec.Codec.Create(encode, tryDecode)
// Each store has a <Store>Category that is used to resolve IStream instances binding to a specific stream in a specific store
// ... because the nature of the contract with the handler is such that the store hands over State, we also pass the `initial` and `fold` as we used above
let cat = Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
// Partially applied resolver: the logger and category are fixed; the category name and stream id are supplied later
let resolve = Equinox.Decider.resolve log cat
// We hand the category name and the streamId to the resolver
let clientAStream = resolve Category clientAFavoritesStreamId
// ... and pass the stream to the Handler
let handler = Handler(clientAStream)
(* Run some commands *)
handler.Execute(Add "a") |> Async.RunSynchronously
handler.Execute(Add "b") |> Async.RunSynchronously
// Idempotency comes into play if we run it twice:
// `interpret` yields no events for an Add of a sku that is already in the state
handler.Execute(Add "b") |> Async.RunSynchronously
(* Read the current state *)
handler.Read |> Async.RunSynchronously
// val it : string list = ["b"; "a"]
(*
* SERVICES
*)
(* Building a service to package Command Handling and related functions
No, this is not doing CQRS! *)
/// Packages the Favorites operations behind a client-id-keyed API.
/// `deciderFor` supplies the Handler for a given client id; it is invoked on every call.
type Service(deciderFor : string -> Handler) =

    /// Adds `sku` to the client's favorites.
    member _.Favorite(clientId, sku) =
        (deciderFor clientId).Execute(Add sku)

    /// Removes a sku from the client's favorites (despite its plural name, `skus` is a single sku).
    member _.Unfavorite(clientId, skus) =
        (deciderFor clientId).Execute(Remove skus)

    /// Reads the client's current favorites.
    member _.List(clientId): Async<string list> =
        (deciderFor clientId).Read
(* See Counter.fsx and Cosmos.fsx for a more compact representation which makes the Handler wiring less obtrusive *)
/// Resolves the Favorites stream for `clientId` and wraps the resulting Decider in a Handler.
let streamFor (clientId: string) =
    let decider =
        clientId
        |> Equinox.StreamId.gen id
        |> Equinox.Decider.resolve log cat Category
    Handler(decider)
// Each Service call obtains its Handler by calling streamFor with the client id
let service = Service(streamFor)
let client = "ClientB"
service.Favorite(client, "a") |> Async.RunSynchronously
service.Favorite(client, "b") |> Async.RunSynchronously
service.List(client) |> Async.RunSynchronously
// val it : string list = ["b"; "a"]
// Unfavoriting "b" removes it, leaving only "a"
service.Unfavorite(client, "b") |> Async.RunSynchronously
service.List(client) |> Async.RunSynchronously
//val it : string list = ["a"]