artsy / atomic-store

Atomic event store for Scala/Akka
MIT License

A possible new architecture #20

Open acjay opened 7 years ago

acjay commented 7 years ago

It may be possible to simplify this project further. Today, the EventLog actor replies to its sender for validation, but it seems like this may be unnecessary.

**Naive case** Assuming the validator is static, the atomic store could be instantiated with a validator of a type such as `(CurrentEvent, PastEvents) => ValidationResult`. If this were the case, the EventLog could make its persistence decisions without needing any external communication, because the decision logic would be injected at startup time.
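To make the naive case concrete, here's a minimal stdlib-only sketch of such a static validator. The `CurrentEvent`/`ValidationResult` types are hypothetical stand-ins mirroring the signature above, not types from this repo:

```scala
object ValidatorSketch {
  // Hypothetical event and result types, for illustration only.
  final case class CurrentEvent(name: String, amount: Int)
  sealed trait ValidationResult
  case object Accepted extends ValidationResult
  final case class Rejected(reason: String) extends ValidationResult

  // A static validator injected at startup: a pure function, so the
  // EventLog can decide whether to persist without any messaging.
  val nonNegativeBalance: (CurrentEvent, Seq[CurrentEvent]) => ValidationResult =
    (event, past) => {
      val balance = past.map(_.amount).sum + event.amount
      if (balance >= 0) Accepted else Rejected(s"balance would be $balance")
    }
}
```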

**Plus async** That assumes that validation can be done synchronously. More generally, the type would be more like `(CurrentEvent, PastEvents) => Future[ValidationResult]`. In this case, the stateful stashing behavior of the EventLog would still need to be retained, in order to guarantee atomicity of validation+persistence.
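To illustrate why statefulness is still needed: the essence of the stashing behavior is that each command's validate-then-persist step must finish before the next one starts. A stdlib-only sketch (no Akka), serializing submissions by chaining each one onto the previous `Future`:

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object AsyncValidatorSketch {
  // Hypothetical async validator: (event, pastEvents) => Future[Boolean].
  type Validator = (Int, Seq[Int]) => Future[Boolean]

  // Chains each command onto the previous one, playing the role that
  // stashing does in the EventLog: no command is ever validated
  // against a stale view of the log.
  final class SerializedLog(validator: Validator) {
    private var events = Vector.empty[Int]
    private var tail: Future[Any] = Future.successful(())

    def submit(event: Int): Future[Boolean] = synchronized {
      val result = tail.flatMap { _ =>
        validator(event, events).map { ok =>
          if (ok) events :+= event // persist only after validation passes
          ok
        }
      }
      tail = result
      result
    }
  }
}
```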

**Plus auxiliary input data** It may be desirable to have some data taken into account for validation, yet a different representation actually persisted as an event. For example, in canonical Event Sourcing, commands are requests to change state, and events are a record of what happened. Perhaps it would be more general to define the interface as `(Command, PastEvents) => Future[Seq[Event]]`. This generalizes over the concept of auxiliary input data.

**Plus auxiliary output data** The validation process has knowledge of the change in state effected by a command. It may be desirable to propagate some information out about the validation process (e.g. errors) or the resulting changes. So perhaps the most general validator interface would be `(Command, PastEvents) => Future[(Seq[Event], AdditionalData)]`.
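Putting the last two generalizations together, here's a sketch of the `(Command, PastEvents) => Future[(Seq[Event], AdditionalData)]` shape, using hypothetical bank-account types as stand-ins:

```scala
object GeneralValidatorSketch {
  import scala.concurrent.Future

  // Hypothetical command/event split: commands request a change,
  // events record what actually happened.
  sealed trait Command
  final case class Deposit(amount: Int) extends Command
  final case class Withdraw(amount: Int) extends Command
  final case class Event(delta: Int)
  // The AdditionalData here is the resulting balance plus any error.
  final case class Outcome(balance: Int, error: Option[String])

  // The most general interface from the discussion above.
  def validate(command: Command, past: Seq[Event]): Future[(Seq[Event], Outcome)] = {
    val balance = past.map(_.delta).sum
    command match {
      case Deposit(a) =>
        Future.successful((Seq(Event(a)), Outcome(balance + a, None)))
      case Withdraw(a) if a <= balance =>
        Future.successful((Seq(Event(-a)), Outcome(balance - a, None)))
      case Withdraw(a) =>
        // Rejected: no events persisted, error reported as output data.
        Future.successful((Nil, Outcome(balance, Some(s"insufficient funds for $a"))))
    }
  }
}
```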

**Side objective: split out atomicity** But it occurs to me that this behavior, atomic processing of incoming commands, is actually independent of the storage concern. Could this behavior be factored into a mixin or wrapper actor? If so, the actual persistence part could potentially be written basically the same as in the naive case. With care for handling buffering, this could be a very generic way to allow actors to cope with async APIs in a way that also preserves the actor framework's rule of sequential message processing.

It occurs to me that this sounds a lot like a Reactive Stream with a capacity of 1 before backpressuring. I think this may be a red herring though, because the actor would still need to be a Cluster Singleton (for atomicity) and accessible via Remoting, neither of which fit the Akka Streams model.

Requester might be an example to look at for how to make such behavior modular, although it isn't designed to be atomic.

See also Fun.CQRS for an attempt to model the pattern in types.

acjay commented 7 years ago

I've been working on further thoughts about how this would look in implementation. I'm interested in seeing whether it's possible to factor the functionality of Atomic Store into smaller, more generic, cohesive pieces. This could involve a lot less protocol than today's Atomic Store.

Another thing that is codified here is the use of async method API facades (i.e. Future-returning methods) instead of exposing actor refs. I think this is more type-safe, more encapsulated, and less error-prone. But to take advantage of actor supervision, all the pieces are still implemented with actors and set up so that the supervision chain remains intact.

```scala
// Interaction would be through an async interface,
// rather than explicit message passing.
trait Log[EventType] {
  def persistenceId: String
  def read: Future[Seq[EventType]]
  def append(events: Seq[EventType]): Future[Done]
}

object Log {
  def apply[EventType](persistenceId: String)(implicit actorRefFactory: ActorRefFactory): Log[EventType] = ???
}
```
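For illustration, here's a stdlib-only stand-in for this interface (with `Unit` in place of `akka.Done`, so the sketch runs without an Akka dependency). A real implementation would back `read`/`append` with an actor created via the `ActorRefFactory`:

```scala
object InMemoryLogSketch {
  import scala.concurrent.Future

  // Same shape as the Log trait above, minus the Akka types.
  trait Log[EventType] {
    def persistenceId: String
    def read: Future[Seq[EventType]]
    def append(events: Seq[EventType]): Future[Unit]
  }

  // Naive in-memory implementation, for illustration only.
  def inMemory[EventType](id: String): Log[EventType] = new Log[EventType] {
    private var events = Vector.empty[EventType]
    val persistenceId = id
    def read: Future[Seq[EventType]] = synchronized(Future.successful(events))
    def append(es: Seq[EventType]): Future[Unit] = synchronized {
      events ++= es
      Future.successful(())
    }
  }
}
```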

Then the whole machine would be:

```scala
// General purpose utility for allowing actors to process
// messages asynchronously in strict sequence, since the
// actor model only provides this guarantee synchronously.
def atomicFlow[Input, Output](
  bufferSize: Int,
  bufferOverflowStrategy: OverflowStrategy
)(
  f: Input => Future[Output]
)(
  implicit
  mat: ActorMaterializer,
  ec: ExecutionContext
): ActorRef = {
  Source
    .actorRef[(Input, ActorRef)](bufferSize, bufferOverflowStrategy)
    .mapAsync(1) { case (input, replyTo) => f(input).map(output => (output, replyTo)) }
    .to(Sink.foreach { case (output, replyTo) => replyTo ! output })
    .run()
}

class AtomicEventLog[CommandType, EventType, OutputType](
  persistenceId: String,
  validator: (CommandType, Seq[EventType], Seq[EventType] => Future[Done]) => Future[OutputType]
) extends Actor {
  implicit val materializer = ActorMaterializer()
  import context.dispatcher

  val log = Log[EventType](persistenceId)(context)
  val atomicFlowActorRef = atomicFlow(10, OverflowStrategy.dropNew) { command: CommandType =>
    log.read.flatMap(pastEvents => validator(command, pastEvents, log.append))
  }

  // Need to figure out how to apply timeout rules and
  // to ensure `atomicFlow` failures are supervised by
  // this actor.
  // TODO: How to make sure that timeouts cancel work that is
  // mid-stream?

  def receive = {
    case command: CommandType => atomicFlowActorRef ! ((command, sender()))
    case EventsForId(persistenceId) => log.read pipeTo sender()
  }
}

object MyAtomicStore extends AtomicStore[CommandAndConfig]({
  case ((command, config), pastEvents, appendToLog) =>
    async {
      val calculator = Calculator(config, pastEvents)
      val (newEvents, additionalData) = await(calculator.validateCommand(command))
      await(appendToLog(newEvents))
      (newEvents, additionalData)
    }
})

def processCommandForId(command: Command, scope: String) = async {
  val config = await(configForId(scope))
  await(atomicStore.processCommand((command, config), scope))
}
```
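The `async`/`await` blocks above come from scala-async; the same composition can be written with a plain for-comprehension. A runnable sketch, with hypothetical stand-ins for `configForId` and `atomicStore.processCommand`:

```scala
import scala.concurrent.{ExecutionContext, Future}

object ComposeSketch {
  // Hypothetical stand-ins for configForId and the atomic store call.
  def configForId(scope: String): Future[String] =
    Future.successful(s"config-$scope")

  def processCommand(commandAndConfig: (String, String), scope: String): Future[String] =
    Future.successful(s"processed ${commandAndConfig._1} with ${commandAndConfig._2} in $scope")

  // Same shape as processCommandForId above, written as a
  // for-comprehension instead of async/await.
  def processCommandForId(command: String, scope: String)(implicit ec: ExecutionContext): Future[String] =
    for {
      config <- configForId(scope)
      result <- processCommand((command, config), scope)
    } yield result
}
```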