dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Implementing a new Stream Provider, documentation? #2421

Closed jamescarter-le closed 7 years ago

jamescarter-le commented 7 years ago

I am looking to write a StreamProvider for EventStore. I have seen the documentation on the Orleans site and had a look at the current EventBus implementation, so I have a reasonable understanding of how to go about implementing this.

I wondered if there was any documentation someone could forward me about best practices for implementing a Stream Provider. This source supports pushing events, so I may need a cache in front of the PullingAgents to ensure I can fulfill the backpressure requirement.

sergeybykov commented 7 years ago

This source supports pushing events

Do you mean that grains would only publish events but never consume them?

I'm afraid the EventHub provider code is the only best practice "documentation" at this point. @jason-bragg may have more to say.

jamescarter-le commented 7 years ago

It is the inverse, Grains would not publish to this Stream in my implementation.

Thanks, I will base my provider on top of the existing EventHub provider.

jason-bragg commented 7 years ago

@jamescarter-le Unfortunately we don't have the type of documentation you are asking about. This is, in part, due to the fact that we're still figuring out how to build stream providers ourselves. Streaming is conceptually simple but technically complex, and the requirements of various scenarios vary dramatically.

From what we've learned so far, I would suggest you spend a little time thinking about your usage patterns, especially your reliability and performance requirements.

We currently maintain three general patterns:

If one of these scenarios fits your needs, then the streaming infrastructure we currently have can likely be adapted to them, but the further your needs deviate, the more difficulties you'll encounter.

Given that you are looking to read from what appears to be an existing message queuing system, I would suggest modeling your provider on the Azure Queue stream provider or the EventHub stream provider, depending on the level of recoverability you need. This involves implementing a queue adapter that bridges the persistent stream provider with the underlying queue.

As @sergeybykov suggested, familiarizing yourself with the EventHub stream provider is a good start. It is the newest and most advanced stream provider, so even if you don't have the same recoverability needs, it is still worth studying.

jamescarter-le commented 7 years ago

Thanks @jason-bragg for that detail. I believe the EventHub provider is the best one to start with, because it deals with SequenceTokens, which I will also have to use for my provider.

I will link to my repository once it is constructed, for your perusal.

jason-bragg commented 7 years ago

@jamescarter-le Great! Eager to see what you come up with.

A few notes about the EventHubStreamProvider: the provider was ported from 343 Industries (Halo 5) and contains many extension points that allow that org to keep using it with some of their custom systems. You may notice that the adapter allows customization of checkpointing, monitoring, message serialization, and other oddities. Please don't feel the need to be as customizable; these requirements have complicated the adapter more than I would have liked.

The pooled cache used in the EventHubStreamProvider was necessary for the recovery scenarios, which required several minutes (30ish is common) of data to be kept in memory without a significant GC hit. Making it fast as well as flexible enough to accommodate various forms of data was non-trivial, and resulted in an undesirable level of complexity. If you choose to use the pooled cache, please feel free to provide feedback and suggestions if you see ways to simplify it.

jamescarter-le commented 7 years ago

Many thanks for your assistance here. Having dived further into the Stream Provider internals, I have a few questions to resolve. Apologies for the long post, but I thought it worthwhile to check whether I am on the wrong path.

Edit: For clarification, we are not currently pushing to EventStore from Orleans; this is to consume events from another application.

I have a diagram of my proposed solution here: Block Diagram

All of the currently enabled StreamProviders (EventHub, Kafka, AzureQueue, SQS) stream from one physical source, whether it is a hub, topic, or queue, and each message contains the Namespace used to deliver the event to the appropriate subscriber. They use multiple IQueueAdapterReceivers just to split the load over the queuing technology's partitioning mechanism.

I have a different requirement. I have one source of events, EventStore in this case, which could contain many thousands of lightweight streams. When a Grain is instantiated, we want to use PubSub to Subscribe to a Stream and persist that, which all works well. One issue is https://github.com/dotnet/orleans/issues/1121 but I can work around that for now. I do not want to open multiple sockets to the EventStore; the API allows me to open one connection and either subscribe to have events pushed to the client (the IQueueAdapterReceiver) or poll for them.

Am I misusing streams here by attempting to read from distinct physical streams through one EventStoreAdapterFactory? All the other providers read from one physical stream and use their IQueueAdapterReceivers as connections to its partitions.

To achieve this I am considering using a custom IStreamQueueMapper to take the StreamNamespace (EventStore Streams and Grains have a String key) and provide a QueueId, which can be used by the IQueueAdapterReceiver to resolve the appropriate Stream to subscribe to.
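As a minimal sketch of that namespace-to-QueueId idea, a stable hash lets every silo compute the same assignment without coordination. This is only an illustration of the mapping, not the Orleans IStreamQueueMapper interface; the class and method names are hypothetical.

```csharp
using System;

// Sketch of the namespace->queue mapping idea: hash the stream namespace
// onto a fixed set of queue ids so every silo computes the same assignment.
public static class NamespaceQueueMapper
{
    // A stable hash (unlike string.GetHashCode, which may differ across processes).
    static uint StableHash(string s)
    {
        uint h = 2166136261; // FNV-1a offset basis
        foreach (char c in s)
        {
            h ^= c;
            h *= 16777619; // FNV-1a prime
        }
        return h;
    }

    // Deterministically maps a stream namespace to one of queueCount queues.
    public static int GetQueueForStream(string streamNamespace, int queueCount)
        => (int)(StableHash(streamNamespace) % (uint)queueCount);
}
```

The key property is determinism across processes, which is why `string.GetHashCode` is avoided here.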

Also: Orleans PullingAgents are appropriate when I am connected to a stream I am specifically pulling events for, but many streams send data directly to the EventStore client in the IQueueAdapterReceiver as events emerge. Currently I push these onto a ConcurrentQueue and pop them when GetQueueMessagesAsync is called. I believe I can apply backpressure by not acknowledging these messages to EventStore, which would stop it from sending me more, but would I be better off implementing my own cache-populating mechanism and ignoring GetQueueMessagesAsync, similar to how EventHubAdapterReceiver implements IQueueCache?
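The push-to-pull bridge described above might look roughly like this sketch: pushed events land in a ConcurrentQueue, and the pulling agent drains a bounded batch per call. The types are simplified stand-ins for the IQueueAdapterReceiver contract, and the acknowledgment threshold is an assumed backpressure heuristic, not an EventStore API.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Bridges a push-based subscription to Orleans' pull-based polling model.
public class BufferedReceiver<TEvent>
{
    private readonly ConcurrentQueue<TEvent> _buffer = new ConcurrentQueue<TEvent>();

    // Called by the push-based subscription when an event arrives.
    public void OnEventAppeared(TEvent evt) => _buffer.Enqueue(evt);

    // Called by the pulling agent; pops at most maxCount buffered events.
    public List<TEvent> GetQueueMessages(int maxCount)
    {
        var batch = new List<TEvent>();
        while (batch.Count < maxCount && _buffer.TryDequeue(out TEvent evt))
            batch.Add(evt);
        return batch;
    }

    // Simple backpressure signal: only acknowledge upstream while the buffer
    // is below a threshold, so the source stops pushing when we fall behind.
    public bool ShouldAcknowledge(int highWaterMark) => _buffer.Count < highWaterMark;
}
```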

Also: why does every StreamProvider return one IBatchContainer per message, when it is called a batch and the documentation says the StreamSequenceToken should be that of the earliest message in the Stream?

jason-bragg commented 7 years ago

Receivers are heavy. They are intended to read many streams from a single persistent queue. Having one receiver per stream (which is what I think you are suggesting) will not work well in the current architecture.

I will speak in terms of queues, but for this discussion queues, receivers, and pulling agents are analogous, as each queue has a single receiver read by a single pulling agent. The system assumes a fixed number of queues to read from so adding (or removing) queues dynamically is not supported. For a given stream provider, all silos in the cluster must be aware of the same set of fixed queues for the queues to be (mostly) evenly distributed over the cluster.

It sounds like EventStore does not have any partitioning capabilities (other than by stream), so if you want to distribute the read load across the cluster, you'll need to configure your provider to spin up a fixed number of 'receivers' (by configuring the queue mapper to report that number of queues), then have some way of telling each receiver which streams it should read from. The trick, I suspect, will be figuring out how to partition your streams so that each receiver knows which ones to read.
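One way to sketch that fixed-receiver partitioning, assuming each receiver knows its own index and the cluster-wide receiver count: every receiver filters the stream set down to the streams that hash to its index. The stream list and hash here are illustrative, not part of any Orleans API.

```csharp
using System.Collections.Generic;
using System.Linq;

// Each receiver claims the streams whose stable hash maps to its index,
// so a fixed receiver count yields a consistent, coordination-free split.
public static class ReceiverPartitioner
{
    static uint StableHash(string s)
    {
        uint h = 2166136261;                      // FNV-1a offset basis
        foreach (char c in s) { h = (h ^ c) * 16777619; } // FNV-1a prime
        return h;
    }

    // Streams this receiver is responsible for, out of a known stream list.
    public static List<string> StreamsForReceiver(
        IEnumerable<string> allStreams, int receiverIndex, int receiverCount)
        => allStreams
            .Where(s => StableHash(s) % (uint)receiverCount == (uint)receiverIndex)
            .ToList();
}
```

Every stream lands on exactly one receiver, and all silos agree on the assignment because the hash is process-independent.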

but would I be better off implementing my own cache populating mechanism

This depends on your recovery needs. If your processing needs to be recoverable (from transient errors), it would be valuable to simply put each message in the cache for delivery and reduce the read rate only if consumers are failing to keep up. That said, I'd not recommend introducing this degree of complexity unless your recovery scenarios require it.

why IBatchContainer

High-performance systems we've worked with often pack many messages into a single persisted queue message, so we assume there is a many-to-one mapping of application events to queue events, and the IBatchContainer is responsible for converting a single queue message into a set of application events. If your service does not do this, then the IBatchContainer logic should be simple, returning the single event in the queue message.
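That many-to-one unpacking can be illustrated with simplified stand-in types (this is not the Orleans IBatchContainer interface, just the shape of the idea):

```csharp
using System.Collections.Generic;

// A persisted queue message that may carry several application events.
public class QueueMessage
{
    public List<string> PackedEvents = new List<string>();
}

// Unpacks one queue message into its application events. If the source
// writes one event per queue message, this simply yields that single event.
public class SimpleBatchContainer
{
    private readonly QueueMessage _message;
    public SimpleBatchContainer(QueueMessage message) { _message = message; }

    public IEnumerable<string> GetEvents() => _message.PackedEvents;
}
```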

jamescarter-le commented 7 years ago

@jason-bragg Thank you for your detailed response. I now understand that the Receivers are meant for reading one queue; however, in my application there is no need to consume all events, as each Orleans cluster may only use 1 or 2 percent of the available Streams.

I see now why I cannot create my own IStreamQueueMapper and return it from the AdapterFactory: Orleans throws an exception when it is not of type IConsistentRingStreamQueueMapper, even though neither the type signature nor this documentation specifies that requirement: https://dotnet.github.io/orleans/Documentation/Orleans-Streams/Streams-Extensibility.html

I now understand what this is doing, and I can implement a solution that, as you say, creates a configuration-defined number of Receivers, which can then decide which EventStore streams they subscribe to.

I will leave caching for now; we should be able to recover using the StreamSequenceToken. I am able to retrieve batches of messages from EventStore when pulling, so I will use this approach as suggested.

Your support is appreciated.

jason-bragg commented 7 years ago

Sorry about the secret IConsistentRingStreamQueueMapper dependency. It hurts my soul too... :(

As for using the StreamSequenceToken to start reading from a position in the stream, I see no way (off the top of my head) for the token to get back to the receiver, unless you make the receiver implement the cache interface as well and get the token through the cursor request. That said, I suspect you can find another way if you need to. There is also the risk of multiple consumers reading from different locations in the stream, which could complicate the use of tokens. If you don't have multiple readers on a single stream, tokens as read locations could work well.
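The "tokens as read locations" idea, with a single reader per stream, might look like this sketch: the consumer persists the last sequence number it handled, and on reactivation the receiver resumes from the first event after that token. The event shape and token are illustrative stand-ins, not the Orleans StreamSequenceToken.

```csharp
using System.Collections.Generic;
using System.Linq;

// An event carrying its position in the stream.
public class SequencedEvent
{
    public long Sequence;
    public string Payload;
}

public static class StreamResumer
{
    // Resume reading strictly after the last handled token, in order.
    public static List<SequencedEvent> ReadFrom(
        IEnumerable<SequencedEvent> stream, long lastHandledToken)
        => stream.Where(e => e.Sequence > lastHandledToken)
                 .OrderBy(e => e.Sequence)
                 .ToList();
}
```

With multiple readers at different positions this single-token scheme breaks down, which is the risk noted above.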

jamescarter-le commented 7 years ago

Understood @jason-bragg - I will continue for a few hours and see where I get to.

I appreciate the help.

jamescarter-le commented 7 years ago

I have a draft which can read from the EventStore cluster; as you say @jason-bragg, I need some way of telling the AdapterReceivers which Streams they should be responsible for.

https://github.com/jamescarter-le/OrleansEventStoreProvider

Currently the only point where I can intercept this process is the call to IStreamProvider.GetStream; no other hooks are available as far as I can tell. Unfortunately I cannot access the IAdapterFactory from the IStreamProvider, despite it being held by the PersistentStreamProvider. To this end I have had to add some Reflection 😢 to get the AdapterFactory for the StreamProvider.

I wonder if you know of a better way to do this, or if you would consider making AdapterFactory protected on PersistentStreamProvider?

Edit: Also, docs would be useful to let people know that using SimpleQueueAdapterCache, as recommended in the docs, requires your StreamSequenceToken to be or derive from EventSequenceToken.

jason-bragg commented 7 years ago

I'd advise against exposing internals via reflection, but I think you know that :) Is there a way for an EventStore client to query the store to learn about new streams?

jamescarter-le commented 7 years ago

The issue is that there are many millions of Streams, so the EventStoreAdapterReceiver or QueueMapper cannot maintain a dictionary of them.

It is the act of opening a PubSub subscription to a Stream that should trigger the connection to that Stream. Unfortunately, the PullingAgent does not create a QueueCursor until at least one message is relevant for the StreamId, at which point it sets the StreamSequenceToken to Empty; only then does the real Subscription have a QueueCursor created, which is what I would really want to send to EventStore, as it carries the real StreamSequenceToken I am after.

It seems very much like Streams are not set up for this kind of interaction, and I may be better off just having another class external to Orleans, which can be posted to via a MemoryStream informing it which Streams to connect to, and have it push messages back to the Grain using the MemoryStream.

I will keep digging away and see if I can get Orleans Streams working properly.

jason-bragg commented 7 years ago

I agree that the pattern you are describing does not fit our existing model. I don't see a clear way to resolve this without changes to the infrastructure, which may not be a bad thing.

I suspect that a custom pubsub system would allow you to solve this, but that would require a change to the streaming infrastructure.

While I'm hesitant to suggest more radical deviations from our tested patterns, your case may warrant them. A custom stream provider, which spins off local stream reader grains per consumer, might work, but without the ability to install grain extensions, scheduling could be a problem.

Maybe @gabikliot will see something I'm missing.

jamescarter-le commented 7 years ago

So I've been looking at other solutions, as my pattern really resists being shoe-horned into Orleans Streams.

My first thought was using the IGrainObserver pattern, with a Grain per EventStore Stream; however, it would need to be brought to life on startup, perhaps using a Reminder, and then I would have many Reminders ticking for no reason once the Grain was alive.

However, I then dug into Reminders further, and it looks like Reminders implement exactly what I am after. They can be load-balanced across silos, can invoke Grain operations, can be registered from a Grain and call back to that Grain, are persisted, and deal with silo loss/addition.

I think some minor adjustments to a copy of LocalReminderService would work very well for this. Unfortunately, it looks like many of the objects LocalReminderService interacts with are internal, and the Silo also lacks any hooks to register SystemTarget objects.

This is a pity, as it really looks like a good fit for what I need here. Do you have any plans to expose some of these internal classes? Also, am I missing some hook where I am able to register a SystemTarget? The service would need to be alive before any Grains are activated; otherwise they could call a target that did not yet exist.

sergeybykov commented 7 years ago

I opened https://github.com/dotnet/orleans/issues/2444 for the system target question.

jason-bragg commented 7 years ago

@jamescarter-le I'd like to verify that I understand your scenario.

Some other process/service/? will be writing events to EventStore organized by stream. A consumer grain will need to subscribe to a stream at a specific offset (sequence token?), and process events. Only one consumer will be consuming from any specific stream.

Is this correct? What is triggering these grains to subscribe: an HTTP call? A reminder? A web socket connection?

jason-bragg commented 7 years ago

If I'm understanding the scenario correctly, I think I may have a workable suggestion.

So far, we've mostly been discussing persistent streams, because the service is processing events from a storage service, which is what those providers were intended for. However, we also have ephemeral stream providers, like the Simple Messaging Stream Provider and the Memory Stream Provider. These stream providers deliver events from producers to consumers via messaging, with no persistent queue.

We also have a stream subscription model called "Implicit Subscriptions" that automatically activates a grain for stream processing whenever events are available on a stream, organized by stream namespace. With these features, one can create a stream-processing grain associated with a stream namespace, then activate 'producer' grains that read a stream's events from EventStore and publish them onto one of the ephemeral stream providers. This activates the consumer grain (associated with the stream), which starts processing the events. The producer grain can read from EventStore, publishing events to the ephemeral stream until there are no more, then deactivate, reactivating by reminder or any other trigger.

(EventStore) => Producer grain => Ephemeral stream provider => (implicit subscription) Consumer Grain.

With this pattern, one can trigger stream processing by controlling the producer grains that read from EventStore. Start, stop, reprocess from a previous point in the stream, all become calls to the producer grain reading from EventStore.
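As a rough, Orleans-free model of this pipeline (grain activation and the stream provider are simulated with plain classes; all names here are hypothetical illustrations, not Orleans APIs): publishing to a stream activates its consumer on the first event, mimicking an implicit subscription, while the producer drains the source until it is empty.

```csharp
using System.Collections.Generic;

// Simulates the ephemeral stream provider plus implicit subscriptions:
// the consumer for a stream is created on demand when its first event arrives.
public class EphemeralBus
{
    private readonly Dictionary<string, Consumer> _consumers =
        new Dictionary<string, Consumer>();

    public void Publish(string streamId, string evt)
    {
        if (!_consumers.TryGetValue(streamId, out Consumer consumer))
        {
            consumer = new Consumer();      // "implicit" activation
            _consumers[streamId] = consumer;
        }
        consumer.OnNext(evt);
    }

    public Consumer GetConsumer(string streamId) => _consumers[streamId];
}

// Stand-in for the implicitly subscribed consumer grain.
public class Consumer
{
    public List<string> Processed = new List<string>();
    public void OnNext(string evt) => Processed.Add(evt);
}

// Stand-in for the producer grain: reads events for one stream from the
// source (EventStore in this thread) and publishes until none remain,
// then would "deactivate" until re-triggered.
public class Producer
{
    public void Pump(string streamId, Queue<string> source, EphemeralBus bus)
    {
        while (source.Count > 0)
            bus.Publish(streamId, source.Dequeue());
    }
}
```

The control point is the producer: starting, stopping, or reprocessing from an earlier position all become calls to it, which is the crux of the suggestion above.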

Thoughts?

jamescarter-le commented 7 years ago

Hi Jason, yes that is accurate.

I have a proven process outside Orleans that submits approximately 150 events/sec to EventBus, with 20k-event bursts every 10 minutes. The client site where this Orleans process runs may only be interested in 1 or 2 events every minute or so; the rest will have no relevance. This is why I want to subscribe to specific streams, for bandwidth and processing time.

They will either start from the beginning of a Stream, or from the middle if the Grain is reactivated.

I am happy to accept the limitation if necessary of only one consumer to one stream.

I appreciate your guidance on this.


jamescarter-le commented 7 years ago

Hi Jason, that is very similar to the setup I had already started to implement before looking into Streams, but calling the Grain directly to deliver the message. This grain is kept alive with a Reminder.

I cannot use implicit streams because the GrainID is String.

The reminder type implementation looks the most promising, but perhaps my original proof of concept will have to do for now.

Edit: An Orleans call from another Grain will inform the Grain that it should process events from a Stream from now on, indefinitely.


jason-bragg commented 7 years ago

Orleans process may only be interested in 1 or 2 events every minute ... I cannot use implicit streams because the GrainID is String.

Understood. Given the non-conformant pattern I agree that streams are probably not the right solution. The complexity they introduce is only justifiable when they can save time/effort, which does not seem to be the case here.

jamescarter-le commented 7 years ago

Closed in favour of an ad-hoc solution, or the implementation of application SystemTarget types as described in https://github.com/dotnet/orleans/issues/2444

jamescarter-le commented 7 years ago

Many thanks for your help @jason-bragg @sergeybykov