This project aims to build a bridge between akka and pure functional code based on cats-effect
Covered:
akka-effect-actor
moduleRepresents ActorRef.tell
trait Tell[F[_], -A] {
def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
}
Represents ActorRef.ask
pattern
trait Ask[F[_], -A, B] {
def apply(msg: A, timeout: FiniteDuration, sender: Option[ActorRef]): F[B]
}
Represents reply pattern: sender() ! reply
trait Reply[F[_], -A] {
def apply(msg: A): F[Unit]
}
This is what you need to implement instead of familiar new Actor { ... }
trait Receive[F[_], -A, B] {
def apply(msg: A): F[B]
def timeout: F[B]
}
Constructs Actor.scala
out of receive: ActorCtx[F] => Resource[F, Receive[F, Any]]
Wraps ActorContext
trait ActorCtx[F[_]] {
def self: ActorRef
def parent: ActorRef
def executor: ExecutionContextExecutor
def setReceiveTimeout(timeout: Duration): F[Unit]
def child(name: String): F[Option[ActorRef]]
def children: F[List[ActorRef]]
def actorRefFactory: ActorRefFactory
def watch[A](actorRef: ActorRef, msg: A): F[Unit]
def unwatch(actorRef: ActorRef): F[Unit]
def stop: F[Unit]
}
akka-effect-persistence
moduleConstructs PersistentActor.scala
out of eventSourcedOf: ActorCtx[F] => F[EventSourced[F, S, E, C]]
Describes a lifecycle of entity with regard to event sourcing, phases are: Started, Recovering, Receiving and Termination
trait EventSourced[F[_], S, E, C] {
def eventSourcedId: EventSourcedId
def recovery: Recovery
def pluginIds: PluginIds
def start: Resource[F, RecoveryStarted[F, S, E, C]]
}
Describes start of recovery phase
trait RecoveryStarted[F[_], S, E, C] {
def apply(
seqNr: SeqNr,
snapshotOffer: Option[SnapshotOffer[S]]
): Resource[F, Recovering[F, S, E, C]]
}
Describes recovery phase
trait Recovering[F[_], S, E, C] {
def replay: Resource[F, Replay[F, E]]
def completed(
seqNr: SeqNr,
journaller: Journaller[F, E],
snapshotter: Snapshotter[F, S]
): Resource[F, Receive[F, C]]
}
Used during recovery to replay events
trait Replay[F[_], A] {
def apply(seqNr: SeqNr, event: A): F[Unit]
}
Describes communication with underlying journal
trait Journaller[F[_], -A] {
def append: Append[F, A]
def deleteTo: DeleteEventsTo[F]
}
Describes communication with underlying snapshot storage
/**
* Describes communication with underlying snapshot storage
*
* @tparam A - snapshot
*/
trait Snapshotter[F[_], -A] {
def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]
def delete(seqNr: SeqNr): F[F[Unit]]
def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]]
}
akka-effect-eventsourced
moduleThis is the main runtime/queue where all actions against your state are processed in desired eventsourcing sequence:
It is optimised for maximum throughput hence different steps of different actions might be executed in parallel as well as events might be stored in batches
trait Engine[F[_], S, E] {
def state: F[State[S]]
/**
* @return Outer F[_] is about `load` being enqueued, this immediately provides order guarantees
* Inner F[_] is about `load` being completed
*/
def apply[A](load: F[Validate[F, S, E, A]]): F[F[A]]
}
in build.sbt
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor-tests" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-persistence" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-eventsourcing" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster-sharding" % "0.2.1"