Currently, projection finalization is implemented like this:
package example
// Foo is the example projection type; its fields are elided here.
type Foo struct { ... }
// ApplyEvent accepts an event and does nothing; the body is empty in this example.
func (*Foo) ApplyEvent(event.Event) {}
// finalize runs the finalization step for f. It calls dep.Do with ctx and
// returns the error from that call unchanged; if the call succeeds it
// returns nil.
func (f *Foo) finalize(ctx context.Context, dep SomeDependency) error {
	// do stuff

	err := dep.Do(ctx, "...")
	if err != nil {
		// Hand the dependency's error back to the caller as-is.
		return err
	}

	// do more stuff

	return nil
}
// example subscribes to the schedule s. For every projection job it walks
// the job's aggregate references and, per reference, creates a Foo, calls
// the job's Apply on it, and finalizes it immediately afterwards.
func example(s projection.Schedule) {
	var dep SomeDependency

	handle := func(job projection.Job) error {
		refs, errs, err := job.Aggregates(job)
		if err != nil {
			return fmt.Errorf("extract aggregates: %w", err)
		}

		// project handles a single aggregate reference of the job.
		project := func(ref aggregate.Ref) error {
			foo := NewFoo(ref.ID)
			if applyErr := job.Apply(job, foo); applyErr != nil {
				return applyErr
			}
			// Finalization happens per projection, right after Apply returns.
			return foo.finalize(job, dep)
		}

		return streams.Walk(job, project, refs, errs)
	}

	s.Subscribe(context.TODO(), handle)
}
Finalization runs for each individual projection, immediately after the job applies its events. It would be a nice addition if finalization could instead be batched and deferred until the end of the projection job, like this:
package example
// example subscribes to the schedule s. For every projection job it walks
// the job's aggregate references and, per reference, creates a Foo and calls
// the job's Apply on it. Instead of finalizing right away, it registers the
// finalization through the job's Defer method.
func example(s projection.Schedule) {
	var dep SomeDependency

	handle := func(job projection.Job) error {
		refs, errs, err := job.Aggregates(job)
		if err != nil {
			return fmt.Errorf("extract aggregates: %w", err)
		}

		// project handles a single aggregate reference of the job.
		project := func(ref aggregate.Ref) error {
			foo := NewFoo(ref.ID)
			if applyErr := job.Apply(job, foo); applyErr != nil {
				return applyErr
			}

			finalize := func() error {
				// this call will be deferred to after this projection update
				return foo.finalize(job, dep)
			}
			return job.Defer(finalize)
		}

		return streams.Walk(job, project, refs, errs)
	}

	s.Subscribe(context.TODO(), handle)
}
Currently, projection finalization is implemented like this:
Finalization is done for each individual projection after the job applies its events. A nice addition would be if finalization could be batched and deferred to the end of the projection job, like this: