jet / equinox

.NET event sourcing library with CosmosDB, DynamoDB, EventStoreDB, message-db, SqlStreamStore and integration test backends. Focused at stream level; see https://github.com/jet/propulsion for cross-stream projections/subscriptions/reactions
https://github.com/jet/dotnet-templates
Apache License 2.0

WIP: Equinox.Cosmos Storage + Programming Model description #50

Closed bartelink closed 5 years ago

bartelink commented 5 years ago

NB this is long and needs lots of editing.

Storage model (see source)

Batches

Events are stored in immutable batches consisting of:

Events

Per Event, we have the following:

Tip Batch

The tip is readable via a point-read, as the id has a fixed known value (-1). It uses the same base layout as an Event-Batch, but adds the following:

State, Snapshots, Events and Unfolds

In an Event Sourced system, we typically distinguish between the following basic elements

.... and:

Generating and saving unfolded events

Periodically, along with writing the events that a decision function yields to represent the implications of a command given the present state, we also unfold the resulting state' and supply the unfolded events to the sync function too. The unfold function takes the state and projects one or more snapshot-events that can be used to re-establish the same state we have thus far derived from watching the events on the stream. Unlike normal events, unfolded events do not get replicated to other systems, and can be thrown away at will (we also compress them rather than storing them as fully expanded JSON).
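A minimal sketch of that round trip, using a hypothetical running-total aggregate rather than anything from Equinox itself (the names Event, Snapshotted and initial are illustrative assumptions): unfold projects the state into a snapshot-event, and folding just that event from the initial state yields the same state again.

```fsharp
// Hypothetical aggregate: a running total. None of these names come from Equinox.
type Event =
    | Incremented of int
    | Snapshotted of total: int // snapshot-event: carries the entire state

type State = int
let initial : State = 0

let evolve (state : State) event : State =
    match event with
    | Incremented n -> state + n
    | Snapshotted total -> total // a snapshot replaces whatever came before
let fold (state : State) (events : Event seq) : State = Seq.fold evolve state events

// Projects the state into one or more snapshot-events
let unfold (state : State) : Event list = [ Snapshotted state ]

let events = [ Incremented 2; Incremented 3; Incremented 5 ]
let state' = fold initial events
// Folding only the unfolded events re-establishes the same state
let rebuilt = fold initial (unfold state')
printfn "%d %d" state' rebuilt
```

Because the snapshot-event alone is enough to rebuild the state, a reader that finds one in the tip does not need the events that preceded it.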

Reading from the Storage Model

Most reads request the tip with an IfNoneMatch precondition citing the etag it bore when we last saw it, which, when combined with a cache, means one of the following happens when a reader is trying to establish the state of a stream prior to processing a Command:

Building a state from the Storage Model and/or the Cache

Given a stream with:

{ id:0, i:0, e: [{c:c1, d:d1}]},
{ id:1, i:1, e: [{c:c2, d:d2}]}, 
{ id:2, i:2, e: [{c:c2, d:d3}]}, 
{ id:3, i:3, e: [{c:c1, d:d4}]}, 
{ id:-1,
  i:4,
  e: [{i:4, c:c3, d:d5}],
  u: [{i:4, c:s1, d:s5Compressed}, {i:3, c:s2, d:s4Compressed}],
  _etag: "etagXYZ"
}  
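As a rough illustration of how a reader might pick the events to fold from a layout like the one above, here is a sketch over a hypothetical in-memory model. The Event and Batch records and the load function are assumptions made for this example, not the Equinox.Cosmos types, and the batch-level i field is omitted. It also assumes that an unfold with index i reflects all events up to and including i, as the s5Compressed / i:4 pairing suggests.

```fsharp
// Hypothetical in-memory model of the layout above; not the Equinox.Cosmos types
type Event = { i : int; c : string; d : string }
type Batch = { id : int; e : Event list; u : Event list }

/// Returns the events to fold (oldest first) and the number of batches read
let load (isOrigin : Event -> bool) (tip : Batch) (olderNewestFirst : Batch list) =
    match tip.u |> List.tryFind isOrigin with
    | Some snapshot ->
        // Assumption: the snapshot covers events up to and including its index,
        // so only newer events in the tip need folding on top of it
        let newer = tip.e |> List.filter (fun ev -> ev.i > snapshot.i)
        snapshot :: newer, 1 // the point-read of the tip was enough
    | None ->
        // No usable unfold: walk the events backwards, batch by batch
        let rec walk acc read batches =
            match batches with
            | [] -> acc, read
            | batch :: rest ->
                let newestFirst = List.rev batch.e
                match List.tryFindIndex isOrigin newestFirst with
                | Some idx ->
                    // keep the origin event and everything after it, then stop
                    let kept = newestFirst |> List.take (idx + 1) |> List.rev
                    kept @ acc, read + 1
                | None -> walk (batch.e @ acc) (read + 1) rest
        walk [] 0 (tip :: olderNewestFirst)

let older =
    [ { id = 3; e = [ { i = 3; c = "c1"; d = "d4" } ]; u = [] }
      { id = 2; e = [ { i = 2; c = "c2"; d = "d3" } ]; u = [] }
      { id = 1; e = [ { i = 1; c = "c2"; d = "d2" } ]; u = [] }
      { id = 0; e = [ { i = 0; c = "c1"; d = "d1" } ]; u = [] } ]
let tip =
    { id = -1
      e = [ { i = 4; c = "c3"; d = "d5" } ]
      u = [ { i = 4; c = "s1"; d = "s5Compressed" }
            { i = 3; c = "s2"; d = "s4Compressed" } ] }

// A fold that understands s1 snapshots needs only the tip
let viaSnapshot, reads1 = load (fun x -> x.c = "s1") tip older
// One that only understands s2 also folds the newer event held in the tip
let viaOlderSnapshot, reads2 = load (fun x -> x.c = "s2") tip older
// With no usable snapshot, every batch has to be read
let viaEvents, reads3 = load (fun _ -> false) tip older
```

The point of the sketch is the cost difference: the first two cases touch one document, while the last has to read all five.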

If we have state4 based on the events up to {i:3, c:c1, d: d4} and the index document, we can produce the state by folding in a variety of ways:

If we have state3 based on the events up to {i:3, c:c1, d: d4}, we can produce the state by folding in a variety of ways:

If we have state5 based on the events up to C3 d5, and (being the writer, or a recent reader) have the etag etagXYZ, we can do an HTTP GET with an IfNoneMatch precondition citing etagXYZ, which will return 304 Not Modified with < 1K of data and a charge of 1.00 RU, allowing us to derive the state as:

Programming model

In F#, the Equinox programming model involves, per aggregation of events on a given category of stream:

When using the Equinox.Cosmos adapter, one will typically implement two further functions to avoid having to load and process every 'event in the stream in order to build the 'state (versus a single cheap point read of the tip from CosmosDb):

High level Command Processing flow

When running a decision process, we thus have the following stages:

  1. establish a known 'state (based on a given position in the stream of Events)
  2. let the decide function look at the request/command and yield a set of events (or none) that represent the effect of that decision
  3. update the stream, contingent on the stream still being in the same State/Position it was in step 1:
     3a. if there is no conflict (nobody else decided anything since we decided what we'd do), append the events to the stream (record the new position and etag)
     3b. if there is a conflict, take the conflicting events that other writers have produced since step 1, fold them into our state, and go back to 2 (the CosmosDb stored procedure sends them back immediately at zero cost or latency)
  4. if it makes sense for our scenario, hold the state, position and etag in our cache. When a reader comes along, do a point-read of the tip and jump straight to step 2 if nothing has been modified.
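The stages above can be sketched as a retry loop. Everything here is a hypothetical in-memory stand-in written for illustration (MemoryStream, SyncResult, TrySync and transact are not Equinox APIs); etags and the real sync stored procedure are not modelled, and decide is assumed to already have the command applied to it.

```fsharp
// Hypothetical in-memory stand-in for a stream; not an Equinox API
type SyncResult<'event> =
    | Written of newPosition : int
    | Conflict of missedEvents : 'event list

type MemoryStream<'event>() =
    let events = ResizeArray<'event>()
    member _.Position = events.Count
    /// Appends only if nobody has written since expectedPosition (stage 3)
    member _.TrySync(expectedPosition : int, newEvents : 'event list) =
        if events.Count = expectedPosition then
            events.AddRange newEvents
            Written (events.Count)
        else
            // 3b: hand back what the caller missed so it can catch up without re-reading
            Conflict [ for i in expectedPosition .. events.Count - 1 -> events.[i] ]

let transact (fold : 'state -> 'event list -> 'state)
             (decide : 'state -> 'event list)
             (stream : MemoryStream<'event>)
             (position : int, state : 'state) =
    let rec attempt (position, state) =
        match decide state with                                   // stage 2
        | [] -> position, state                                   // nothing to write
        | events ->
            match stream.TrySync(position, events) with           // stage 3
            | Written position' -> position', fold state events   // 3a
            | Conflict missed ->                                  // 3b: catch up, redo stage 2
                attempt (position + List.length missed, fold state missed)
    attempt (position, state)

// Usage: a counter capped at 3, with a competing writer sneaking in an event
type Event = | Incremented
let fold state events = state + List.length events
let decide state = if state < 3 then [ Incremented ] else []

let stream = MemoryStream<Event>()
let position, state = transact fold decide stream (0, 0)   // stage 1: empty stream
stream.TrySync(position, [ Incremented ]) |> ignore        // another writer appends
// Our (position, state) pair is now stale; the conflict path catches up and retries
let position', state' = transact fold decide stream (position, state)
printfn "%d %d" position' state'
```

The (position, state) pair returned by transact is what stage 4 would hold in a cache, alongside the etag that this sketch leaves out.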

Sync stored procedure high level flow (see source)

The sync stored procedure takes a document as input which is almost identical to the format of the tip batch (in fact, if the stream is found to be empty, it forms the template for the first document created in the stream). The request includes the following elements:

Example

The following example is a minimal version of the Favorites model, with shortcuts for brevity (and, yes, imperfect performance characteristics):

(* Event schemas *)

open System // for DateTimeOffset

type Item = { id: int; name: string; added: DateTimeOffset }
type Event =
    | Added of Item
    | Removed of itemId: int
    | Compacted of items: Item[]

(* State types *)

type State = Item list

let contains state id = state |> List.exists (fun x -> x.id=id)

(* Folding functions to build state from events *)

let evolve state event =
    match event with
    | Compacted items -> List.ofArray items
    | Added item -> item :: state
    | Removed id -> state |> List.filter (fun x -> x.id <> id) // drop the removed item
let fold state events = Seq.fold evolve state events

(* Decision Processing *)

type Command =
    | Add of Item
    | Remove of itemId: int

let decide command state =
    match command with
    | Add item ->
        // adding an item that is already present yields no events
        if contains state item.id then [] else [Added item]
    | Remove id ->
        // removing an item that is absent yields no events
        if contains state id then [Removed id] else []

(* Equinox.Cosmos Unfold Functions to allow loading without queries *)

let unfold state =
    [Event.Compacted (Array.ofList state)] // State is a list; Compacted carries an array
let isOrigin = function
    | Compacted _ -> true
    | _ -> false
voronoipotato commented 5 years ago

Maybe this should go in the wiki with a WIP label, it looks quite useful.

bartelink commented 5 years ago

Good point - in fact, the thought crossed my mind just this morning (reason I made it an Issue is that some form of this needs to go in the README too). But the perfect shouldn't be the enemy of the good, so I'm on it...

bartelink commented 5 years ago

extracted to https://github.com/jet/equinox/wiki/Programming-Model

voronoipotato commented 5 years ago

Thank you!