Closed joshdixon closed 1 year ago
I'm writing code based on the PersistentSubscription feature of EventStore DB, which is very efficient. A beta version should be released in a few days.
Awesome! Thanks for the effort. Do you plan on supporting making the stream rewindable? I can imagine this would be useful in an event sourcing context where you add a new projection/read model and need to build it up from previously stored events from the start of time.
I have completed the first version and am currently testing it. However, I always feel that it's not good without the Rewindable feature. I am rewriting some parts of the code, hoping to add support for it.
It has now been tested successfully with support for the rewindable feature. See my Chat Room example, where you enter a version number when joining a room to subscribe from that version.
Rewindable is an internal feature of Orleans, rather than a service provided by the backend Event Store or other message queues. Therefore, subscribing from an unknown point will cause a non-fatal exception. Currently, only Orleans' built-in binary serialization is supported and not JSON serialization. I will improve this in the next version. However, it is important to note that JSON serialization is less efficient and produces larger payloads.
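For readers who want to see what "subscribe from a specified version" can look like in code, here is a minimal sketch. It assumes Orleans 7 or later with the streaming packages referenced. The helper name `SubscribeFromVersionAsync`, the `string` payload type, and the namespace argument are hypothetical and are not taken from the Chat Room example. Using `EventSequenceTokenV2` as the starting token mirrors the sample code later in this thread; whether a given provider honours it for rewinding is an assumption.

```csharp
using System;
using System.Threading.Tasks;
using Orleans.Providers.Streams.Common;
using Orleans.Runtime;
using Orleans.Streams;

// Hypothetical helper, not part of the provider discussed in this thread.
public static class RewindSketch
{
    public static Task<StreamSubscriptionHandle<string>> SubscribeFromVersionAsync(
        IStreamProvider streamProvider,  // e.g. from this.GetStreamProvider(...) inside a grain
        string streamNamespace,          // assumed stream namespace
        Guid roomId,                     // assumed stream key
        long fromVersion,                // the version number the user typed in
        Func<string, StreamSequenceToken, Task> onNextAsync)
    {
        var stream = streamProvider.GetStream<string>(StreamId.Create(streamNamespace, roomId));

        // Passing a sequence token asks Orleans to start delivery at that point
        // in the stream rather than at the newest event.
        return stream.SubscribeAsync(onNextAsync, new EventSequenceTokenV2(fromVersion));
    }
}
```

If the requested version is no longer available to the stream cache, expect the non-fatal exception mentioned above rather than a silent fallback.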
First of all, thanks for the effort and provided working code. We found your project just when we started thinking about developing this integration ourselves. We will try to give feedback as we develop our project.
Could you write in the commit that nuget has been deployed please?
Now it is released.
Thanks for the updates. I've been trying it out this weekend and it's been great. I'm not sure if this is something that should be supported, but I'm using the EventSourcing Log Consistency Provider and trying to subscribe to the events it persists by subscribing to the $et-eventname stream through an implicit stream subscription on a different grain. I am running into the following exception:
System.ArgumentException: Unable to parse "{"$v":"4:-1:1:4","$c":130688,"$p":130688,"$causedBy":"d3da7053-5723-4a39-a8f4-f7c2ae4622bb"}" as a stream id
at Orleans.Runtime.StreamId.Parse(ReadOnlySpan`1 value) in /_/src/Orleans.Streaming/StreamId.cs:line 208
at Orleans.Providers.Streams.EventStore.EventStoreQueueDataAdapterV2.GetStreamId(EventRecord queueMessage)
at Orleans.Providers.Streams.EventStore.EventStoreQueueDataAdapterV2.GetStreamPosition(EventRecord queueMessage)
at Orleans.Providers.Streams.EventStore.EventStoreQueueCache.Add(List`1 queueMessages, DateTime dequeueTimeUtc)
at Orleans.Providers.Streams.EventStore.EventStoreQueueAdapterReceiver.GetQueueMessagesAsync(Int32 maxCount)
at Orleans.Streams.PersistentStreamPullingAgent.ReadFromQueue(QueueId myQueueId, IQueueAdapterReceiver rcvr, Int32 maxCacheAddCount) in /_/src/Orleans.Streaming/PersistentStreams/PersistentStreamPullingAgent.cs:line 479
at Orleans.Internal.AsyncExecutorWithRetries.ExecuteWithRetriesHelper[T](Func`2 function, Int32 maxNumSuccessTries, Int32 maxNumErrorTries, TimeSpan maxExecutionTime, DateTime startExecutionTime, Func`3 retryValueFilter, Func`3 retryExceptionFilter, IBackoffProvider onSuccessBackOff, IBackoffProvider onErrorBackOff) in /_/src/Orleans.Core/Async/AsyncExecutorWithRetries.cs:line 234
I understand this probably isn't an expected use case. However, I'm trying to use the stream to build read models by subscribing to events from journaled grains. Do you think this is a good way of going about it? If not, what do you think would be a better way?
The correct way to create a read model is to send a stream event after the JournaledGrain is persisted. On the subscriber side (implicit subscription), the event is received, transformed, and saved to the read model database.
Since the JSON persisted to the Grain Storage is an event or a grain state you defined in Orleans, there is no need to rely on the EventStore stream subscription and deserialize it. You only need to save and send events in your own publisher grain and receive them in your subscriber grain.
Here is an example that uses the Result Patterns library (Orleans.FluentResults):
public abstract class EventSourcingGrain<TState> : JournaledGrain<TState, DomainEvent>, IGrainWithGuidKey
    where TState : class, new()
{
    // Definitions and OnActivateAsync/OnDeactivateAsync omitted .....

    /// <summary>
    /// Apply an event and send it through the stream.
    /// </summary>
    protected Task<Result<bool>> PublishPersistedAsync(DomainEvent evt)
    {
        return Result.Ok()
            // Use internal RaiseEvent or RaiseConditionalEvent to apply changes to snapshot state.
            .MapTryAsync(() => RaiseConditionalEvent(evt))
            // Custom persistence operation such as saving snapshot to database by using EntityFramework.
            .MapTryIfAsync(raised => raised, _ => PersistAsync(evt))
            // If the log event and snapshot state are persisted to logs db and state db, send event...
            .TapTryAsync(() => _stream.OnNextAsync(evt with { Version = Version }, new EventSequenceTokenV2(Version)));
    }

    /// <summary>
    /// Send an error event through the stream.
    /// </summary>
    protected Task<Result> PublishErrorAsync(ErrorOccurredEvent evt)
    {
        return Result.Ok().TapTryAsync(() => _stream.OnNextAsync(evt, new EventSequenceTokenV2(Version)));
    }

    /// <summary>
    /// Custom persistence operation such as saving snapshot to database by using EntityFramework.
    /// </summary>
    protected abstract Task<bool> PersistAsync(DomainEvent evt);
}
public abstract class EventSubscriberGrain : Grain, IGrainWithGuidKey
{
    // Definitions omitted .....

    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        await base.OnActivateAsync(cancellationToken);
        _streamProvider = this.GetStreamProvider(_provider);
        _stream = _streamProvider.GetStream<DomainEvent>(StreamId.Create(_nameSpace, this.GetPrimaryKey()));
        _streamSubscription = await _stream.SubscribeAsync(HandleNextAsync, HandleExceptionAsync, HandCompleteAsync);
    }

    /// <summary>
    /// Receive and process the event.
    /// </summary>
    protected abstract Task HandleNextAsync(DomainEvent evt, StreamSequenceToken token);

    protected abstract Task HandleExceptionAsync(Exception exception);

    protected abstract Task HandCompleteAsync();
}
The following code shows sample implementations:
[LogConsistencyProvider(ProviderName = Constants.LogConsistencyStoreName)]
[StorageProvider(ProviderName = Constants.SalesStoreName)]
public sealed class SnackGrain : EventSourcingGrain<Snack>, ISnackGrain
{
    // Definitions omitted .....

    public Task<Result<bool>> ChangeNameAsync(SnackChangeNameCommand cmd)
    {
        var id = this.GetPrimaryKey();
        return Result.Ok()
            .Ensure(State.IsDeleted == false, $"Snack {id} has already been removed.")
            .TapErrorTryAsync(errors => PublishErrorAsync(new SnackErrorOccurredEvent(id, ErrorCodes.SnackRemoved.Value, errors.ToReasons(), cmd.TraceId, DateTimeOffset.UtcNow, cmd.OperatedBy, Version)))
            .EnsureAsync(State.IsCreated, $"Snack {id} is not initialized.")
            .TapErrorTryAsync(errors => PublishErrorAsync(new SnackErrorOccurredEvent(id, ErrorCodes.SnackNotInitialized.Value, errors.ToReasons(), cmd.TraceId, DateTimeOffset.UtcNow, cmd.OperatedBy, Version)))
            .EnsureAsync(State.Name.Length <= 100, $"The name of snack {id} is too long.")
            .TapErrorTryAsync(errors => PublishErrorAsync(new SnackErrorOccurredEvent(id, ErrorCodes.SnackNameTooLong.Value, errors.ToReasons(), cmd.TraceId, DateTimeOffset.UtcNow, cmd.OperatedBy, Version)))
            // All preconditions have been met, save and publish the event.
            .BindTryAsync(() => PublishPersistedAsync(new SnackNameChangedEvent(id, cmd.Name, cmd.TraceId, DateTimeOffset.UtcNow, cmd.OperatedBy, Version)));
    }
}
[ImplicitStreamSubscription(Constants.SnackNamespace)]
public sealed class SnackSubscriberGrain : EventSubscriberGrain
{
    // Definitions omitted .....

    public override async Task OnActivateAsync(CancellationToken cancellationToken)
    {
        await base.OnActivateAsync(cancellationToken);
        _dbContext = _scope.ServiceProvider.GetRequiredService<SalesDbContext>();
    }

    public override async Task OnDeactivateAsync(DeactivationReason reason, CancellationToken cancellationToken)
    {
        await _streamSubscription!.UnsubscribeAsync();
        await _scope.DisposeAsync();
        await base.OnDeactivateAsync(reason, cancellationToken);
    }

    protected override Task HandleNextAsync(DomainEvent evt, StreamSequenceToken seq)
    {
        switch (evt)
        {
            case SnackNameChangedEvent snackEvt:
                return ApplyEventAsync(snackEvt);
            case SnackErrorOccurredEvent snackErrorEvt:
                _logger.LogWarning($"Received SnackErrorOccurredEvent: {string.Join(';', snackErrorEvt.Reasons)}");
                return Task.CompletedTask;
            default:
                return Task.CompletedTask;
        }
    }

    protected override Task HandleExceptionAsync(Exception exception)
    {
        _logger.LogError(exception, exception.Message);
        return Task.CompletedTask;
    }

    protected override Task HandCompleteAsync()
    {
        _logger.LogInformation($"Stream {Constants.SnackNamespace} is completed.");
        return Task.CompletedTask;
    }

    private async Task<bool> ApplyEventAsync(SnackNameChangedEvent evt, CancellationToken cancellationToken = default)
    {
        try
        {
            var snack = await _dbContext.Snacks.FindAsync(evt.Id);
            if (snack == null)
            {
                _logger.LogWarning($"Apply SnackNameChangedEvent: Snack {evt.Id} does not exist in the database. Trying a full update...");
                return await ApplyFullUpdateAsync(evt, cancellationToken);
            }
            // The stored row must be exactly one version behind the incoming event;
            // otherwise an event was missed and the row is rebuilt in full.
            if (snack.Version != evt.Version - 1)
            {
                _logger.LogWarning($"Apply SnackNameChangedEvent: Snack {evt.Id} version ({snack.Version}) in the database should be {evt.Version - 1}. Trying a full update...");
                return await ApplyFullUpdateAsync(evt, cancellationToken);
            }
            snack.Name = evt.Name;
            snack.LastModifiedAt = evt.OperatedAt;
            snack.LastModifiedBy = evt.OperatedBy;
            snack.Version = evt.Version;
            return await _dbContext.SaveChangesAsync(cancellationToken) > 0;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Apply SnackNameChangedEvent: An exception occurred while writing data to the database. Trying a full update...");
            return await ApplyFullUpdateAsync(evt, cancellationToken);
        }
    }

    private async Task<bool> ApplyFullUpdateAsync(SnackEvent evt, CancellationToken cancellationToken = default)
    {
        .......
    }
}
Thanks. Fair enough, I was thinking that might be the case. Wouldn't this cause the events to be duplicated in storage?
This is just another option; the difference is between synchronous and asynchronous data persistence. However, some concepts need to be clarified. The data stored in grain memory includes only raw data, not all of the properties contained in the read data model.
The read data model typically includes Lookup/Join data, such as redundant product names and product images that need to be saved in the order entity. There is also aggregated data from the sub-level, such as the total number of likes, comments, and average ratings required in user posts.
The grain activated by the implicit stream subscription needs to gather this data and then save it to the database. In addition, if there are aggregation requirements, it must report to the upper-level aggregation grain that the data has been added, updated, or deleted.
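To make the "report to the upper-level aggregation" idea concrete, here is a small self-contained sketch in plain C#, without any Orleans types. `ChangeKind` and `RatingAggregate` are hypothetical names invented for this illustration. In a real system the `Report` method would likely be a grain call, which is an assumption on my part.

```csharp
using System;

public enum ChangeKind { Added, Updated, Deleted }

// Hypothetical upper-level aggregate: keeps a running count and sum so the
// average can be maintained without re-reading every child record.
public sealed class RatingAggregate
{
    public int Count { get; private set; }
    public long Sum { get; private set; }
    public double Average => Count == 0 ? 0 : (double)Sum / Count;

    // oldRating is ignored for Added, newRating is ignored for Deleted.
    public void Report(ChangeKind kind, int oldRating, int newRating)
    {
        switch (kind)
        {
            case ChangeKind.Added:
                Count++;
                Sum += newRating;
                break;
            case ChangeKind.Updated:
                Sum += newRating - oldRating;
                break;
            case ChangeKind.Deleted:
                Count--;
                Sum -= oldRating;
                break;
            default:
                throw new ArgumentOutOfRangeException(nameof(kind));
        }
    }
}
```

The subscriber only sends the delta (what changed and how), so the aggregate never has to query the child records to stay current.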
It is worth noting that the events received by a grain activated through an implicit stream subscription may not originate from just one publisher. For example, there is a ProductGrain that publishes an event after saving its data. The subscriber grain with the same Id is activated, fills in the data of the read data model, and saves it to the database.
There is also a ProductAggregateGrain with the same stream namespace and Id, which updates its own statistics after receiving a report from a sub-level order system, and then also publishes an event. This subscriber grain should also be activated, but the processing method is different because the event is different. It updates the aggregate information on the read data model and saves it to the database.
Overall, the data model for reading is a completely different aspect from the data model for writing, and it also has its own complexity.
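The two-publisher scenario described above can be sketched as one projection that updates different parts of the same read-model row depending on the event type. This is a self-contained illustration in plain C#. The event records, `ProductReadModel`, and `ProductProjection` are hypothetical names, and an in-memory dictionary stands in for the read model database.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical events: the first would come from a ProductGrain, the second
// from a ProductAggregateGrain publishing to the same namespace and Id.
public abstract record ProductEvent(Guid Id);
public sealed record ProductRenamedEvent(Guid Id, string Name) : ProductEvent(Id);
public sealed record ProductStatsChangedEvent(Guid Id, int TotalLikes, double AverageRating) : ProductEvent(Id);

public sealed class ProductReadModel
{
    public Guid Id { get; init; }
    public string Name { get; set; } = "";
    public int TotalLikes { get; set; }
    public double AverageRating { get; set; }
}

public sealed class ProductProjection
{
    // Stand-in for the read model database.
    private readonly Dictionary<Guid, ProductReadModel> _rows = new();

    public ProductReadModel Apply(ProductEvent evt)
    {
        if (!_rows.TryGetValue(evt.Id, out var row))
        {
            row = new ProductReadModel { Id = evt.Id };
            _rows[evt.Id] = row;
        }

        switch (evt)
        {
            case ProductRenamedEvent renamed:
                // Raw data owned by the product itself.
                row.Name = renamed.Name;
                break;
            case ProductStatsChangedEvent stats:
                // Aggregate data owned by the upper-level aggregate.
                row.TotalLikes = stats.TotalLikes;
                row.AverageRating = stats.AverageRating;
                break;
        }

        return row;
    }
}
```

Each event type touches only the columns its publisher owns, so events from either source can arrive in any order without overwriting the other's data.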