Closed frenchyjef closed 1 year ago
Searching for "eventflow" and "persistentsubscription" on Google doesn't yield any results. As far as I know, this hasn't been researched yet.
I don't think so: The RabbitMQ integration in EventFlow is a consumer (implements ISubscribeSynchronousToAll). That means it will publish all events to RabbitMQ, where they can be consumed by other applications. With EventStore Persistent Subscriptions you could achieve something similar, but why would there be an EventFlow integration for this? You can already do this - just use the EventStore API.
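For illustration, a subscriber of that kind could look roughly like the sketch below. It assumes the `HandleAsync` signature of `ISubscribeSynchronousToAll` as found in EventFlow 0.x; `IExternalBus` and `ForwardAllEventsSubscriber` are hypothetical names made up for this example and are not part of EventFlow or its RabbitMQ integration.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using EventFlow.Aggregates;
using EventFlow.Subscribers;

// Hypothetical abstraction over an external broker (RabbitMQ, EventStoreDB, ...).
public interface IExternalBus
{
    Task PublishAsync(
        string eventName,
        string aggregateId,
        int sequenceNumber,
        CancellationToken cancellationToken);
}

// Receives every domain event EventFlow emits and forwards it to the bus.
public class ForwardAllEventsSubscriber : ISubscribeSynchronousToAll
{
    private readonly IExternalBus _bus;

    public ForwardAllEventsSubscriber(IExternalBus bus)
    {
        _bus = bus;
    }

    public async Task HandleAsync(
        IReadOnlyCollection<IDomainEvent> domainEvents,
        CancellationToken cancellationToken)
    {
        foreach (var domainEvent in domainEvents)
        {
            // Forward in the order EventFlow delivered the events.
            await _bus.PublishAsync(
                domainEvent.EventType.Name,
                domainEvent.GetIdentity().Value,
                domainEvent.AggregateSequenceNumber,
                cancellationToken);
        }
    }
}
```

How the subscriber is registered depends on the EventFlow version and the container in use, so that part is left out here.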
In EventFlow, there is no (technical) difference between Domain and Integration Events.
@frenchyjef @frankebersoll
I have not found it either.
Yes it could! I'm using EventStoreDB for both my domain and integration events
My opinion is that EventFlow only works for domain events, not for integration events. Normally, people create services, and those services need to communicate with each other. A service can be event sourced using EventFlow, but the communication between services mostly happens outside such a framework, for example with the help of RabbitMQ. This is basically because event sourcing (or CQRS) is not a top-level architecture.
That said, I'm actually using EventStoreDB for communication between services. So, normally people don't do this, but it is perfectly fine to do so as it is 'just a queue'.
This is the code for that. It is still work in progress, but should work.
public class EventStoreEventHandlerManager : IHostedService
{
    private readonly ILog _log;
    private readonly IResolver _resolver;
    private readonly IEventStoreConnection _connection;
    private readonly string _stream;
    private readonly string _cursorsStream;
    private long? _cursor;
    private long _cursorVersion;

    public EventStoreEventHandlerManager(
        ILog log,
        IResolver resolver,
        IEventStoreConnection connection,
        string aggregate,
        string subscriber)
    {
        _log = log;
        _resolver = resolver;
        _connection = connection;
        // "$ce-{category}" is the stream produced by the by-category system projection
        _stream = $"$ce-{aggregate}";
        // each subscriber keeps its own cursor stream
        _cursorsStream = $"cursors-{subscriber}";
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        await InitializeCursors();
        Subscribe();
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        return Task.CompletedTask;
    }

    // Reads the last stored cursor so the subscription can resume where it left off
    private async Task InitializeCursors()
    {
        var result = await _connection.ReadStreamEventsBackwardAsync(_cursorsStream, StreamPosition.End, 1, resolveLinkTos: true);
        switch (result.Status)
        {
            case SliceReadStatus.Success:
                _cursor = JsonConvert.DeserializeObject<long>(Encoding.UTF8.GetString(result.Events[0].OriginalEvent.Data));
                _cursorVersion = result.Events[0].OriginalEventNumber;
                break;
            case SliceReadStatus.StreamNotFound:
                _cursor = StreamCheckpoint.StreamStart;
                _cursorVersion = ExpectedVersion.NoStream;
                break;
            default:
                throw new Exception($"Unexpected result when reading {_cursorsStream} to resume EventHandlerManager: {result.Status}");
        }
    }

    private void Subscribe()
    {
        var retryPolicy = Policy
            .Handle<Exception>()
            .WaitAndRetryForeverAsync(
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                (ex, timespan) =>
                {
                    Console.Out.WriteLine($"Retrying because of {ex}");
                }
            );

        var position = _cursor.HasValue ? _cursor.Value.ToString() : "null";
        Console.Out.WriteLine($"Subscribing to stream '{_stream}' from position {position}");

        var semaphoreSlim = new SemaphoreSlim(1, 1);
        var subscription = _connection.SubscribeToStreamFrom(_stream, _cursor, CatchUpSubscriptionSettings.Default,
            async (s, e) =>
            {
                // sometimes EventStoreDB gives us an event twice... this code prevents that
                // the semaphore is needed because the check for cursor can be async and needs a lock
                await semaphoreSlim.WaitAsync();
                try
                {
                    // the check sits inside the try so the semaphore is also released when we skip
                    if (_cursor.GetValueOrDefault(-1) >= e.OriginalEvent.EventNumber)
                    {
                        Console.Error.WriteLine($"Received event '{e.Event.EventType}' (#{e.OriginalEventNumber}) from stream '{s.StreamId}', but cursor is already at {_cursor}");
                        return;
                    }

                    Console.Out.WriteLine($"Received event '{e.Event.EventType}' (#{e.Event.EventNumber}) from stream '{s.StreamId}'");
                    await retryPolicy.ExecuteAsync(async () =>
                    {
                        var eventJson = Encoding.UTF8.GetString(e.Event.Data);
                        var metadataJson = Encoding.UTF8.GetString(e.Event.Metadata);
                        var metadata = JsonConvert.DeserializeObject<Metadata>(metadataJson);
                        var integrationEvent = new IntegrationEvent(eventJson, metadata);
                        _cursor = e.OriginalEvent.EventNumber;
                        await DispatchToSubscribersAsync(integrationEvent);

                        // update cursor. This only works when the commands are idempotent
                        // eg. use ISourceId, or an Idempotent Producer
                        var cursorsMessageInput = new EventData(Guid.NewGuid(), "cursors", true, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(_cursor)), null);
                        await _connection.AppendToStreamAsync(_cursorsStream, _cursorVersion, cursorsMessageInput);
                        _cursorVersion++;
                    });
                }
                finally
                {
                    semaphoreSlim.Release();
                }
            },
            (s) =>
            {
                Console.Out.WriteLine($"Subscription to stream '{s.StreamId}' is now live");
            },
            (s, r, e) =>
            {
                Console.Out.WriteLine($"Subscription to stream '{s.StreamId}' has dropped because of '{r}' with exception: {e}");
            }
        );
    }

    private async Task DispatchToSubscribersAsync(IntegrationEvent integrationEvent)
    {
        var subscribers = _resolver
            .ResolveAll(typeof(IEventHandler))
            .Cast<IEventHandler>()
            .ToList();

        if (!subscribers.Any())
        {
            _log.Debug(() => $"Didn't find any subscribers to '{integrationEvent.Metadata.EventName}'");
        }

        foreach (var subscriber in subscribers)
        {
            _log.Verbose(() => $"Calling HandleAsync on handler '{subscriber.GetType().PrettyPrint()}' " +
                               $"for aggregate event '{integrationEvent.Metadata.EventName}'");
            try
            {
                var executionResult = await subscriber.HandleAsync(integrationEvent);
                if (!executionResult.IsSuccess)
                {
                    var failedExecutionResult = (FailedExecutionResult)executionResult;
                    var errors = string.Join(",", failedExecutionResult.Errors);
                    throw new Exception($"Execution was not successful because of the following errors: {errors}");
                }
            }
            catch (Exception e)
            {
                _log.Error(e, $"Subscriber '{subscriber.GetType().PrettyPrint()}' threw " +
                              $"'{e.GetType().PrettyPrint()}' while handling '{integrationEvent.Metadata.EventName}': {e.Message}");
                // rethrow so the retry policy in Subscribe() can try again
                throw;
            }
        }
    }
}
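The duplicate check in the handler above is the part that is easiest to get wrong, so here it is in isolation as a minimal sketch without any EventStoreDB dependency. `CheckpointedProcessor` is a hypothetical name for this example only. Unlike the code above, it advances the checkpoint after the handler has run, which is an assumption about the desired behaviour and not the poster's implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: serialises handling and skips event numbers
// that are at or below the last processed one.
public sealed class CheckpointedProcessor
{
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);
    private readonly Func<long, Task> _handle;
    private long? _checkpoint;

    public CheckpointedProcessor(long? checkpoint, Func<long, Task> handle)
    {
        _checkpoint = checkpoint;
        _handle = handle;
    }

    public long? Checkpoint => _checkpoint;

    // Returns true when the event was handled, false when it was skipped as a duplicate.
    public async Task<bool> ProcessAsync(long eventNumber)
    {
        await _gate.WaitAsync();
        try
        {
            if (_checkpoint.HasValue && _checkpoint.Value >= eventNumber)
            {
                return false;
            }

            await _handle(eventNumber);
            _checkpoint = eventNumber;
            return true;
        }
        finally
        {
            // Runs on every path, including the duplicate early return.
            _gate.Release();
        }
    }
}

public static class Program
{
    public static async Task Main()
    {
        var handled = new List<long>();
        var processor = new CheckpointedProcessor(
            null,
            n => { handled.Add(n); return Task.CompletedTask; });

        // 1 and 0 are delivered a second time and must be ignored.
        foreach (var n in new long[] { 0, 1, 1, 2, 0 })
        {
            await processor.ProcessAsync(n);
        }

        Console.WriteLine(string.Join(",", handled)); // prints "0,1,2"
    }
}
```

Keeping the early return inside the `try` matters: if the duplicate path returned before the `try`, the semaphore would never be released and the next event would wait forever.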
So the EventStore's persistent subscription will be altering the EventStore. It will be creating streams, which is what makes it persistent. I recently added an EventStore subscription to our implementation. Our scenario was like this:
Domain Server:
-> a command comes in
-> if it passes the validation, an event is persisted to EventStore
-> the read model is updated (with standard EventFlow, this is done using the event passed through as an object in memory)
-> the event is published via RabbitMQ
But that coupled our read models to our domain. So we are now looking to have a separate server for reading and for writing.
The new read server:
-> on startup it uses the IReadModelPopulator and IEventPersistence to LoadAllCommittedEvents()
-> after it has loaded all events, it uses my new interface ISubscriptionEventPublisher to StartSubscription()
-> so if the Domain Server persists an event to EventStore, EventStore notifies my Read Server via the subscription, and the Read Server updates any read models.
public EventStoreSubscriptionEventPublisher(
    ILog log,
    EventStoreFactoryService connection,
    IEventJsonSerializer eventJsonSerializer,
    IDomainEventPublisher domainEventPublisher)
{
    this.log = log;
    this.connection = connection;
    this.eventJsonSerializer = eventJsonSerializer;
    this.domainEventPublisher = domainEventPublisher;
}

public async Task StartSubscription()
{
    log.Information("Starting subscription");
    // first argument is resolveLinkTos
    await connection.GetConnection().SubscribeToAllAsync(false, HandleNewEvent);
}

private async Task HandleNewEvent(EventStoreSubscription subscription, ResolvedEvent e)
{
    // skip system streams, whose names start with "$"
    if (e.OriginalStreamId.StartsWith("$"))
    {
        return;
    }

    var ae = new EventStoreEvent
    {
        AggregateSequenceNumber = (int)(e.Event.EventNumber + 1), // Starts from zero
        Metadata = GetMetaData(e),
        AggregateId = e.OriginalStreamId,
        Data = Encoding.UTF8.GetString(e.Event.Data),
    };

    var de = eventJsonSerializer.Deserialize(ae);
    await domainEventPublisher.PublishAsync(new List<IDomainEvent> { de }, CancellationToken.None);
}
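The startup order described for the read server (replay everything, then subscribe) could be wired up roughly as in the sketch below. Only the name `ISubscriptionEventPublisher` and its `StartSubscription()` method come from the post; the interface shape, the `ReadServerStartup` class and the replay delegate are assumptions made for this example.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Assumed shape of the poster's interface; only StartSubscription() is named in the thread.
public interface ISubscriptionEventPublisher
{
    Task StartSubscription();
}

// Hypothetical startup helper for the read server.
public sealed class ReadServerStartup
{
    private readonly Func<CancellationToken, Task> _replayAllEvents;
    private readonly ISubscriptionEventPublisher _publisher;

    public ReadServerStartup(
        Func<CancellationToken, Task> replayAllEvents,
        ISubscriptionEventPublisher publisher)
    {
        _replayAllEvents = replayAllEvents;
        _publisher = publisher;
    }

    public async Task StartAsync(CancellationToken cancellationToken)
    {
        // Step 1: rebuild the read models from all committed events.
        await _replayAllEvents(cancellationToken);

        // Step 2: only then start listening for new events.
        await _publisher.StartSubscription();
    }
}
```

One thing this sketch does not cover: events written between the end of the replay and the start of the subscription. A live-only subscription such as `SubscribeToAllAsync` will not deliver those, so a real setup would need a catch-up subscription or some other way to close that window.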
Hi all,
I created Pull Request #797 which upgrades EventStore persistence to use EventStoreDB 20.6.1.
I also added a UseEventStoreSubscriptions extension to help subscribe to stream events based on a filter and then publish them to your internal microservices (pubsub). After talking with the EventStore team, it seems that it is preferred to use SubscribeToAllAsync instead of PersistentSubscriptions.
Could any of you review my pull request and provide thoughts/feedback so we can get this merged into the EventFlow library?
Cheers, J-F
Hi,
There is an implementation of EventStoreDb in EventFlow to store the events...
Has anyone tried to develop an implementation of EventStore's PersistentSubscription for EventFlow?
Could this replace RabbitMQ?
If yes, would it apply to both Domain and Integration Events?