A framework for building event sourced, CQRS applications.
Table of Contents
Event Sourcery is in production use at Envato.
The goal of EventSourcery is to make it easier to build event sourced, CQRS applications.
The hope is that by using EventSourcery you can focus on modeling your domain with aggregates, commands, and events; and not worry about stitching together application plumbing.
The example EventSourcery application is intended to illustrate concepts in EventSourcery, how they relate to each other, and how to use them in practice. If you'd like a succinct look at the library in practice take a look at that.
Otherwise you will generally need to add both event_sourcery and event_sourcery-postgres to your application.
If Event Sourcing or CQRS is a new concept to you, we highly recommend you watch An In-Depth Look at Event Sourcing With CQRS. It explores some of the theory behind both Event Sourcing & CQRS and will help you better understand the building blocks of the Event Sourcery framework.
There are several ways to configure EventSourcery to your liking. The following presents some examples:
EventSourcery.configure do |config|
# Add custom reporting of errors occurring during event processing.
# One might set up an error reporting service like Rollbar here.
config.on_event_processor_error = proc { |exception, processor_name| … }
config.on_event_processor_critical_error = proc { |exception, processor_name| … }
# Enable EventSourcery logging.
config.logger = Logger.new('logs/my_event_sourcery_app.log')
# Customize how event body attributes are serialized
config.event_body_serializer
.add(BigDecimal) { |decimal| decimal.to_s('F') }
# Config how your want to handle event processing errors
config.error_handler_class = EventSourcery::EventProcessing::ErrorHandlers::ExponentialBackoffRetry
end
Run the setup
script, inside the project directory to install the gem dependencies and create the test database (if it is not already created).
./bin/setup
Then you can run the test suite with rspec:
bundle exec rspec
To release a new version:
lib/event_sourcery/version.rb
bundle exec rake release
, this will create a git tag for the
version, push tags up to GitHub, and package the code in a .gem
file.Not sure what Event Sourcing (ES), Command Query Responsibility Segregation (CQRS), or even Domain-Driven Design (DDD) are? Here are a few links to get you started:
Below is a high level view of a CQRS, event-sourced web application built using EventSourcery. The components marked with *
can be created using building blocks provided by EventSourcery. Keep on reading and we'll describe each of the concepts illustrated.
┌─────────────┐ ┌─────────────┐
│ │ │ │
│ Client │ │ Client │
│ │ │ │
└─────────────┘ └─────────────┘
│ │
Issue Command Issue Query
│ │
┌───────┴──────────────────────────────────────┴─────────┐
│ Web Layer │
└───────┬──────────────────────────────────────┬─────────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Command │ │Query Handler│
│ Handler │ │ │
└─────────────┘ └─────────────┘
│ │
▼ ┌───────▼─────┐
┌─────────────┐ │┌────────────┴┐
│ * Aggregate │ ││* Projection │
│ │ └┤ │
└─────────────┘ └─────────────┘
│ ▲
│ │
│ Update Projection
│ │
Emit Event ┌─────────────┐
│ │┌────────────┴┐
│ ││ * Projector │
▼ └┤ │
┌─────────────┐ └─────────────┘
│* Event Store│ Process ▲
┌─▶│ │────────Event───────────────────┘
│ └─────────────┘
│ │
│ Process ┌─────────────┐
│ Event │┌────────────┴┐ ┌ ─ ─ ─ ─ ─ ─ ┐
│ └───────▶││ * Reactor │ External
│ └┤ │───Trigger ───▶│ System │
│ └─────────────┘ Behaviour ─ ─ ─ ─ ─ ─ ─
│ │
│ │
└────────Emit Event────────┘
Events are value objects that record something of meaning in the domain. Think of a sequence of events as a time series of immutable domain facts. Together they form the source of truth for our application's state.
Events are targeted at an aggregate via an aggregate_id
and have the following attributes.
module EventSourcery
class Event
attr_reader \
:id, # Sequence number
:uuid, # Unique ID
:aggregate_id, # ID of aggregate the event pertains to
:type, # type of the event
:body, # the payload (a hash)
:version, # Version of the aggregate
:created_at, # Created at date
:correlation_id # Correlation ID for tracing purposes
# ...
end
end
You can define events in your domain as follows.
TodoAdded = Class.new(EventSourcery::Event)
# An example instance.
# #<TodoAdded:0x007fb6f88f04b0
# @id=24,
# @uuid="75dcc7eb-33c0-4f1c-ac23-31bf32fc5edc",
# @aggregate_id="fca315ff-d45d-46c5-a230-67c5bec0b06d",
# @type="todo_added",
# @body={"title"=>"My task"},
# @version=1,
# @created_at=2017-06-14 11:50:32 UTC,
# @correlation_id="b4d1e31d-9d1b-4ea1-a685-57936ce65a80">
The event store is a persistent store of events.
EventSourcery currently supports a Postgres-based event store via the event_sourcery-postgres gem.
For more information about the EventStore
API refer to the postgres event store or the in memory event store in this repo
Naturally, it provides the ability to store events. The event store is append-only and immutable. The events in the store form a time-ordered sequence which can be viewed as a stream of events.
EventStore
clients can optionally provide an expected version of event when saving to the store. This provides a mechanism for EventStore
clients to effectively serialise the processing they perform against an instance of an aggregate.
When used in this fashion the event store can be thought of as an event sink.
The EventStore
also allows clients to read events. Clients can poll the store for events of specific types after a specific event ID. They can also subscribe to the event store to be notified when new events are added to the event store that match the above criteria.
When used in this fashion the event store can be thought of as an event source.
An aggregate is a cluster of domain objects that can be treated as a single unit. Every transaction is scoped to a single aggregate. An aggregate will have one of its component objects be the aggregate root. Any references from outside the aggregate should only go to the aggregate root. The root can thus ensure the integrity of the aggregate as a whole.
Clients execute domain transactions against the system by issuing commands against aggregate roots. The result of these commands is new events being saved to the event store.
A typical EventSourcery application will have one or more aggregate roots with multiple commands.
A central part of EventSourcery is the processing of events in the store. EventSourcery provides the Event Stream Processor abstraction to support this.
┌─────────────┐ Subscribe to the event store
│Event Stream │ and take some action. Tracks
│ Processor │◀─ ─ ─ ─ ─its position in the stream in
│ │ a way that suits its needs.
└─────────────┘
▲
┌────────┴───────────┐
│ │
│ │
┌─────────────┐ ┌─────────────┐
Listens for events and takes │ │ │ │ Listens for events and
action. Actions include ─▶│ Reactor │ │ Projector │◀─ ┐ projects data into a
emitting new events into the ─ ┘ │ │ │ │ ─ ─ projection.
store and/or triggering side └─────────────┘ └─────────────┘
effects in the world.
A typical EventSourcery application will have multiple projectors and reactors running as background processes.
Event Stream Processors (ESPs) subscribe to an event store. They read events from the event store and take some action.
When newly created, an ESP will process the event stream from the beginning. When catching up like this an ESP can process events in batches (currently set to 1,000 events). This allows them to optimise processing as desired.
ESPs track the position in the event stream that they've processed in a way that suits them. This allows for them to optimise transaction handling in the case where they are catching up for example.
A Projector is an EventStreamProcessor that listens for events and projects data into a projection. These projections are generally consumed on the read side of the CQRS world.
Projectors tend to be built for specific read-side needs and are generally specific to a single read case.
Modifying a projection is achieved by creating a new projector.
A Reactor is an EventStreamProcessor that listens to events and emits events back into the store and/or trigger side effects in the world.
They typically record any external side effects they've triggered as events in the store.
Reactors can be used to build process managers or sagas.
An EventSourcery application will typically have multiple ESPs running. EventSourcery provides a class called ESPRunner which can be used to run ESPs. It runs each ESP in a forked child process so each ESP can process the event store independently. You can find an example in event_sourcery_todo_app.
Note that you may instead choose to run each ESP in their own process directly. The coordination of this is not currently provided by EventSourcery.
Below we see the typical flow of state in an EventSourcery application (arrows indicate data flow). Note that steps 1 and 2 are not synchronous. This means EventSourcery applications need to embrace eventual consistency.
1. Issue Command │ 2. Update Projection │ 3. Issue Query
│ │
│ ▲
│ │ │ │
│ │
│ │ │ F. Handle
B. Handle Query
Command │ │ │
│ │
│ │ │ │
▼ ┌─────────────┐ │
┌─────────────┐ │ │ │ │ ┌─────────────┐
│ │ ┌───────▶│ Projector │ │ │
┌─▶│ Aggregate │ │ │ │ │ │ │Query Handler│
│ │ │ │ └─────────────┘ │ │
│ └─────────────┘ │ D. Read │ │ └─────────────┘
│ │ event E. Update ▲
A. Load C. Emit │ │ Projection │ │
state from Event │ │ G. Read
events │ │ │ │ │ Projection
│ ▼ │ ▼ │
│ ┌─────────────┐ │ │ ┌─────────────┐ │ │
│ │ │ │ │ │ │
└──│ Event Store │───┼───┘ │ Projection │────┼───────────────┘
│ │ │ │
└─────────────┘ │ └─────────────┘ │
│ │
A command comes into the application and is routed to a command handler. The command handler initialises an aggregate and loads up its state from events in the store. The command handler then defers to the aggregate to handle the command. It then stores any new events raised by the aggregate into the event store.
class AddTodoCommandHandler
def handle(id:, title:, description:)
# The repository provides access to the event store for saving and loading aggregates
repository = EventSourcery::Repository.new(
event_source: EventSourcery.config.event_source,
event_sink: EventSourcery.config.event_sink,
)
# Load up the aggregate from events in the store
aggregate = repository.load(TodoAggregate, id)
# Defer to the aggregate to execute the add command.
# This may raise new events in the aggregate which we'll need to save.
aggregate.add(title, description)
# Save any newly raised events back into the event store
repository.save(aggregate)
end
end
You can think of projections as read-only models. They are created and updated by projectors and show different views over the events that are the source of truth for our application state. Projections are typically stored as database tables.
Projecting is process of converting (or collecting) a stream of events into these database tables. You can think of this process as a fold over a sequence of events.
A projector is a process that listens for new events in the event store. When it sees a new event it cares about it updates its projection.
class OutstandingTodosProjector
include EventSourcery::Postgres::Projector
projector_name :outstanding_todos
# Database table that forms the projection.
table :outstanding_todos do
column :todo_id, 'UUID NOT NULL'
column :title, :text
column :description, :text
end
# Handle TodoAdded events by adding the todo to our projection.
project TodoAdded do |event|
table.insert(
todo_id: event.aggregate_id,
title: event.body['title'],
description: event.body['description'],
)
end
# Handle TodoCompleted events by removing the todo from our projection.
project TodoCompleted, TodoAbandoned do |event|
table.where(todo_id: event.aggregate_id).delete
end
end
A query comes into the application and is routed to a query handler. The query handler queries the projection directly and returns the result.
module OutstandingTodos
# Query handler that queries the projection table.
class QueryHandler
def handle
EventSourceryTodoApp.projections_database[:outstanding_todos].all
end
end
end