Cratis / ApplicationModel

Opinionated application model providing glue that makes application development more productive.
https://cratis.io
MIT License
0 stars 4 forks source link

Add support for a better way to observe MongoDB change streams for multiple collections #420

Open einari opened 1 week ago

einari commented 1 week ago

If one wants to aggregate (map / reduce) from multiple collections, instead of having to work with a unique change stream per collection, it would be better resource usage if we had one change stream subscription with specific filters.

We want this to be a nice type safe API.

Something in the lines of...

var subject = collection
   .Observe(o => o
      .Join(otherCollection, () => ... filter ...)
      .Join(thirdCollection, () => ... filter ...)
      .Combine((firstDoc, secondDoc, thirdDoc) => {
          // Return new "reduced" type
     })
 );
einari commented 1 week ago

We could implement this by having one ChangeStream connection per database per process (if we can go broader - per server per process, that is fine as well). This could then be exposed through a Subject that we can then build filters and combine on.

The benefit of this would be that we wouldn’t be leaking resources and no matter how many clients get connected, we only have one connection going per process.

We should however look into whether or not we need to use RX async for this to be most efficient.

I am thinking we can have an interface like IMongoDBWatcher (Or similar, not married to the name), which is registered as a singleton. This would then expose the subject itself and possibly necessary methods. The subject might need to a specific type of subject that we can then do the Join operations on.

Instead of joining on collections, we could have something like the following:

public MyClass(IMongoDBWatcher watcher)
{

     public ISubject<CombinedThing> Observe()
     {
            return watcher.Observe<FirstType>()
                         .Join<SecondType>((filter) => /* filterBuilder…. */ )
                         .Join<ThirdType>((filter) => /* filterBuilder…. */)
                         .Select(() => new CombinedThing(…..));
     }
}