EventStoreInterop is an implementation of event store on top of Akka Persistence plugins, i.e. logic that implements EventStore trait and uses JournalProtocol.XXX classes to communicate with plugin actor.
Recovering events require managing actor's state while the recovery happening thus here's the state:
sealed trait State
object State {
object Empty extends State
case class Buffering(events: Vector[EventStore.Event[Any]]) extends State
case class Consuming(consumer: F[Consumer]) extends State
case class Finishing(events: Vector[EventStore.Event[Any]], finalSeqNr: SeqNr) extends State
}
where Consuming represents presence on the consumer, i.e. thing that calls user-defined stream callback. On each new event consumer will handler it lazily by adding handle action on top of action's stack:
case state: State.Consuming =>
message match {
case JournalProtocol.ReplayedMessage(persisted) =>
val consumer = for {
consumer <- state.consumer
consumer <- consumer.onEvent(event(persisted))
} yield consumer
val state1 = State.Consuming(consumer): State
state1.asLeft[Consumer].pure[F]
effectively postponing consumer.onEvent(event(persisted)) until whole stream is consumed and buffered in memory. This can be improved by pushing events into the consumer immediately on arrival (if the consumer is available) or still processing them asynchronously (but without spawning fibers on each event)
EventStoreInterop
is an implementation of event store on top of Akka Persistence plugins, i.e. logic that implementsEventStore
trait and usesJournalProtocol.XXX
classes to communicate with plugin actor.Recovering events require managing actor's state while the recovery happening thus here's the state:
where
Consuming
represents presence on the consumer, i.e. thing that calls user-defined stream callback. On each new event consumer will handler it lazily by adding handle action on top of action's stack:effectively postponing
consumer.onEvent(event(persisted))
until whole stream is consumed and buffered in memory. This can be improved by pushing events into the consumer immediately on arrival (if the consumer is available) or still processing them asynchronously (but without spawning fibers on each event)