I've been playing with intercepting persistence to allow subscribing to an actor's event stream.
I've got a persistence provider that wraps an existing provider, and I want to be able to do something like the following (replay all events since startIndex, then stream the remaining events as they arrive):
var sub *eventstream.Subscription
sub = streamingProvider.ReceiveActorEvents(actorName, startIndex, func (event,index) { ..})
Internally, this:
1) Subscribes to the incoming live events from the provider and buffers them
2) Fetches all events from the start index using ProviderState.GetEvents
3) Replays the persisted events
4) Replays the buffered events
5) Streams the remaining live events as they arrive
The issue I'm having is that, to replay a consistent subset of the journal (using ProviderState.GetEvents) and then subscribe to events as they are propagated, I need to de-duplicate events that have both arrived on the live stream and been returned by the persistence query.
I can't do that at present, as the event identity is thrown away by ProviderState.GetEvents.
A simple change to support this would be to add the event index to the callback params of GetEvents (perhaps wrapping the callback's event param in a struct, to avoid breaking the interface in future).
e.g.:
GetEvents(actorName string, eventIndexStart int, callback func(messageIndex int, e interface{}))
Also, doesn't the arg have to be a proto.Message?
I think:
1) Load a plug-in through WithMiddleware; this plug-in buffers all incoming events.
2) Use ProviderState.GetEvents to replay the persisted events, then emit the buffered events from the plug-in.
So, would that work?