A framework for implementing the event-sourcing pattern easily in Go.

To install this package, please use gopkg.in instead of GitHub:

```
go get gopkg.in/go-gadgets/eventsourcing.v0
```
The features of this framework are:

- Simple, storage-neutral persistence: just use the standard `json:"name"` tag on your aggregates/events to persist fields, without worrying about your underlying storage engine.

Event-Sourcing is an architectural pattern in which the state of an entity in your application is modelled as a series of events that mutate the state. For example, we may store the history of a bank account:
| Aggregate Key | Sequence | Event Data |
|---|---|---|
| 123456 | 1 | Account Created |
| 123456 | 2 | Deposit ($50) |
| 123456 | 3 | Withdrawal ($25) |
If we now had to consider a bank account withdrawal, we would replay the stored events in sequence to rebuild the current balance, check the business rules against that state, and then append a new withdrawal event.
At any given point in time, we can track and identify the state of an entity. It's also possible to understand exactly the sequence of events that led to an outcome being selected.
An aggregate (root) is an entity that's defined by the series of events that happen to it. In this simple example (found under `/examples/counter` within this repository), we'll look at an aggregate that counts the times it's incremented:
```go
var registry eventsourcing.EventRegistry

func init() {
	registry = eventsourcing.NewStandardEventRegistry("ExampleDomain")
	registry.RegisterEvent(IncrementEvent{})
}

type CounterAggregate struct {
	eventsourcing.AggregateBase
	Count int
}

// IncrementEvent is an event that moves the counter up.
type IncrementEvent struct {
}

// Initialize the aggregate
func (agg *CounterAggregate) Initialize(key string, store eventsourcing.EventStore, state eventsourcing.StateFetchFunc) {
	agg.AggregateBase.Initialize(key, registry, store, state)
	agg.AutomaticWireup(agg)
}

// HandleIncrementCommand handles an increment command from the bus.
func (agg *CounterAggregate) HandleIncrementCommand(command IncrementCommand) ([]eventsourcing.Event, error) {
	// Insert domain rules here.

	// Raise the events
	return []eventsourcing.Event{
		IncrementEvent{},
	}, nil
}

// ReplayIncrementEvent updates the counter by adding one.
func (agg *CounterAggregate) ReplayIncrementEvent(event IncrementEvent) {
	agg.Count++
}
```
In this example we have an event registry that identifies the domain's event types, an aggregate that carries the state, a command handler that raises events, and a replay method that applies them.
Note the use of `AutomaticWireup(agg)`: this is a helper function that scans a type using reflection and configures:

- Event replay methods, matching the `Replay<EventTypeName>(event <EventTypeName>)` method signature.
- Command handlers, matching the `Handle<CommandTypeName>(command <CommandTypeName>) ([]eventsourcing.Event, error)` method signature.

To run this code, we can leverage a memory-based store:
```go
package main

import (
	"github.com/gin-gonic/gin"
	"github.com/go-gadgets/eventsourcing/stores/memory"
)

func main() {
	gin.SetMode(gin.ReleaseMode)
	store := memory.NewStore()

	r := gin.Default()
	r.POST("/:name/increment", func(c *gin.Context) {
		name := c.Param("name")

		agg := CounterAggregate{}
		agg.Initialize(name, store, func() interface{} { return &agg })

		errCommand := agg.Handle(IncrementCommand{})
		if errCommand != nil {
			c.JSON(500, errCommand.Error())
			return
		}

		// Show the count
		c.JSON(200, gin.H{
			"count": agg.Count,
		})
	})

	r.Run() // Listen and serve on 0.0.0.0:8080
}
```
Don't! Stop! Replay methods should act atomically upon the aggregate and only the aggregate, never calling out to anything else that could impact the decision or control flow. This is a mandatory element for reliable event-sourcing: replay must be deterministic, so that the same event history always yields the same state.

You should call any externalities in your command-handling functions, and then, once you're satisfied that the model can mutate, raise the events.
When loading events from the backing stores, all prior events must be loaded and processed in order to 'catch up' and execute the command against the latest state. In the case of long-lived aggregates, this history can be extremely long and replay will be accordingly slow. It's recommended that you keep aggregate lifetimes short where possible, and use snapshotting so that only events after the last snapshot need replaying.
If you follow these practices, you'll get great performance - and the ability to scale.
There are a few spots where it's required to use reflection/marshalling of types from generic structures (i.e. turning BSON/JSON back into a structure or vice-versa). Some other areas which leverage reflection can be avoided if you're prepared to do a little bit of extra leg-work:

- The `AutomaticWireup` method dynamically registers event replay methods and command handlers; instead, you can call `agg.DefineReplayMethod(eventType, replay func(Event))` to manually define each event's replay method.

In short, if you're keen to avoid those reflection calls - you can - but there's a price to pay in terms of code effort, and if you're using a real-world backing store it generally won't be a good trade.
This is something that most (possibly all) event-sourcing implementations out there seem to do poorly. The challenge is that document-databases used for storing events tend to have one of two limitations:
If you can accept these shortcomings, there is a `publisher` middleware that allows a distribution publisher to be attached to the event storage process. However, for applications where transactional integrity is paramount, it is recommended that you publish events to downstream consumers out-of-band, rather than relying on the store itself.

This ultimately reduces the complexity required in the package, but also means that you'll get more reliable delivery of events to targets.
Making a generic command-bus is essentially an exercise in reflection-abuse in Go, so instead the library is currently focused on making BYO-bus as easy as possible. The preferred pattern for this project is that your model gets exposed as a service (i.e. HTTP-ReST or similar) and then people interact with that, without reference to the fact that under the hood you are using event-sourcing.
Use the `aggregate.Run((cb) => {})` methods. During the callback, the aggregate will be revived to the latest/current state. Be mindful of using this excessively though, and instead bias towards using projections, unless there is a distinct and genuine reason to hit your event-store with read commands.