Rita is a toolkit of various event-centric and reactive abstractions build on top of NATS.
NOTE: This package is under heavy development, so breaking changes will likely be introduced. Feedback is welcome on API design and scope. Please open an issue if you have something to share!
Requires Go 1.18+
go get github.com/bruth/rita
See the blog post that introduces the initial design of Rita.
Rita comes with an opt-in, but recommended, type registry which provides [de]serialization transparency as well as providing consistency of usage. Fundamentally, we are defining names and mapping them to concrete types.
Given a couple of types in a domain model:
type OrderPlaced struct {}
type OrderShipped struct {}
We can create a registry, associating names. If the names are intended to be shared and portable across other languages, then some thought should be put into the naming structure. For example, the CloudEvents spec suggest using the reverse DNS name.
tr, err := rita.NewTypeRegistry(map[string]*rita.Type{
"com.example.order-placed": {
Init: func() any { return &OrderPlaced{} },
},
"com.example.order-shipped": {
Init: func() any { return &OrderShipped{} },
},
})
Each type currently supports a Init
function to allocate a new value with any default fields set. The registry also accepts an optional rita.Codec()
option for overriding the default codec for [de]serialization.
Once the set of types that will be worked with are reigstered, we can initialize a Rita instance by passing the NATS connection and the registry as an option.
r, err := rita.New(nc, rita.TypeRegistry(tr))
What is the behavior without a type registry? The expectation is that the type name is explicitly provided and the data is a
[]byte
.
Create an event store for orders. This will default to a subject of orders.>
. Note, other than a few different defaults to follow the semantics of an event store, the stream is just an vanilla JetStream and can be managed as such.
// Get a handle to the event store with a name.
es := r.EventStore("orders")
// Create the store by providing a stream config. By default, the bound
// subject will be "orders.>". This operation is idempotent, so it can be
// safely during application startup time.
err := es.Create(&nats.StreamConfig{
Replicas: 3,
})
Append an event to the orders.1
subject on the stream. ExpectSequence
can be set to zero means no other events should be associated with this subject yet.
seq1, err := es.Append("orders.1", []*rita.Event{
{Data: &OrderPlaced{}},
}, rita.ExpectSequence(0))
Append an another event, using the previously returned sequence.
seq2, err := es.Append("orders.1", []*rita.Event{
{Data: &OrderShipped{}},
rita.ExpectSequence(seq1))
Load the events for the subject. This returns a slice of *Event
values where the Data
value for each event has been pre-allocated based on the Type
using the registry.
events, lastSeq, err := es.Load("orders.1")
The lastSeq
value indicates the sequence of the last event appended for this subject. If a new event needs to be appended, this should be used with ExpectSequence
.
Although event sourcing involves modeling and persisting the state transitions as events, we still need to derive state in order to make decisions when commands are received.
Fundamentally, we have a model of state, and then we need to evolve the state given each event. We can model this as an interface in Go.
type Evolver interface {
Evolve(event *Event) error
}
An implementation would then look like:
type Order struct {
// fields..
}
func (o *Order) Evolve(event *Event) error {
// Switch on the event type or data (if using the type registry).
switch e := event.Data.(type) {
}
}
Given this model, we can use the Evolve
method on EventStore
for convenience.
var order Order
lastSeq, err = es.Evolve("orders.1", &order)
This also works for a cross-cutting state (all orders) using a subject wildcard.
var orderList OrderList
lastSeq, err = es.Evolve("orders.*", &orderlist)
It may be desirable to keep some state in memory for a period of time and then only request new events that have occurred since the state was created previously.
This can be achieved by simply passing the same state value (already evolved) along with rita.AfterSequence(lastSeq)
option.
lastSeq, err = es.Evolve("orders.1", &order, rita.AfterSequence(lastSeq))
This will only fetch the events after the last event that was received previously and evolve the state up to the latest known event.
Although features are checked off, they are all in a pre-1.0 state and subject to change.