svalaskevicius / eventflow

cqrs+es in scala

improve projection replay #18

Closed: svalaskevicius closed this issue 8 years ago

svalaskevicius commented 8 years ago

When a projection is replayed from existing DBs, it would be good to order the events by their creation time.

One idea is to pass all relevant DBs, with their handlers, to applyNewEventsFromDb, then read them (buffered) and order them in Projection::applyNewEventsFromDbToProjection.
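If each DB can hand out its events already sorted by creation time, the "read buffered + order" step amounts to a k-way merge. A minimal sketch, assuming a hypothetical StoredEvent envelope with a createdAt timestamp; neither StoredEvent nor mergeByCreationTime is part of eventflow.

```scala
import scala.collection.mutable

// Hypothetical envelope: the issue only says events should be ordered by
// creation time, so `createdAt` (epoch millis) is an assumed field.
final case class StoredEvent[E](aggregateId: String, version: Int, createdAt: Long, payload: E)

object OrderedReplay {

  /** Merges per-DB iterators, each already sorted by `createdAt`, into one
    * iterator in global creation-time order. At most one event per source is
    * buffered at a time (a k-way merge). Ties are broken by source index. */
  def mergeByCreationTime[E](sources: Seq[Iterator[StoredEvent[E]]]): Iterator[StoredEvent[E]] = {
    val srcs = sources.toIndexedSeq
    // PriorityQueue is a max-heap, so reverse the ordering to dequeue the oldest event first.
    val oldestFirst: Ordering[(StoredEvent[E], Int)] =
      Ordering.by[(StoredEvent[E], Int), (Long, Int)] { case (e, i) => (e.createdAt, i) }.reverse
    val heap = mutable.PriorityQueue.empty[(StoredEvent[E], Int)](oldestFirst)

    // Prime the heap with the head of every non-empty source.
    srcs.zipWithIndex.foreach { case (it, i) => if (it.hasNext) heap.enqueue((it.next(), i)) }

    new Iterator[StoredEvent[E]] {
      def hasNext: Boolean = heap.nonEmpty
      def next(): StoredEvent[E] = {
        val (event, i) = heap.dequeue()
        // Refill from the source we just consumed, keeping one candidate per source.
        if (srcs(i).hasNext) heap.enqueue((srcs(i).next(), i))
        event
      }
    }
  }
}
```

Because each source is consumed lazily, the merge buffers one event per DB rather than the whole event history.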

Any other ideas?

svalaskevicius commented 8 years ago

Should there be only one DbBackend, with the event type E passed to its operations? (That would move the HList usage from BatchRunner to InMemoryDb, and from ProjectionHandlers to Projection.)

Fristi commented 8 years ago
  1. Potentially, the list of events can be very large: type DbBackend[E] = TreeMap[AggregateId, TreeMap[Int, List[E]]] will end up using a lot of memory. For now it is a type alias for a pure structure, but it might become an effectful computation (e.g. Process/Task/Source). Scalaz-stream/fs2 is a nice library for that, but so is akka-stream. I think that choice should be up to the user, though. The foundation of Eventflow is currently the free monad machinery, which makes it nice to write interpreters for. I think it would be good to decouple event reading in the same way.
  2. In the current implementation the projection is bound to a stream of events scoped to its aggregateId, but what about multiple aggregates? Akka Query allows you to tag events and read them back by tag. You could tag an event by its aggregate type, but also by other properties. It would be nice if we could work out a similar system and consume a tag/event stream in a projection. Doing this in a strongly typed way would be really nice, and I've done some work on that here: https://gist.github.com/Fristi/7b5981d85a83c7c504de. It is based on akka-query. The essence is in lines 76 and 77, where I define two streams of events. EventStream.create[UserEvent]("userEvent") will tag all events with userEvent. On the next line I create a stream with a subset of the events by selecting only the UserNameChange events: EventStream.fromRoute[UserEvent].apply("userNameChanges" / int) { case f:UserEvent.NameChanged => f.userId :: HNil }. While you can parameterize this event stream by int, that is not really useful yet, since you only get the username changes of one specific user, but you can parameterize it by whatever you want. I use EventStream.flow to get a Source[E] with the events I am interested in. The Projector and ProjectionInstruction are not really nice yet, but Projector has similarities with the Projection work you are doing at the moment.
  3. We need to think about how we persist a projection. Snapshotting, etc.?
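Point 2's tagging idea can be illustrated without Akka: each event is stored with a set of tags, a projection reads by tag, and a narrower stream is derived with a partial function. A minimal plain-Scala sketch; Tagged, Tagging.byTag and Tagging.route are hypothetical names that only mirror the gist's "userEvent" / "userNameChanges" streams, not its actual code.

```scala
// Hypothetical envelope: an event plus the tags it was stored under.
final case class Tagged[E](tags: Set[String], event: E)

sealed trait UserEvent
object UserEvent {
  final case class Created(userId: Int, name: String) extends UserEvent
  final case class NameChanged(userId: Int, newName: String) extends UserEvent
}

object Tagging {
  /** Stores an event under every tag the (user-supplied) tagger assigns to it. */
  def tag[E](event: E)(tagger: E => Set[String]): Tagged[E] = Tagged(tagger(event), event)

  /** Reads the events stored under `wanted`, preserving the original log order. */
  def byTag[E](log: Seq[Tagged[E]], wanted: String): Seq[E] =
    log.collect { case Tagged(tags, e) if tags.contains(wanted) => e }

  /** Narrows a stream with a partial function that also extracts a key,
    * similar in spirit to the gist's `fromRoute { case f: UserEvent.NameChanged => ... }`. */
  def route[E, K](events: Seq[E])(pf: PartialFunction[E, K]): Seq[(K, E)] =
    events.flatMap(e => pf.lift(e).map(k => (k, e)))
}
```

Keeping tags as plain data lets an adapter layer map them onto whatever backend (Akka or otherwise) stores the events.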
svalaskevicius commented 8 years ago
  1. Yeah, the in-memory DB is only meant to be used for testing. So far I've used it for my examples, but in future it could be used for testing aggregate definitions and projections without any real infrastructure. As for pure vs effectful computation - you're right - it will need to change to allow communication :)
  2. A stream per aggregate id is fine for aggregate usage; however, as we just saw, projections will need to read from multiple aggregates... Ideally I would avoid coupling to a specific framework just yet (akka) and would much rather have an adapter layer for it, or any other backend, keeping the core types framework independent. As for a general approach of tagging, I think it's a good idea :)
  3. If a projection is restarted, it should be possible to replay it from the original primary events, so snapshotting, at this point, could be a premature optimisation :) Another point is if we allow projections to emit derivative events - should we store them in the projection's stream and allow other projections to read them? (currently it's not supported - and would make syncing a bit harder - e.g. when replaying a projection - do we re-emit the events? and how do we know which ones are stored already?) I think we'll need to think about this, but it's a secondary goal, as it's a bit complex and a CQRS/ES application could be built without them. Still nice to have, so I will create a separate issue for it as an idea :)

Thanks for the comments! A conversation like this is probably the best way to find a good solution without implementing and trying everything first! :)

My plan for this task so far:

What do you think?

Fristi commented 8 years ago

you're right - it will need to change to allow communication

There is also Streaming[T] in Cats. It would be interesting to see if we could use it to model a lazy list of events.
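As a stand-in for Cats' Streaming[T], the sketch below uses the standard library's LazyList (Scala 2.13+) to model a lazily paged event stream: a page is fetched from the backend only when the consumer actually reaches it. Versioned, LazyEvents.page and LazyEvents.stream are hypothetical names, and the Vector store stands in for a real backend.

```scala
// Hypothetical versioned event; the Vector below stands in for a real backend.
final case class Versioned[E](version: Int, event: E)

object LazyEvents {
  /** One page of a (pretend) backend: at most `pageSize` events with version >= `fromVersion`. */
  def page[E](store: Vector[Versioned[E]], fromVersion: Int, pageSize: Int): Vector[Versioned[E]] =
    store.filter(_.version >= fromVersion).take(pageSize)

  /** Fetches the first page now; each later page is fetched only when the
    * LazyList is forced past the end of the previous one. */
  def stream[E](fetch: (Int, Int) => Vector[Versioned[E]], fromVersion: Int, pageSize: Int): LazyList[Versioned[E]] = {
    val batch = fetch(fromVersion, pageSize)
    if (batch.isEmpty) LazyList.empty
    // `#:::` takes its right operand by name, so the recursive call is deferred.
    else LazyList.from(batch) #::: stream(fetch, batch.last.version + 1, pageSize)
  }
}
```

The same shape would work with an effectful stream type later; only `fetch` would change.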

Ideally I would avoid coupling to a specific framework just yet (akka) and would much rather have an adapter layer for it, or any other backend, keeping the core types framework independent

Yeah, of course. We don't want to couple Akka in; we want to keep it at the outside of the library as much as possible.

Another point is if we allow projections to emit derivative events - should we store them in the projection's stream and allow other projections to read them? (currently it's not supported - and would make syncing a bit harder - e.g. when replaying a projection - do we re-emit the events? and how do we know which ones are stored already?)

A projection is like an aggregate. It receives events instead of commands, but it can also emit events. The name of the projection (its tag) is the stream it will emit events to. Maybe use pointers in the event log to avoid re-emitting the events? By snapshotting I mean that the state of the projection can be stored as a snapshot. That way we can store the version of that state as well, so if you restart the projection it will read from the snapshot store and continue from the stored version. Of course, snapshotting is something you want to be able to turn on/off, so maybe disable it by default?

svalaskevicius commented 8 years ago

The main purpose of a projection is to produce view data (the read model), and it is eventually consistent. It could also emit (derivative) events, which would make it more powerful than that description. I would like to support that, as it allows composing projections. I'm not sure whether events from projections should be separated logically (reasoning based on the consistency of the system).

The pointer to the events already read is currently stored in https://github.com/svalaskevicius/eventflow/blob/master/src/main/scala/Cqrs/Projection.scala#L34, as the events are already versioned in the DB :)

Snapshotting should really be thought of as an optimisation, I think: we have to support full recovery by simply replaying the events from the event storage; otherwise we'll need to say that the snapshots are the primary storage of the system (which removes some of the benefits of event sourcing). But I agree, we should add them at some point :)
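The "snapshots are only an optimisation" rule can be stated as an invariant: recovering from a snapshot plus the events newer than its version must give the same state as a full replay from the event store. A minimal sketch under that assumption; Snapshot, recover and replayAll are hypothetical names, not eventflow's Projection API. The snapshot's version plays the same role as the stored read pointer.

```scala
// Hypothetical snapshot: a projection state plus the last event version folded into it.
final case class Snapshot[S](state: S, version: Int)

object SnapshotRecovery {
  /** Folds the (version, event) pairs that are newer than the starting snapshot. */
  def recover[S, E](from: Option[Snapshot[S]], events: Seq[(Int, E)], initial: S)(step: (S, E) => S): Snapshot[S] = {
    val start = from.getOrElse(Snapshot(initial, 0))
    events
      .filter { case (v, _) => v > start.version }
      .foldLeft(start) { case (Snapshot(s, _), (v, e)) => Snapshot(step(s, e), v) }
  }

  /** Full recovery from the primary event store: no snapshot, replay everything. */
  def replayAll[S, E](events: Seq[(Int, E)], initial: S)(step: (S, E) => S): Snapshot[S] =
    recover[S, E](None, events, initial)(step)
}
```

Because replayAll is just recover without a snapshot, snapshotting can be switched off without changing the result, only the recovery time.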

The difficulty arises if we allow projections to emit events and store them in the DB: if we replay the projection from scratch, it emits the same events again, provided it receives the source events in the same order (so that is the first thing to solve). The second is to avoid storing duplicate events for this projection, but that should be resolvable using the event versions.
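Both requirements (same input order on replay, no duplicate stored events) can be sketched with versioned derived events: a deterministic replay regenerates versions 1..n, and the store appends only versions beyond the last one it already holds. Derived, emitAll and appendNew are hypothetical names, and the sketch assumes a projection's derived versions are contiguous and start at 1.

```scala
// Hypothetical derived event, versioned within the projection's own stream.
final case class Derived[E](version: Int, event: E)

object DerivedEvents {
  /** Runs a projection step over the inputs and numbers its outputs 1..n.
    * Given the same inputs in the same order, the output is identical. */
  def emitAll[I, E](inputs: Seq[I])(step: I => Seq[E]): Seq[Derived[E]] =
    inputs.flatMap(step).zipWithIndex.map { case (e, i) => Derived(i + 1, e) }

  /** Appends only versions beyond the last stored one, so replaying the
    * projection from scratch does not store duplicates. Assumes stored
    * versions are contiguous from 1 (as produced by emitAll). */
  def appendNew[E](stored: Vector[Derived[E]], emitted: Seq[Derived[E]]): Vector[Derived[E]] = {
    val lastStored = stored.lastOption.fold(0)(_.version)
    // Re-emitted versions must match what is stored; otherwise the replay
    // did not see the source events in the original order.
    require(
      emitted.filter(_.version <= lastStored).forall(d => stored.lift(d.version - 1).contains(d)),
      "replay diverged from the stored derived events"
    )
    stored ++ emitted.filter(_.version > lastStored)
  }
}
```

Failing on divergence, rather than silently appending, makes an ordering bug in the replay visible immediately.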

svalaskevicius commented 8 years ago

being solved in #16

svalaskevicius commented 8 years ago

done - when a projection is replayed it reads the events in the original order now