JasperFx / marten

.NET Transactional Document DB and Event Store on PostgreSQL
https://martendb.io
MIT License
2.87k stars 457 forks source link

Projecting a subset of events #3569

Open flyingpie opened 1 day ago

flyingpie commented 1 day ago

Hey guys, thank you for the work on this excellent library!

We've set up a system where event types are always the same ("MyEvent" in these examples), so that events can be published without any changes to event ingestion.

This allows us to start publishing events before the projections are in place, and without needing to know any data types around the newly created events (data is passed along as JObject properties, which get parsed during projection).

Each event contains a "stream type", which is what we'd like to use to distinguish events on. I assume that this "stream type" is analogous to what you'd normally use the CLR event type for, and/or the CLR type when calling StartStream<>().

So on the projection side, I'm trying to have a projection only apply to a subset of events, using this "stream type", instead of event CLR type or StartStream<> type.

So far, these attempts I've tried:

Simple SingleStreamProjection, which works, but projects all events, regardless of the "stream type". At the moment Project is called, an aggregate object (will) already exist in the database.

public class MyAggregate : SingleStreamProjection<MyProjection>
{
  public MyAggregate()

    ProjectEvent<IEvent<MyEvent>>((querySession, projection, @event) =>
    {
      // Projection as per usual.
    });
  }
}

EventProjection, which allows us to prevent aggregate docs from being stored, and all in all works quite well.

public class MyAggregate : EventProjection
{
  public MyAggregate()
  {
    ProjectAsync<IEvent<MyEvent>>(
      async (ev, ops) =>
      {
        if (!ev.Data.StreamType.Equals("some stream type", StringComparison.OrdinalIgnoreCase))
        {
          return;
        }

        var doc = await ops.LoadAsync<MyProjection>(ev.Data.StreamId);

        doc ??= new MyProjection()
        {
          Id = ev.Data.StreamId,
          TenantId = ev.Data.Tenant,
          ...
        };

        ops.Store(doc);
      });
  }

}

It's one thing that unrelated events are still processed (would be nice to cap that off at the database level of course). But more problematic for now, is that on projection rebuilds, the existing database objects are not removed.

I've started looking into custom projections, and possibly assigning a different CLR event type on runtime somehow, but I have a feeling that there should be a pretty simple way of doing this, hence this question.

Any help is appreciated!

flyingpie commented 11 hours ago

Additionally, here's the MyEvent definition:

public class MyEvent
{
    [Identity]
    public Guid Id { get; set; } = Guid.NewGuid();

    public string? StreamType { get; set; } // <-- We'd like to apply projections on events with a specific value in here.

    public string? StreamId { get; set; }

    public string? EventId { get; set; }

    public string? EventType { get; set; }

    public JObject? EventData { get; set; }

    public string? Tenant { get; set; }

    public DateTimeOffset Timestamp { get; set; } = DateTimeOffset.UtcNow;
}