Closed bounoable closed 2 years ago
Given an ecommerce app with two product events: Created (product was created with a name) and Priced (product was given a price). The app projects a product listing read-model from these two events, which are applied as follows:
- The Created event adds a product (name) to the listing but doesn't add any price data.
- The Priced event adds price data to a product in the listing.
package product
const (
Created = "product.created"
Priced = "product.priced"
)
type Product struct { *aggregate.Base }
// Domain logic
func (p *Product) Create(name string) error { ... }
func (p *Product) SetPrice(price uint64) error { ... }
type Listing struct {
mux sync.RWMutex
products []ListingProduct
}
type ListingProduct struct {
ID uuid.UUID
Name string
Price uint64
}
func (l *Listing) Products() []ListingProduct {
l.mux.RLock()
defer l.mux.RUnlock()
out := make([]ListingProduct, 0, len(l.products))
for _, p := range l.products {
if p.Price > 0 {
out = append(out, p)
}
}
return out
}
// Project continuously projects the listing until ctx is canceled.
func (l *Listing) Project(ctx context.Context, bus event.Bus, store event.Store) (<-chan error, error) {
s := schedule.Continuously(bus, store, []string{Created, Priced})
errs, err := s.Subscribe(ctx, func(ctx projection.Job) error {
l.mux.Lock()
defer l.mux.Unlock()
return ctx.Apply(ctx, l)
})
if err != nil {
return nil, err
}
// Manually trigger a projection job to ensure all past events are applied.
go s.Trigger(ctx)
return errs, nil
}
func (l *Listing) ApplyEvent(evt event.Event) {
switch evt.Name() {
case Created:
l.productCreated(evt)
case Priced:
l.productPriced(evt)
}
}
func (l *Listing) productCreated(evt event.Event) {
name := evt.Data().(string)
productID := pick.AggregateID(evt)
// products is a slice, so new products are appended.
l.products = append(l.products, ListingProduct{
ID: productID,
Name: name,
})
}
func (l *Listing) productPriced(evt event.Event) {
data := evt.Data().(PricedData)
productID := pick.AggregateID(evt)
for i, p := range l.products {
if p.ID == productID {
l.products[i].Price = data.Price
return
}
}
}
When a Priced event is published for a product but the projection previously didn't receive the corresponding Created event of that specific product, the listing will not find the product to update the price.
A possible solution is to manually trigger the projection on (service) startup. Manually triggered projection jobs always fetch the entire event history of the configured events from the store. The problem with this approach is that it may cause (huge) performance problems (for the initial projection job) if the history of the requested events is large.
Using the HistoryDependent API, the listing could hint to the projection job that it requires the entire event history until the first projection job has been applied, but this does effectively the same as simply triggering the schedule on startup.
package product
type Listing struct {
...
requiresFullHistory bool
}
func (l *Listing) RequiresFullHistory() bool {
return l.requiresFullHistory
}
func (l *Listing) applyJob(ctx projection.Job) error {
// apply job ...
// after the first job, it doesn't require the full history anymore
l.requiresFullHistory = false
return nil
}
A projection job should be configurable to push additional events into an event stream if needed. Something like this should be possible:
package product
func (l *Listing) Project(ctx context.Context, bus event.Bus, store event.Store) {
s := schedule.Continuously(bus, store, []string{Created, Priced})
errs, err := s.Subscribe(
context.TODO(),
func(ctx projection.Job) error {
l.mux.Lock()
defer l.mux.Unlock()
return ctx.Apply(ctx, l)
},
// projection.BeforeEvent is a subscription option
projection.BeforeEvent(func(ctx context.Context, evt event.Of[uint64]) ([]event.Event, error) {
productID := pick.AggregateID(evt)
// If the listing already contains the product, no additional events need to be fetched.
if l.containsProduct(productID) {
return nil, nil
}
// Otherwise fetch the required `Created` event and apply it before the `Priced` event is applied.
events, errs, err := store.Query(ctx, query.New(
query.Name(Created),
query.Aggregate("product", productID),
))
if err != nil {
return nil, err
}
return streams.Drain(ctx, events, errs)
}, Priced), // event names are provided at the end to allow for intercepting different events with a single option
)
}
The HistoryDependent API duplicates a feature that is already implemented in continuous schedules as manual triggers and in periodic schedules as the default behavior (periodic schedules always fetch the entire event history from the store). Both the manual trigger feature and the HistoryDependent API lead to unnecessary fetches/queries of the entire event history (of the configured events). The proposed projection.BeforeEvent() option solves this scenario in a performant way.
-> HistoryDependent is useless.
The proposed solution could be implemented as a generalized solution for any kind of channel, which projection.BeforeEvent can use under the hood:
package example
func example(events <-chan event.Event) {
events = streams.Before(events, func(evt event.Event) []event.Event {
if evt.Name() == "foo" {
return []event.Event{...}
}
return nil
})
}
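To make the idea of a channel-level "before" hook more concrete, here is a minimal, self-contained sketch. The name `before` and its signature are assumptions made for illustration only; they are not the goes API, and the sketch uses plain strings instead of event.Event. A real implementation would presumably also take a context.Context and stop when it is canceled.

```go
package main

import "fmt"

// before returns a channel that yields, for every value received from in,
// first the values returned by fn for that value and then the value itself.
// Hypothetical helper: name and signature are not taken from goes.
func before[T any](in <-chan T, fn func(T) []T) <-chan T {
	out := make(chan T)
	go func() {
		defer close(out)
		for v := range in {
			// Push the additional values ahead of the intercepted one.
			for _, extra := range fn(v) {
				out <- extra
			}
			out <- v
		}
	}()
	return out
}

// collect drains a channel into a slice.
func collect[T any](in <-chan T) []T {
	var out []T
	for v := range in {
		out = append(out, v)
	}
	return out
}

func main() {
	in := make(chan string, 2)
	in <- "foo"
	in <- "bar"
	close(in)

	out := before(in, func(name string) []string {
		if name == "foo" {
			return []string{"pre-foo"}
		}
		return nil
	})

	fmt.Println(collect(out)) // [pre-foo foo bar]
}
```

The callback only decides which values to inject; ordering is handled by the helper, so the same function could back several interceptors like the proposed projection.BeforeEvent.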
Given the same ecommerce app as before but with an order service. A read-model is projected for each order. On service startup, all orders should be updated to get to the current/correct state, then it should continuously project the orders when order events are published.
package order
const (
Placed = "order.placed"
Canceled = "order.canceled"
)
type PlacedData struct {
Items []Item
}
type Order struct {
*aggregate.Base
OrderDTO
}
type OrderDTO struct {
ID uuid.UUID
Items []Item
}
type Item struct {
ProductID uuid.UUID
Quantity uint
UnitPrice uint64
}
// Domain logic
func (o *Order) Place(items []Item) error { ... }
func (o *Order) Cancel() error { ... }
type ReadModelRepository = model.Repository[*OrderRM, uuid.UUID]
// OrderRM is the read-model of an order.
type OrderRM struct {
*projection.Progressor
projection.Guard
OrderDTO
}
func NewOrderRM(id uuid.UUID) *OrderRM {
return &OrderRM{
Progressor: projection.NewProgressor(),
// Only allow events of the specific order to be applied onto the read-model.
Guard: projection.QueryGuard(query.New(query.Aggregate("order", id))),
}
}
type Projector struct {
orders ReadModelRepository
schedule *schedule.Continuous
}
func NewProjector(orders ReadModelRepository, bus event.Bus, store event.Store) *Projector {
return &Projector{
orders: orders,
schedule: schedule.Continuously(bus, store, []string{Placed, Canceled}),
}
}
func (proj *Projector) Run(ctx context.Context) (<-chan error, error) {
errs, err := proj.schedule.Subscribe(ctx, proj.applyJob)
if err != nil {
return nil, fmt.Errorf("subscribe to projection schedule: %w", err)
}
// Project all orders on startup.
go proj.schedule.Trigger(ctx)
return errs, nil
}
func (proj *Projector) applyJob(ctx projection.Job) error {
orders, err := proj.extractOrdersFromJob(ctx)
if err != nil {
return fmt.Errorf("extract orders from job: %w", err)
}
for _, orderID := range orders {
if err := proj.orders.Use(ctx, orderID, func(o *OrderRM) error {
// Apply all events in the job onto the read-model.
// The projection.QueryGuard of the read-model ensures that only the events of this specific order are applied.
return ctx.Apply(ctx, o)
}); err != nil {
log.Printf("failed to project order: %v [order=%s]", err, orderID)
}
}
return nil
}
func (proj *Projector) extractOrdersFromJob(ctx projection.Job) ([]uuid.UUID, error) {
str, errs, err := ctx.Aggregates(ctx)
if err != nil {
return nil, err
}
refs, err := streams.Drain(ctx, str, errs)
if err != nil {
return nil, err
}
return slice.Map(refs, func(r aggregate.Ref) uuid.UUID { return r.ID }), nil
}
In order to project all order read-models on startup, the manual trigger feature of the projection schedule is used. On startup, the following happens:
Steps 6 and 7 are the weird part: the event filter effectively does the same as the disabled progress check. We could simply remove step 6 completely and enable the progress check in step 7.
When the ProgressAware API was added to goes, progress checks weren't disabled. That behavior could potentially have led to invalid projection builds because of the following problem:
A projection may require events of many different aggregates. While goes guarantees that no two events of an aggregate's event stream have the same time, the same is not true for events of different aggregates. This means that a projection may apply multiple events that have the same time. Now, the ProgressAware API keeps track of the projection progress in terms of the time of the last applied event, which could potentially cause events to not be applied to a projection because another event with the same time has already been applied. For this reason, progress checks were disabled in projection jobs.
But this doesn't really make sense. The progress check just moved into the projection job as a filter, which does exactly the same as before.
Progress will be tracked not only in terms of time but also in terms of the ids of the applied events that have that exact time. An event is applied if its time is either after the time of the last applied events, or if its time is exactly the same as the last event time and the tracked last event ids do not contain the id of the event.
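The rule above can be sketched as follows. This is only an illustration under stated assumptions: the `progress` type and its `shouldApply` and `track` methods are hypothetical names, event ids are plain strings instead of uuid.UUID, and none of this is the actual goes implementation.

```go
package main

import (
	"fmt"
	"time"
)

// progress tracks the time of the latest applied events together with the
// ids of all applied events that have exactly that time.
// Hypothetical type; names are not taken from goes.
type progress struct {
	latest time.Time
	ids    map[string]struct{}
}

// shouldApply reports whether an event with the given id and time has not
// been applied yet: it is either newer than the latest applied event, or it
// has the same time and its id is not among the tracked ids.
func (p *progress) shouldApply(id string, t time.Time) bool {
	if t.After(p.latest) {
		return true
	}
	if t.Equal(p.latest) {
		_, applied := p.ids[id]
		return !applied
	}
	return false
}

// track records an applied event. When time advances, the tracked ids are
// reset because they only matter for events with the latest time.
func (p *progress) track(id string, t time.Time) {
	switch {
	case t.After(p.latest):
		p.latest = t
		p.ids = map[string]struct{}{id: {}}
	case t.Equal(p.latest):
		if p.ids == nil {
			p.ids = make(map[string]struct{})
		}
		p.ids[id] = struct{}{}
	}
}

func main() {
	var p progress
	t1 := time.Date(2022, 1, 1, 0, 0, 0, 0, time.UTC)
	p.track("a", t1)

	fmt.Println(p.shouldApply("a", t1))                   // false: already applied
	fmt.Println(p.shouldApply("b", t1))                   // true: same time, different event
	fmt.Println(p.shouldApply("c", t1.Add(-time.Second))) // false: older than the progress
	fmt.Println(p.shouldApply("d", t1.Add(time.Second)))  // true: newer than the progress
}
```

Only the ids for the latest time need to be kept, so the tracked state stays small even for projections that apply many events.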
The IgnoreProgress() option can be removed from step 7.
Instead of fetching the entire event history from the event store, projection jobs modify the query so that the query respects the progress of a ProgressAware projection. This differs from the current implementation of step 6 in that it's not implemented as an in-memory filter (that is applied after the actual query) but as an actual change to the query itself.
In order to extract the order ids through the job from the store, but without actually fetching all order events, the following is done:
package order
func (proj *Projector) Run(ctx context.Context) (<-chan error, error) {
// The projection.Startup() option triggers a projection job on startup, using the provided query
// to fetch the events from which the order ids are extracted. In this case, we fetch all Placed events.
return proj.schedule.Subscribe(ctx, proj.applyJob, projection.Startup(query.New(
query.Name(Placed),
)))
}
func (proj *Projector) applyJob(ctx projection.Job) error {
orders, err := proj.extractOrdersFromJob(ctx)
if err != nil {
return fmt.Errorf("extract orders from job: %w", err)
}
for _, orderID := range orders {
if err := proj.orders.Use(ctx, orderID, func(o *OrderRM) error {
// The job queries the event store for all configured events.
// The progress of the read-model is used to query only those events
// that happened after the last applied event.
return ctx.Apply(ctx, o)
}); err != nil {
log.Printf("failed to project order: %v [order=%s]", err, orderID)
}
}
return nil
}
func (proj *Projector) extractOrdersFromJob(ctx projection.Job) ([]uuid.UUID, error) {
// Here, the query that was passed to projection.Startup() is used to extract the orders.
str, errs, err := ctx.Aggregates(ctx)
if err != nil {
return nil, err
}
refs, err := streams.Drain(ctx, str, errs)
if err != nil {
return nil, err
}
return slice.Map(refs, func(r aggregate.Ref) uuid.UUID { return r.ID }), nil
}
A new projection.Startup() option can be provided when subscribing to a projection schedule. If provided, the schedule triggers a projection job on startup but modifies the job's Aggregates() helper so that it uses the query that was provided to projection.Startup() instead of the default query that fetches the entire history of the configured events. This makes the extraction of the orders way more performant (because only 1 event needs to be fetched for each order).
Currently, when applying a projection job onto one or multiple projections, the actual event query is only executed once. Subsequent calls to a job's Events(), EventsOf(), Aggregates() and Apply() methods re-use the result of the first query and only add in-memory filters to the result to return the correct events for a given projection.
With this change, instead of running a single (possibly costly) query once and then applying in-memory filters for each individual projection afterwards, a query is executed for each individual projection, but only within the EventsOf() and Apply() methods of a job; the Events() method still queries the entire event history from the store, and Aggregates() uses either the query that was provided to projection.Startup(), or it also fetches the entire history to do the extraction.
Either way, a job caches all queries, so the same query will never run twice.
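One way such a per-job query cache could look is sketched below. It is a rough illustration, not the goes implementation: the `queryCache` type, its `get` method and the string cache key are all assumptions, and deriving a stable key from a real event query is left out.

```go
package main

import (
	"fmt"
	"sync"
)

// queryCache memoizes query results by a caller-provided key, so that the
// same query is executed at most once per cache (for example once per job).
// Hypothetical helper; not part of goes.
type queryCache[T any] struct {
	mu      sync.Mutex
	results map[string][]T
}

// get returns the cached result for key, or runs the query and caches its
// result. The lock is held while the query runs, which keeps the sketch
// simple but serializes all queries of the cache.
func (c *queryCache[T]) get(key string, run func() ([]T, error)) ([]T, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if res, ok := c.results[key]; ok {
		return res, nil
	}
	res, err := run()
	if err != nil {
		// Errors are not cached, so a failed query can be retried.
		return nil, err
	}
	if c.results == nil {
		c.results = make(map[string][]T)
	}
	c.results[key] = res
	return res, nil
}

func main() {
	var cache queryCache[string]
	runs := 0
	query := func() ([]string, error) {
		runs++
		return []string{"order.placed"}, nil
	}

	cache.get("name=order.placed", query)
	cache.get("name=order.placed", query)

	fmt.Println(runs) // 1
}
```

With per-projection queries, the key would have to include everything that changes the result, such as the event names, the aggregate and the progress of the projection.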
The current ProgressAware
implementation is flawed because it does not account for multiple events that possibly have the same time. Solution is to also track the ids of the last applied events that have the same time. Using this correct implementation, projection jobs can optimize queries for projections to avoid fetching the entire event history of a given set of events.
-> ProgressAware
is useful.
Closed by #36
This issue will determine if the different projection APIs are even necessary to solve common problems or if there are better solutions to these problems that don't require these APIs.
HistoryDependent
The HistoryDependent API is used by projection jobs to determine which events a projection needs to properly update itself. If a projection implements HistoryDependent, it can hint to a projection job that it requires the full history of the events that are configured in the job, instead of just the (published) events that triggered the job.
ProgressAware
The ProgressAware API is usually used by persistent projections that are stored in a database. When a projection job is triggered, the application fetches the current projection state from the database, applies the events to it and saves it back to the database. A ProgressAware projection keeps track of the last applied event in terms of the event time, to ensure that no event is applied twice to the projection.