forked from jet/equinox
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathCounter.fsx
More file actions
112 lines (94 loc) · 4.24 KB
/
Counter.fsx
File metadata and controls
112 lines (94 loc) · 4.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
#if !LOCAL
// Compile Tutorial.fsproj before attempting to send this to FSI with Alt-Enter by either:
// a) right-clicking or
// b) typing dotnet build samples/Tutorial
#I "bin/Debug/net6.0/"
#r "Serilog.dll"
#r "Serilog.Sinks.Console.dll"
#r "Equinox.dll"
#r "Equinox.Core.dll"
#r "Equinox.MemoryStore.dll"
#r "TypeShape.dll"
#r "FSharp.UMX.dll"
#r "FsCodec.dll"
#r "FsCodec.Box.dll"
#r "FsCodec.NewtonsoftJson.dll"
#else
#r "nuget:Equinox.MemoryStore, *-*"
#r "nuget:FsCodec.Box"
#r "nuget:Serilog.Sinks.Console"
#endif
// Contributed by @voronoipotato
(* Events are things that have already happened,
they always exist in the past, and should always be past tense verbs*)
(* A counter going up might clear to 0, but a counter going down might clear to 100. *)
type Cleared = { value : int }
type Event =
| Incremented
| Decremented
| Cleared of Cleared
interface TypeShape.UnionContract.IUnionContract
// Events for a given DDD aggregate are considered to be in the same 'Category' for indexing purposes
// When reacting to events (using Propulsion), the Category will be a key thing to filter events based on
let [<Literal>] Category = "Counter"
// Maps from an app-level counter name (perhaps a strongly typed id), to a well-formed StreamId that can be stored in the Event Store
// For this sample, we let callers just pass a string, and we trust it's suitable for use as a StreamId directly
let streamId = Equinox.StreamId.gen id
type State = State of int
let initial : State = State 0
(* Evolve takes the present state and one event and figures out the next state*)
let evolve state event =
match event, state with
| Incremented, State s -> State (s + 1)
| Decremented, State s -> State (s - 1)
| Cleared { value = x }, _ -> State x
(* Fold is folding the evolve function over all events to get the current state
It's equivalent to LINQ's Aggregate function *)
let fold state events = Seq.fold evolve state events
(* Commands are the things we intend to happen, though they may not*)
type Command =
| Increment
| Decrement
| Clear of int
(* Decide consumes a command and the current state to decide what events actually happened.
This particular counter allows numbers from 0 to 100. *)
let decide command (State state) =
match command with
| Increment ->
if state > 100 then [] else [Incremented]
| Decrement ->
if state <= 0 then [] else [Decremented]
| Clear i ->
if state = i then [] else [Cleared {value = i}]
type Service internal (resolve : string -> Equinox.Decider<Event, State>) =
member _.Execute(instanceId, command) : Async<unit> =
let decider = resolve instanceId
decider.Transact(decide command)
member x.Reset(instanceId, value) : Async<unit> =
x.Execute(instanceId, Clear value)
member _.Read instanceId : Async<int> =
let decider = resolve instanceId
decider.Query(fun (State value) -> value)
(* Out of the box, logging is via Serilog (can be wired to anything imaginable).
We wire up logging for demo purposes using MemoryStore.VolatileStore's Committed event
MemoryStore itself, by design, has no intrinsic logging
(other store bindings have rich relevant logging about roundtrips to physical stores etc) *)
open Serilog
let log = LoggerConfiguration().WriteTo.Console().CreateLogger()
let logEvents sn (events : FsCodec.ITimelineEvent<_>[]) =
log.Information("Committed to {streamName}, events: {@events}", sn, seq { for x in events -> x.EventType })
(* We can integration test using an in-memory store
See other examples such as Cosmos.fsx to see how we integrate with CosmosDB and/or other concrete stores *)
let store = Equinox.MemoryStore.VolatileStore()
let _ = store.Committed.Subscribe(fun struct (sn, xs) -> logEvents sn xs)
let codec = FsCodec.Box.Codec.Create()
let resolve =
Equinox.MemoryStore.MemoryStoreCategory(store, codec, fold, initial)
|> Equinox.Decider.resolve log
let service = Service(streamId >> resolve Category)
let clientId = "ClientA"
service.Read(clientId) |> Async.RunSynchronously
service.Execute(clientId, Increment) |> Async.RunSynchronously
service.Read(clientId) |> Async.RunSynchronously
service.Reset(clientId, 5) |> Async.RunSynchronously
service.Read(clientId) |> Async.RunSynchronously