-
Notifications
You must be signed in to change notification settings - Fork 70
/
Copy pathIndex.fs
62 lines (49 loc) · 2.46 KB
/
Index.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
module Index

/// Category name handed to the store category constructors in this module.
[<Literal>]
let private CategoryName = "Index"

/// Produces the stream id for an index from its IndexId, rendered via IndexId.toString.
let private streamId = FsCodec.StreamId.gen IndexId.toString
// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care
module Events =

    /// Event body listing item keys.
    type ItemIds =
        { items: string array }

    /// Event body carrying items as a map from key to value.
    type Items<'v> =
        { items: Map<string, 'v> }

    /// The events that make up an index stream.
    type Event<'v> =
        | Added of Items<'v>
        | Deleted of ItemIds
        | Snapshotted of Items<'v>
        interface TypeShape.UnionContract.IUnionContract

    /// Codec for Event<'v>, produced by EventCodec.genJsonElement.
    let codec<'v> = EventCodec.genJsonElement<Event<'v>>
module Fold =

    /// The folded state of an index: its current items, keyed by item key.
    type State<'v> = Map<string,'v>

    /// State used as the starting point of a fold (no items).
    let initial = Map.empty

    /// Applies a single event to the state.
    let private evolve state = function
        | Events.Deleted { items = xs } ->
            (state, xs) ||> Array.fold (fun state k -> Map.remove k state)
        | Events.Added { items = xs } ->
            // On a key collision the incoming value wins (Map.add replaces the binding)
            (state, xs) ||> Map.fold (fun state k v -> Map.add k v state)
        | Events.Snapshotted { items = xs } ->
            // A snapshot holds the complete state (see `snapshot`), so it supersedes whatever
            // has been accumulated rather than being merged into it. Merging would bring back
            // keys that had been deleted whenever the snapshot is folded onto a non-empty state.
            // NOTE(review): presumably relevant when the store folds onto a cached state - confirm
            xs

    /// Applies an array of events, in order, to the supplied state.
    let fold state = Array.fold evolve state

    /// Renders the complete state as a single Snapshotted event.
    let snapshot state = Events.Snapshotted { items = state }
/// Works out which events are required to apply the requested changes to `state`.
/// Additions whose key is already present, and removals whose key is absent, are skipped.
/// Returns `(addedCount, removedCount), events`; `events` is empty when nothing needs to change,
/// otherwise it holds an Added event (if any additions remain) followed by a Deleted event
/// (if any removals remain).
let interpret add remove (state: Fold.State<'v>) =
    let isKnown key = state |> Map.containsKey key
    // Only keys not yet in the index are worth adding
    let adds = add |> Seq.filter (fun (k, _) -> not (isKnown k)) |> Array.ofSeq
    // Only keys currently in the index can be removed
    let removes = remove |> Seq.filter isKnown |> Array.ofSeq
    let events =
        [| if adds.Length <> 0 then Events.Added { items = Map.ofSeq adds }
           if removes.Length <> 0 then Events.Deleted { items = removes } |]
    (adds.Length, removes.Length), events
/// Operations on a single index, executed through the supplied Equinox Decider.
type Service<'t> internal (decider: Equinox.Decider<Events.Event<'t>, Fold.State<'t>>) =

    /// Applies the additions and removals via `interpret` in a transaction on the decider.
    /// The result is the pair of counts that `interpret` reports: (items added, items removed).
    member _.Ingest(adds: seq<string*'t>, removes: string seq) : Async<int*int> =
        let decide = interpret adds removes
        decider.Transact(decide)

    /// Queries the decider and returns the folded state unchanged.
    member _.Read() : Async<Map<string,'t>> =
        decider.Query(id)
/// Builds a Service for the index identified by `indexId`, using the supplied category.
/// The decider is obtained from Equinox.Decider.forStream with the global Serilog logger.
let create<'t> indexId cat =
    let decider = Equinox.Decider.forStream Serilog.Log.Logger cat (streamId indexId)
    Service(decider)
module Cosmos =

    open Equinox.CosmosStore

    /// Builds the CosmosStoreCategory for index streams from a (context, cache) pair.
    /// Uses the RollingState access strategy fed by Fold.snapshot, and the SlidingWindow
    /// caching strategy with a 20 minute window.
    let category (context, cache) =
        let access = AccessStrategy.RollingState Fold.snapshot
        let window = System.TimeSpan.FromMinutes 20.
        let caching = Equinox.CachingStrategy.SlidingWindow (cache, window)
        CosmosStoreCategory(context, CategoryName, Events.codec, Fold.fold, Fold.initial, access, caching)
module MemoryStore =

    /// Builds the MemoryStoreCategory for index streams over the supplied store.
    let category store =
        Equinox.MemoryStore.MemoryStoreCategory(
            store,
            CategoryName,
            Events.codec,
            Fold.fold,
            Fold.initial)