We stopped using this library in production at Artsy in early December 2017. It served us well for 18 months, but we opted for another approach for managing an atomic event log, relying on row-based database locks and serializing the event record as a JSON list. Feel free to use it if it suits your purposes, but this repository will no longer be maintained.
Atomic Store is a system for managing persistent streams of atomic events, with strict consistency. It is intended for systems in which only one event can be admitted to a canonical event log at a time, contingent upon past events. It exists to maintain the atomicity of handling of incoming events, but outsources the actual validation logic back to the event originator. In a sense, the idea here is to do as little as possible to meet this goal, but in a way that is as practical as possible.
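To make "contingent upon past events" concrete, here is a minimal sketch of the kind of validation logic an event originator might own. Everything in it (the `Bid` and `Rejection` types and the `BidValidation.validate` function) is a hypothetical illustration and not part of the Atomic Store API; it only assumes that validation receives the prospective event together with the events already in the log.

```scala
// Hypothetical domain types; nothing here is part of the Atomic Store API.
final case class Bid(bidder: String, amountCents: Long)

sealed trait Rejection
case object BidTooLow extends Rejection

object BidValidation {
  // Accept a bid only if it beats every bid already admitted to the log.
  def validate(prospective: Bid, pastEvents: Seq[Bid]): Either[Rejection, Bid] = {
    val highest = pastEvents.map(_.amountCents).foldLeft(0L)(_ max _)
    if (prospective.amountCents > highest) Right(prospective)
    else Left(BidTooLow)
  }
}
```

Because the decision depends on the whole log, two such validations must never run against the same snapshot of past events, which is the guarantee the store exists to provide.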
Atomic Store is built on top of Akka Persistence, which is designed to natively support highly scalable distributed systems with relaxed consistency. A distributed system can maximize its scalability by reducing coupling between its components, and synchronization of state changes is one such coupling. The general approach to relaxed consistency is to take compensatory actions to rectify inconsistencies between distributed components, in retrospect. But this is complex, and not desirable in all situations. Atomic Store is designed for situations where strict consistency is more desirable or appropriate than extreme scalability.
In the actor framework, individual actors are single-threaded and cannot be preempted mid-process. Consequently, consistency is guaranteed in the course of processing a message. This would be perfect for atomic validation+persistence, except for the fact that it's desirable to have validation potentially be asynchronous. Akka, out of the box, does not provide an answer for this. It's pretty easy to achieve this behavior with an actor that stashes incoming messages while it is awaiting validation of the current one, and that's exactly what this project accomplishes.
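The stashing behavior described above can be modelled without Akka. The following is a dependency-free sketch, not the library's actor: `StashingLog`, `StoreIfValid` and `ValidationResponse` are hypothetical stand-ins, events are plain strings, and the `requestValidation` callback represents the asynchronous round trip to the event originator. In an Akka actor the same effect would typically come from the `Stash` trait.

```scala
import scala.collection.immutable.Queue

// Hypothetical message types, named after the idea only.
sealed trait Msg
final case class StoreIfValid(event: String) extends Msg
final case class ValidationResponse(didPass: Boolean) extends Msg

// Models one single-threaded actor: `receive` handles one message at a time.
final class StashingLog(requestValidation: (String, Vector[String]) => Unit) {
  private var log = Vector.empty[String]        // canonical event log
  private var pending: Option[String] = None    // event awaiting its verdict
  private var stash = Queue.empty[StoreIfValid] // requests deferred while busy

  def events: Vector[String] = log

  def receive(msg: Msg): Unit = msg match {
    case request @ StoreIfValid(event) =>
      if (pending.isEmpty) {
        pending = Some(event)
        requestValidation(event, log) // asynchronous in a real system
      } else {
        stash = stash.enqueue(request) // busy: defer until the verdict arrives
      }

    case ValidationResponse(didPass) =>
      pending.foreach { event =>
        if (didPass) log = log :+ event
        pending = None
        // Resume with the oldest deferred request, if there is one.
        stash.dequeueOption.foreach { case (next, rest) =>
          stash = rest
          receive(next)
        }
      }
  }
}
```

The point of the design is that a second request is never validated until the first has been either appended or rejected, so each validation sees an up-to-date list of past events.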
At the moment, the only thing you can do is execute the tests, by running test from the SBT prompt.
Include the following line in your build.sbt:
libraryDependencies ++= Seq(
"net.artsy" %% "atomic-store" % "0.0.6")
Then, in your project, you will want to instantiate an atomic store matching your event types:
object MyEventStore extends AtomicEventStore[MyEventType, MyRejectionReasonType](myTimeoutReason)
In your startup code, you'll start the Receptionist actor, which serves as the store's entry point:
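import akka.actor.ActorSystem
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
case class MyEventStore(
  storeTimeout: FiniteDuration,
  journalPluginId: String,
  snapshotPluginId: String
)(
  implicit
  system: ActorSystem,
  ec: ExecutionContextExecutor
) {
  import MyEventStore._
  // The receptionist is the entry point for all messages to the store.
  val receptionist = system.actorOf(receptionistProps(storeTimeout, journalPluginId, snapshotPluginId))
}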
At this point, you'll be able to persist events and check the store by sending messages to the receptionist.
To work within a cluster, it's important that only one instance of each event log be alive within the cluster. This can be accomplished by instantiating the receptionist as a cluster singleton. This might look like:
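import akka.actor.{ ActorSystem, PoisonPill }
import akka.cluster.singleton.{ ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings }
import akka.util.Timeout
import scala.concurrent.ExecutionContextExecutor
import scala.concurrent.duration.FiniteDuration
case class MyEventStore(
  storeTimeout: FiniteDuration,
  journalPluginId: String,
  snapshotPluginId: String
)(
  implicit
  system: ActorSystem,
  ec: ExecutionContextExecutor,
  timeout: Timeout
) {
  import MyEventStore._
  // Run the receptionist as a cluster singleton so only one instance is alive.
  val singletonProps = ClusterSingletonManager.props(
    singletonProps = receptionistProps(storeTimeout, journalPluginId, snapshotPluginId),
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)
  )
  val manager = system.actorOf(singletonProps, "receptionist")
  val path = manager.path.toStringWithoutAddress
  // The proxy forwards messages to wherever the singleton currently lives.
  val proxyProps = ClusterSingletonProxy.props(
    singletonManagerPath = path,
    settings = ClusterSingletonProxySettings(system)
  )
  val receptionist = system.actorOf(proxyProps)
}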
In a project, it's likely you will want some sort of server-push mechanism to notify clients of new events. Rather than containing this logic itself, Atomic Store leaves it to the client: this code can likely be located within the handler of the result.
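As a rough illustration of pushing from the result handler, the sketch below notifies subscribers only when an event was accepted. The `Result` case class is a hypothetical stand-in shaped like the result message fields, not the library's own class, and `Subscribers` and `ResultHandler` are invented for this example; a real system would push over websockets or a similar channel.

```scala
// Hypothetical stand-in for the store's result message; not the library's class.
final case class Result[E](
  wasAccepted: Boolean,
  prospectiveEvent: E,
  storedEventList: Seq[E],
  reason: Option[String]
)

// Minimal single-threaded subscriber registry, for illustration only.
final class Subscribers[E] {
  private var listeners = Vector.empty[E => Unit]
  def subscribe(listener: E => Unit): Unit = {
    listeners = listeners :+ listener
  }
  def publish(event: E): Unit = listeners.foreach(listener => listener(event))
}

object ResultHandler {
  // Push to clients only once the store has admitted the event.
  def handle[E](result: Result[E], subscribers: Subscribers[E]): Unit =
    if (result.wasAccepted) subscribers.publish(result.prospectiveEvent)
}
```

Keeping the push in the handler means clients are only ever told about events that are already part of the canonical log.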
You will likely also want to use Protobuf and a custom serializer for high-performance serialization of messages to and from Atomic Store. A sample .proto file:
syntax = "proto2";
package net.artsy.auction.protobuf;
// Lot Event Store messages
message AtomicStoreMessageProto {
  oneof type {
    QueryEventsProto query_event = 1;
    StoreIfValidProto store_if_valid = 2;
    ValidationRequestProto validation_request = 3;
    ValidationResponseProto validation_response = 4;
    ResultProto result = 5;
  }
}
message QueryEventsProto {
  optional string scope_id = 6;
}
message StoreIfValidProto {
  optional bytes event = 7;
}
message ValidationRequestProto {
  optional bytes prospective_event = 8;
  repeated bytes past_events = 9;
}
message ValidationResponseProto {
  optional bool validation_did_pass = 10;
  optional bytes event = 11;
  optional string reason = 12;
  optional MetaProto meta = 13;
}
message ResultProto {
  optional bool was_accepted = 14;
  optional bytes prospective_event = 15;
  repeated bytes stored_event_list = 16;
  optional string reason = 17;
  optional MetaProto meta = 18;
}
message MetaProto {
  // Domain-specific fields
}
In your Akka Serializer implementation, you'll then want to serialize your events themselves to a byte array, perhaps deferring to a separate serializer.
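The delegation to a separate event serializer can be sketched without Akka or generated Protobuf classes. In this illustration `EventCodec`, `BidPlaced`, `BidPlacedCodec`, `StoreIfValidWire` and `Wire` are all hypothetical names; `StoreIfValidWire` merely stands in for a generated message whose event field is an opaque byte array, and the byte layout is an arbitrary choice for the example.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical inner serializer for domain events.
trait EventCodec[E] {
  def toBytes(event: E): Array[Byte]
  def fromBytes(bytes: Array[Byte]): E
}

// Hypothetical domain event.
final case class BidPlaced(bidder: String, amountCents: Long)

object BidPlacedCodec extends EventCodec[BidPlaced] {
  // Layout: 8-byte amount followed by the UTF-8 bidder name.
  def toBytes(event: BidPlaced): Array[Byte] = {
    val name = event.bidder.getBytes(UTF_8)
    ByteBuffer.allocate(8 + name.length).putLong(event.amountCents).put(name).array()
  }
  def fromBytes(bytes: Array[Byte]): BidPlaced = {
    val buffer = ByteBuffer.wrap(bytes)
    val amount = buffer.getLong()
    val name = new Array[Byte](buffer.remaining())
    buffer.get(name)
    BidPlaced(new String(name, UTF_8), amount)
  }
}

// Stand-in for a generated message: the event travels as opaque bytes.
final case class StoreIfValidWire(event: Array[Byte])

object Wire {
  // The outer serializer never inspects the event; it defers to the codec.
  def wrap[E](event: E, codec: EventCodec[E]): StoreIfValidWire =
    StoreIfValidWire(codec.toBytes(event))
  def unwrap[E](message: StoreIfValidWire, codec: EventCodec[E]): E =
    codec.fromBytes(message.event)
}
```

Keeping the event encoding behind its own interface lets the store's message format stay fixed while domain events evolve independently.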
Atomic Store is built using Scala, the Akka framework, and associated libraries. Specifically, here are the core technologies being used, with links to documentation:
For testing changes:
1. Update the version in build.sbt as appropriate, and add -SNAPSHOT to the end of the version number.
2. Update the libraryDependencies line above in anticipation of the next version.
3. Use the sbt publish-signed task to push snapshots to Maven Central.
4. Work on the development branch, so that the master branch on Github always reflects the latest version on Maven Central.
For releasing new versions:
1. Remove the -SNAPSHOT suffix in build.sbt.
2. Run sbt publish-signed.
3. Merge development into master to update the canonical version on Github.
For reference on this process, you may want to see the following links:
0.0.7
0.0.6
0.0.5
Added a meta field to the validation process to allow validation code to pass back arbitrary additional information.
0.0.4
0.0.3
0.0.2
Removed Timestamped. It's not crucial to the logic of this library, so let the client own all of the metadata it wants to associate with its events.
0.0.1