Closed wiegell closed 1 year ago
The code changed a lot in 2 years. Subscriptions no longer use channel internally.
I don't think using channels would be better because Go gRPC library is not based on channels, that would lead to none optimal memory usage and a lot of circle around with no benefit imho.
I remain open to suggestion and would like to know how you'd do it for example.
What i'm doing currently is wrapping the .Recv() like shown below. But it would be great if this functionality was supplied by the client itself.
func (m ESDBDataService) subscribeToEventDataStream(ctx context.Context,
streamName string, from esdb.StreamPosition,
) (<-chan []byte, error) {
returnChan := make(chan []byte)
options := esdb.SubscribeToStreamOptions{
From: from,
ResolveLinkTos: true,
}
subscription, err := m.esdbClient.SubscribeToStream(ctx, streamName, options)
if err != nil {
return nil, err
}
// Map data from event to pure data stream
go func() {
defer close(returnChan)
for {
select {
case <-ctx.Done():
return
case ev, ok := <-chanFromSubscription(ctx, subscription):
{
if !ok {
return
}
returnChan <- ev.Event.Data
}
}
}
}()
return returnChan, nil
}
func chanFromSubscription(ctx context.Context, subscription *esdb.Subscription) <-chan *esdb.ResolvedEvent {
returnChan := make(chan *esdb.ResolvedEvent)
go func() {
defer close(returnChan)
defer subscription.Close()
for {
event := subscription.Recv()
if event.SubscriptionDropped != nil {
break
}
if event.EventAppeared != nil {
// Forward data to channel
returnChan <- event.EventAppeared
}
}
}()
return returnChan
}
Also - can you confirm that canceling the context given to SubscribeToStream
will unblock .Recv()?
Also - can you confirm that canceling the context given to SubscribeToStream will unblock .Recv()?
Yes it should
could my example with chanFromSubscription be implemented as a method on *esdb.Subscription
? And similar on read?
If you ask if your function could be a valuable addition to the client, I would say no because it's pretty specific and doesn't deal with connection drops, nor checkpoints if you subscribed to $all
for example. To be fair, it's hard to come up with higher-level functions because the way you consume a subscription might differ from how other people would do. We decided to expose a leaner interface so it accommodates everybody.
What can be done however is to have a community driven project where your function could be added. The idea is to allow such project to not be tied to the client release schedule, so it can evolves at its own pace.
This PR somewhat implemented the use of channels internally in the package: https://github.com/EventStore/EventStore-Client-Go/pull/56/files
I find that i always wrap the .Recv() method with a goroutine to create a channel when using this package - wouldn't it be more natural that the .Recv() method returned a channel?
Also i think there could be more documentation regarding the .Recv() method. I suppose it just blocks the calling goroutine when waiting for events? Will this unblock if ctx.Done() emits? If not - is this not prone to goroutine leaks?