Open einari opened 2 years ago
EventSequenceQueueCacheCursor could be extended with optional start and end positions. In addition, it could have a "follow up" event sequence id, which could then be used to continue delivering events from the branch sequence after the cursor is exhausted.
The cursor for the branch scenario would start at the beginning of the event log and stop at the beginning of the branch and then continue within the branch. Once within a branch, we won't get any changes in the main event log.
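As a rough illustration of the cursor behaviour described above, here is a minimal in-memory sketch. `SequencedEvent` and `BranchCursor` are hypothetical names and not part of any existing API, the main log is assumed to be ordered by sequence number, and treating `end` as exclusive is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for an event with its sequence number.
public record SequencedEvent(ulong SequenceNumber, string Content);

public static class BranchCursor
{
    // Yields main-log events in [start, end), then every event of the
    // follow-up (branch) sequence. With end == null the whole main log is read.
    public static IEnumerable<SequencedEvent> Read(
        IEnumerable<SequencedEvent> mainLog,
        IEnumerable<SequencedEvent> followUp,
        ulong start = 0,
        ulong? end = null)
    {
        foreach (var @event in mainLog)
        {
            if (@event.SequenceNumber < start) continue;
            if (end is not null && @event.SequenceNumber >= end) break;
            yield return @event;
        }

        // Once the cursor has moved into the branch, the main log is not revisited.
        foreach (var @event in followUp)
        {
            yield return @event;
        }
    }
}
```

Calling `BranchCursor.Read(mainLog, branchEvents, end: branchPoint)` would deliver everything before the branch point and then the branch's own events.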
[Observer("07d1ced0-b6a3-432d-883d-12d5ab840828", forBranchType: BranchTypes.BulkPayment)]
public class SomeObserver
{
}
public class BulkPayments
{
    readonly IEventLog _eventLog;

    public BulkPayments(IEventLog eventLog) => _eventLog = eventLog;

    public async Task<Branch> InitiateFor(AccountId accountId)
    {
        return await _eventLog.Branch(BranchTypes.BulkPayment, new { key = "value" });
    }
}
public record AccountBalance(double Balance);

public class BalanceProjection : IImmediateProjectionFor<AccountBalance>
{
    public ProjectionId Identifier => "14e8d5b0-9476-4059-a5e2-09439a98a890";

    public void Define(IProjectionBuilderFor<AccountBalance> builder) => builder
        .From<DepositToDebitAccountPerformed>(_ => _
            .Add(model => model.Balance).With(@event => @event.Amount))
        .From<WithdrawalFromDebitAccountPerformed>(_ => _
            .Subtract(model => model.Balance).With(@event => @event.Amount));
}
Using it:
immediateProjections.GetInstanceById(accountId, branchId: branchId);
For materialized projections, we could have a dependency tracking mechanism that senses when a change affects something that a query for a projection depends on. Before an existing projected document is modified as the consequence of a new event in the event log, a copy of it would be taken into a branch-specific collection for every branch affected by that event.
To know which dependencies a query has:
[Route("/api/some/route/{branchId}")]
public class QueryController : Controller
{
    readonly IMongoCollection<ReadModel> _collection;

    public QueryController(IBranchedMongoCollections collections)
    {
        var branchId = (BranchId)RouteData.Values["branchId"].ToString();
        _collection = collections.GetFor<ReadModel>(BranchType, branchId);
    }
}
This could also be added as metadata:
[Route("/api/some/route/{branchId}")]
[Branch(branchType: MyBranchTypeIdentifier, routeValue: "BranchId")]
public class QueryController : Controller
{
    readonly IMongoCollection<ReadModel> _collection;

    public QueryController(IMongoCollection<ReadModel> collection) => _collection = collection;
}
The purpose of this type of mechanism is to provide the correct collection. This collection will need to have a read-through mechanism. Basically, it will have to run whatever query it has twice: once against the original collection it projects to and once against the branch collection, and then combine the results.
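A minimal in-memory sketch of such a read-through query, not tied to the MongoDB driver. `ReadModelDocument` and `ReadThrough` are hypothetical names, and the combining rule (a branch copy hides the original document with the same id, even when the branch copy itself does not match the query) is an assumption rather than something decided in this issue.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical read model used only for this sketch.
public record ReadModelDocument(string Id, double Balance);

public static class ReadThrough
{
    // Runs the same filter against the branch and the original collection,
    // then combines the results, letting branch documents take precedence.
    public static IReadOnlyList<ReadModelDocument> Query(
        IEnumerable<ReadModelDocument> original,
        IEnumerable<ReadModelDocument> branch,
        Func<ReadModelDocument, bool> filter)
    {
        var branchDocuments = branch.ToList();
        var fromBranch = branchDocuments.Where(filter);

        // Every id present in the branch hides the original document with that id.
        var overridden = branchDocuments.Select(d => d.Id).ToHashSet();
        var fromOriginal = original.Where(filter).Where(d => !overridden.Contains(d.Id));

        return fromBranch
            .Concat(fromOriginal)
            .OrderBy(d => d.Id, StringComparer.Ordinal)
            .ToList();
    }
}
```

Against a real database the two `Where(filter)` calls would correspond to two separate queries, one per collection, with the merge done afterwards.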
To avoid getting items that were created after the sequence number the branch starts from, we should record on each document the sequence number that created it and the last sequence number that updated it.
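A small sketch of how that tracking could be used when reading the original collection on behalf of a branch. `TrackedDocument`, its property names, and `BranchVisibility` are hypothetical, and whether the branch's starting sequence number itself counts as visible is an assumption (here it does).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical document carrying the sequence numbers that created and last updated it.
public record TrackedDocument(string Id, ulong CreatedAt, ulong LastUpdatedAt);

public static class BranchVisibility
{
    // Keeps only documents created at or before the sequence number the branch starts from.
    public static IEnumerable<TrackedDocument> VisibleTo(
        IEnumerable<TrackedDocument> original,
        ulong branchStart) =>
        original.Where(d => d.CreatedAt <= branchStart);

    // True when the document changed after the branch started, meaning the
    // branch should read its own copy rather than the original.
    public static bool ModifiedAfter(TrackedDocument document, ulong branchStart) =>
        document.LastUpdatedAt > branchStart;
}
```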
When an event is appended to the event log, any projections affected will be executed. These will then look at which queries exist that are linked to the branch type and thereby know the dependency. If there are no branches for the branch type, nothing is done. For each branch, the projection will check whether the branch already has an initial document for the instance it is about to project to and, if not, copy the existing document to the branch collection.
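The copy-before-modify step can be sketched with plain dictionaries. This is only an illustration under simplifying assumptions: `BalanceDocument` and `BranchedReadModels` are hypothetical, branch types and query dependency tracking are left out, and every open branch is treated as affected.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical read model used only for this sketch.
public record BalanceDocument(string Id, double Amount);

public class BranchedReadModels
{
    readonly Dictionary<string, BalanceDocument> _original = new();
    readonly Dictionary<string, Dictionary<string, BalanceDocument>> _branches = new();

    public void OpenBranch(string branchId) => _branches[branchId] = new();

    // Applies a change to the original document, first preserving the
    // pre-change document in every open branch that has no copy yet.
    public void Project(string id, double delta)
    {
        if (_original.TryGetValue(id, out var current))
        {
            foreach (var snapshots in _branches.Values)
            {
                // TryAdd leaves an existing initial copy untouched.
                snapshots.TryAdd(id, current);
            }
        }

        _original[id] = new BalanceDocument(id, (current?.Amount ?? 0) + delta);
    }

    public BalanceDocument Current(string id) => _original[id];

    public BalanceDocument Snapshot(string branchId, string id) => _branches[branchId][id];
}
```

With no open branches the loop body never runs, which matches the "it won't do anything" case.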
The projections will need to subscribe to events appended to all branches, as they do for the event log.
When a branch is concluded, we will have to clean up all read model collections that are specific to the branch.
Tasks:
Design notes: