dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Add support for StatelessWorker grains to subscribe to streams #433

Open veikkoeeva opened 9 years ago

veikkoeeva commented 9 years ago

@gabikliot Confirmed in Gitter that currently it is not supported for StatelessWorker grains to subscribe to streams, but support for this scenario is desirable. Let's use this issue as a vehicle to flesh out more details and improve documentation. I'll add some tasks, feel free to propose better ones and I'll refine the list.

To paraphrase what was written earlier:

So today we don't support StatelessWorker subscribing to a stream. It won't work. We should probably add an explicit throw in the code now. It has to do with our internal implementation. However, in general, we do need to support this scenario. Our first priority scenario was a large number of small streams, with their processing basically sharded among a large number of regular (stateful) grains. The goal is stateful processing. That's what we have focused on until now and that's what works now. The scenario you described with StatelessWorker will be more suitable for a small number (or even one) large stream and the goal would be stateless processing (for example, a stream of all msgs for all my events, just decode/decipher them and maybe forward for further stateful processing). So for now, just priority-wise, we don't support stateless processing with StatelessWorker, but we should (and hopefully will in the future). Feel free to open an issue on GH for that. Maybe someone will take the lead and help implement this. We can provide help with where the gaps are w.r.t. the current implementation.

gabikliot commented 9 years ago

Added documentation here: http://dotnet.github.io/orleans/Orleans-Streams/Streams-Programming-APIs under "Stateless Automatically Scaled-Out Processing".

rore commented 9 years ago

+1 for this addition.

There's another use case for StatelessWorker subscription which I think can be very basic (I know we need it across the board) - for multiple readers / single writer pattern.

This is a common pattern when you have a resource that has high read access. You want the updates to go to a single stateful grain, but to scale reads you want those to be done via StatelessWorkers (that also function as a local-silo cache).

These readers need to subscribe to revoke events from the writer. Doing that via observers is a problem (since you need to persist them etc.). Orleans streams seem to be a perfect match for this. If only you could use them with StatelessWorkers...

gabikliot commented 9 years ago

@rore, I think in your case you can't really use StatelessWorkers. If I understand your case correctly, you need to reliably send an "invalidate" command to all read replicas and make sure they all got it. You cannot do that with StatelessWorkers (regardless of whether this is done with streams or direct grain messaging). StatelessWorkers are not addressable individually, and you can't know how many of them there are. So you cannot address them all. You could maybe send a huge number of calls and "hope" at least one lands on each, but this is very fragile and even that won't guarantee you anything.

I see 2 ways to solve your general problem, neither of which involves StatelessWorkers:

1) Do it the way you are doing now, with a single writer grain and a pool of reader grains, and manage the pool explicitly, via ids. For example, you decide on 10 ids for 10 grains for that pool. No need for them to register anywhere, just use known ids 1...10. A reader call just picks a random id out of the pool. The writer will send invalidate to all of them. To get dynamic scaling, you basically rely on Orleans for short deactivation. Configure the readers to have a short 1 min deactivation, and the writer to have a longer deactivation. So in the high read rate case all 10 grains will be in memory. But in the low period they will quickly be deactivated. In general, in your case, since you are managing consistency of your data yourself, with an explicit invalidate command, this whole problem is relatively easy.

2) We are looking internally at a similar single-writer, multiple-reader case. The difference from your case is that we want to manage the consistency automatically, not based on the application telling us when to invalidate. This is still at quite an early stage.
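The first option above (a fixed pool of reader ids that the writer invalidates explicitly) can be sketched as a small simulation. This is plain Python, not Orleans code: `Reader`, `Writer`, and `POOL_IDS` are hypothetical names, the writer holding direct references to readers stands in for grain calls by id, and deactivation timeouts are not modeled.

```python
import random

# Hypothetical fixed pool of 10 well-known reader ids (1..10), as in option 1.
POOL_IDS = list(range(1, 11))


class Reader:
    """Stands in for one reader grain holding a non-persistent local cache."""

    def __init__(self):
        self.cache = {}

    def read(self, key, load):
        # Serve from the local cache; fall back to the writer's store on a miss.
        if key not in self.cache:
            self.cache[key] = load(key)
        return self.cache[key]

    def invalidate(self, key):
        self.cache.pop(key, None)


class Writer:
    """Stands in for the single writer grain that owns the data."""

    def __init__(self):
        self.store = {}
        self.readers = {reader_id: Reader() for reader_id in POOL_IDS}

    def write(self, key, value):
        self.store[key] = value
        # Explicit invalidation: every id in the pool is addressed, which is
        # exactly what is not possible with StatelessWorker activations.
        for reader in self.readers.values():
            reader.invalidate(key)

    def read(self, key, rng=random):
        # A caller picks a random id out of the pool.
        reader = self.readers[rng.choice(POOL_IDS)]
        return reader.read(key, self.store.__getitem__)
```

Because the writer knows every id in the pool, a read through any reader after a write never returns the stale value in this model. The cost raised later in the thread also shows up here: `write` touches all 10 readers even if only one of them is in use.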

CCing: @nehmebilal, @remusrg.

rore commented 9 years ago

Thanks @gabikliot . Two questions:

About your suggestion - part of the idea of using StatelessWorkers is that they are local on each silo, thus providing a local cache. Can this be done the way you suggest it? Can these grains be placed locally even if they are not StatelessWorkers? Also, if the writer always sends the invalidation message to all reader grains, it means all 10 grains will be activated on an invalidation even if only one of them is actually alive. That introduces an overhead for invalidations.

About streaming - wouldn't allowing StatelessActors to subscribe be the solution to the problem of broadcasting messages to StatelessActors? (This is a problem that goes beyond this specific use case). That is - if StatelessActors could subscribe, I would be able to have an "invalidation" pub-sub stream that will be used to broadcast the invalidation to all subscribed StatelessWorkers. If that's not the case, what am I missing?

gabikliot commented 9 years ago

Let's talk about StatelessWorkers here and open a separate issue about replication. It is a big enough issue to deserve its own thread. OK? So please go ahead and open a new issue on replication with your case described and potential solutions.

Back to StatelessWorkers. What you are suggesting does make sense, but for a different abstraction. StatelessWorkers - the whole point is: you don't care which one, you don't send them commands to control their state, since they are Stateless. W.r.t. stream processing, I think StatelessWorkers can be used for scaling out stateless processing: http://dotnet.github.io/orleans/Orleans-Streams/Streams-Programming-APIs -> Stateless Automatically Scaled-Out Processing. But it is not even clear to me how StatelessWorkers will subscribe to a stream - each activation needs to subscribe, or one activation on behalf of all activations, or maybe StatelessWorkers with streams can only be used with Implicit Stream Subscriptions. I don't know yet.

But being able to send broadcast msgs to all of them seems to introduce explicit management of them, which is against the nature of Stateless.

There is a different question about whether Orleans should provide an abstraction of a group of actors, how this abstraction is provided, ...lots of questions. But StatelessWorkers is not that abstraction.

rore commented 9 years ago

I see your point, but if that's the case then I feel there's a fundamental building block that is missing.

What we usually need are Workers, the part of the "Stateless" we kind of ignore. The common need is to have grains of a certain type that are created locally, automatically and scaled when needed (we don't want to manage them explicitly, e.g. by known IDs).

These grains are not persistent, but they can potentially hold a local non-persistent state (which is some kind of data that you might need and don't want to bring on every call).

As for subscribing, I was thinking something like this - each StatelessWorker will subscribe on activation and will unsubscribe on deactivation. That way stream messages will only go to the existing activations and will not activate any StatelessWorker that was deactivated. Why does that break the abstraction of the StatelessWorker? I don't see how it introduces explicit management.

veikkoeeva commented 9 years ago

@rore Let me rephrase to check I understood correctly: You have a MasterCache that is an origin, a source of events to some well known stream. Let's say it's a cache stream with Id 0. Then you would like to instantiate local, per-silo caches, PerSiloCache grains, that have the exact same content as the master cache grain.

To query these per-silo cache grains you call them with a well-known ID, say 0 (PerSiloCacheFactory.GetGrain(0)). In OnActivateAsync this PerSiloCache retrieves data from MasterCache and subscribes to the aforementioned well known cache stream with stream ID 0. In OnDeactivateAsync you would unsubscribe from the well known cache stream.

This way MasterCache could feed changes to PerSiloCache grains and you wouldn't need to explicitly manage the caching and/or guess how many you would need.
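The lifecycle described above (load on activation, subscribe, unsubscribe on deactivation) can be illustrated with a toy in-memory model. This is a Python sketch of the proposal only, not of anything Orleans supports: `Stream`, `MasterCache`, and `PerSiloCache` here are hypothetical classes, and `on_activate` / `on_deactivate` merely mimic the roles of the grain lifecycle hooks.

```python
class Stream:
    """Toy pub-sub stream: every current subscriber gets every event."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self, handler):
        self.subscribers.append(handler)

    def unsubscribe(self, handler):
        self.subscribers.remove(handler)

    def publish(self, event):
        # Iterate over a copy so handlers may unsubscribe while being called.
        for handler in list(self.subscribers):
            handler(event)


class MasterCache:
    """Origin of the data and the only publisher on the cache stream."""

    def __init__(self, stream):
        self.stream = stream
        self.data = {}

    def put(self, key, value):
        self.data[key] = value
        self.stream.publish((key, value))


class PerSiloCache:
    """One local cache activation that follows the master via the stream."""

    def __init__(self, master):
        self.master = master
        self.data = {}

    def on_activate(self):
        # Load a snapshot first, then follow changes through the stream.
        self.data = dict(self.master.data)
        self.master.stream.subscribe(self.on_event)

    def on_deactivate(self):
        # Unsubscribing here means later events never reactivate this cache.
        self.master.stream.unsubscribe(self.on_event)
        self.data = {}

    def on_event(self, event):
        key, value = event
        self.data[key] = value
```

In this model a deactivated cache simply stops receiving events, which is the behavior rore asks for. What the sketch does not capture is the point raised in the replies: in Orleans the subscriber is a grain identity, not an individual activation.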

rore commented 9 years ago

@veikkoeeva yes, exactly, and PerSiloCache are StatelessWorkers so they are scaled when needed.

veikkoeeva commented 9 years ago

@rore I think this could work. There's another pattern that could also work, though it is not as scalable: have a Bootstrapper and instantiate there a [PreferLocalPlacement] grain with some well known ID, say, something derived from the silo ID. For this plan to work, there's a little snag: the silo ID is not exposed publicly in the Bootstrapper at the moment (though it is known at that time). To get it, you could have a stateless grain, query the silo ID from there and then use that to instantiate the locally placed grain. Code organization-wise this is almost the same as the plan with the stateless worker. So unless someone finds holes in this, if streams are eventually supported in stateless grains and you don't need that maximum scalability right now, refactoring later would be easy.

As a side note, maybe it is OK to add [Reentrant] too.

gabikliot commented 9 years ago

Guys, let's not combine 2 unrelated threads please, OK? It just makes following and reasoning about them later so hard. This thread is about StatelessWorkers using streams. A different issue is replication and local placement optimization. Or like @rore wrote: "I see your point, but if that's the case then I feel there's a fundamental building block that is missing."

gabikliot commented 9 years ago

@rore, about the StatelessWorker abstraction with streams. First, let's assume we are talking about Explicit subscription, and not Implicit. So first the GRAIN has to subscribe, at least once. Otherwise, no activation of that grain will receive any stream event. So you send a control msg to this StatelessWorker grain, it subscribes once. Now you keep sending it stream events, more activations are created, they subscribe (but only their activation), then unsubscribe inside deactivate async. But the one global per-grain subscription stays? How do you unsubscribe from that? This mixes 2 abstractions. I don't see how this can work. I am really open to suggestions. I would love to make it work. I just don't currently see how.

Alternatively, we can say that StatelessWorker can only have implicit subscription. No need to subscribe a grain ever. In this case indeed the subscribe inside activation will be for this activation only. This can work I think.

rore commented 9 years ago

I don't see a problem with having a limitation that you can only use an implicit subscription. I would be happy to understand, though, in what way is the implicit subscription different from explicit so that you don't have the same problem with the subscribing grain identity.

gabikliot commented 9 years ago

With implicit subscription the grain is already subscribed, statically. You still need to call SubscribeAsync from OnActivateAsync, but this only attaches the IAsyncObserver; it does not perform any actual grain subscription. In the future, when we add support for grain code to directly implement IAsyncObserver, there will be no need for this "attach the IAsyncObserver" call. So an implicitly subscribed grain will just get the events, just like regular grain calls, without any calls to SubscribeAsync.

For explicit subscription you do need to subscribe the grain.

Take a look here: http://dotnet.github.io/orleans/Orleans-Streams/Streams-Programming-APIs "Explicit and Implicit Subscriptions"

nehmebilal commented 9 years ago

To avoid mixing subjects again, I just opened an issue to discuss the Single Writer Multiple Readers scenario with a proposal #447.

Please let us know what you think.

rore commented 9 years ago

Getting back to this. I have a feeling you won't like the idea but I'll still raise it.

What if, when subscribing, you could control whether the subscription is on a "grain level" or an "activation level"? Currently subscriptions are per grain. So why is it useful to have the subscription per specific grain activation? I can see two probable use cases -

  1. You want the subscription to be active only if the grain is active. Say you have a user grain, and when the user is active in the application you want it to get some messages (or events). But if the user has been idle for some time, you don't want to process these messages any more. If you could subscribe per activation, when the user has been idle and the grain deactivates, it would not reactivate as a result of messages ("automatic" unsubscribe).
  2. Being able to subscribe to multiple activations of the same grain (hmmm...).

gabikliot commented 9 years ago

I think this is again unrelated to stateless workers. For regular grains we can have an additional option: IAsyncStream.SubscribeTransientAsync(), and it will be a transient subscription for this activation only. I don't know if I like that option or not. On the one hand, it is just yet another capability, it works correctly with all other abstractions, it does not contradict any other abstractions. On the other hand, more is not always better. Sometimes giving too many control options may become confusing. But in general, this could be a valid addition.

Totally unrelated is the stateless option. As I understand it, your goal is to allow subscribing, let's say, 5 stateless worker activations, and when 1 event is published, each activation will receive 1, so in total 5 will be received by this grain. This proposal is very different from a transient subscription. This proposal does break the nature of the grain vs. activation abstraction. One msg sent to a group of grains is different from one msg being broadcast to all activations. We can have: MultiCopyBroadcastedActivation, but this is NOT Stateless Worker. This will be a different abstraction.

jason-bragg commented 6 years ago

One can use streams with stateless workers. Some behaviors may be unexpected and should probably be documented.

skyflyer commented 6 years ago

@jason-bragg, can you comment on the "behaviours that may be unexpected"?

jason-bragg commented 6 years ago

Stateless workers are always activated on the same silo as the caller. This means that a different activation of a stateless worker subscribing to a stream will be created on every silo where events on that stream are generated, and only the events from that silo will go to that grain. In the case of persistent streams (AzureQueue, EventHub, ..) stateless worker consumer grains will always live on the same silo as the pulling agent.

Janisku7 commented 5 years ago

Now that ASP.NET Core 3.0 has a worker SDK, it would be great for Orleans StatelessWorker.

sergeybykov commented 5 years ago

@Janisku7 Can you elaborate what you think is an opportunity here?

dmartensson commented 5 years ago

Could the stateless worker grain use an attribute indicating it is a subscriber?

This tells the runtime that there exists a stateless subscriber.

When a message arrives, it checks if there are any active subscriptions in the local silo. If not, or if all are busy and there is room for more, it activates a new grain and subscribes it to the stream, and once the grain is deactivated it is unsubscribed.

But the runtime still knows there are grains ready to be activated?

sergeybykov commented 5 years ago

@dmartensson The impedance mismatch here is that a stream subscriber is a grain (with a unique grain ID), but a stateless worker 'grain' is comprised of many activations that are created on demand in the caller's silo. So the 1-1 relation between a stream/subscription and a grain implicitly translates to 1-N relation between a stream/subscription and N activations of the single stateless worker grain.

It is impossible to subscribe/unsubscribe an individual activation of a grain. The single subscription would stay on, but events from the stream may end up delivered to different activations of the grain. In addition, messages directed at stateless worker grains, including messages carrying stream events, never cross silo boundaries, and are always processed locally.

Janisku7 commented 4 years ago

@Janisku7 Can you elaborate what you think is an opportunity here?

So if a stateless worker grain could be implemented using a Worker Service, so that the stateless worker grain is a background service, could it help? For reference: https://docs.microsoft.com/en-us/aspnet/core/fundamentals/host/hosted-services?view=aspnetcore-3.0&tabs=visual-studio

sergeybykov commented 4 years ago

I don't quite understand how this would help. StatelessWorker is a grain that benefits from the scheduling and isolation guarantees of the runtime. It could spin up long running tasks, sure. But I struggle to see the other way around.

ThiagoT1 commented 4 years ago

@sergeybykov, would you kindly elaborate on how a StatelessWorker would behave regarding subscriptions when decorated with [StatelessWorker(1)]?

pavel-zybenkov commented 4 years ago

Hello. Is it a valid workaround to have a simple IoC singleton object subscribed to a stream? (To cover per-silo local cache scenario)

sergeybykov commented 4 years ago

Sorry about delayed responses.

@sergeybykov, would you kindly elaborate on how a StatelessWorker would behave regarding subscriptions when decorated with [StatelessWorker(1)]?

Without (1), silo will create multiple activations of the grain that will receive stream events in an unpredictable order. Usually, that is undesirable.

Hello. Is it a valid workaround to have a simple IoC singleton object subscribed to a stream? (To cover per-silo local cache scenario)

Objects within a silo cannot subscribe to streams, only grains can. So, one does need to execute within a context of a grain to receive and process stream events.

siennathesane commented 4 years ago

Without (1), silo will create multiple activations of the grain that will receive stream events in an unpredictable order. Usually, that is undesirable.

In my mind, this could be acceptable so long as it's well-documented as it allows users to consider a trade-off of automatic scaling vs ordered responses. I have a use case where I have hundreds of IoT devices which send data to a queue, and stateless processing is preferred because the consuming grain is just tagging/cleaning the data based off known constraints and then sending it along to a new queue. If I were to deploy thousands of devices with a non-linear messaging rate increase, I'd want to be able to have Orleans scale the stateless worker as the ordering doesn't matter, just the rate at which messages are processed.

hryo commented 3 years ago

Objects within a silo cannot subscribe to streams, only grains can. So, one does need to execute within a context of a grain to receive and process stream events.

Sorry, maybe I missed something, but even in a co-hosted scenario? Is it impossible to inject IClusterClient into an object and subscribe to a stream from there?

Dobby007 commented 3 years ago

Without (1), silo will create multiple activations of the grain that will receive stream events in an unpredictable order. Usually, that is undesirable.

@sergeybykov, but will different activations of a stateless grain receive different events/messages from the stream? Or, conversely, can different activations of the stateless grain (either on the same silo, or different silos) receive the same messages from the stream?

It is said in the docs:

If a grain (or client) is subscribed X times to the same stream, it will receive the same event X times, once for each subscription.

So I am confused what exactly changes here in the case of stateless grains. Still it's better to document it to avoid such confusion...

Also you are writing:

In addition, messages directed at stateless worker grains, including messages carrying stream events, never cross silo boundaries, and are always processed locally.

Does it only mean that once the pulling agent received an event/message from a stream provider it will pass this message for processing to one of the activations of a stateless grain on the same silo? My question is: can a stateless grain on different silos process messages from the same stream at the same time?

P.S. I am looking into an opportunity to use Orleans Streams on top of Kafka. I don't care about ordering. I am concerned just about delivery guarantees.

sergeybykov commented 3 years ago

will different activations of a stateless grain receive different events/messages from the stream?

Yes.

can different activations of the stateless grain (either on the same silo, or different silos) receive the same messages from the stream?

No.

what exactly changes here in the case of stateless grains.

Stateless grains don't have cluster-wide identity, only local within a silo. In a cluster of 5 silos, there will be 5 stateless worker grains with ID="X", one in each silo. On top of that, unless (1) is specified, each silo can create multiple activations of grain X within it.

Stateless grains can only be invoked by grains (or stream pulling agents) from the same silo.

Does it only mean that once the pulling agent received an event/message from a stream provider it will pass this message for processing to one of the activations of a stateless grain on the same silo?

Yes. It will pass it to an unpredictable activation, which makes any notion of ordering impossible.

My question is: can a stateless grain on different silos process messages from the same stream at the same time?

Only a regular grain can. Because unlike stateless worker grains, regular grains are remotely addressable. Pulling agents are a mechanism for distributing the job of pulling events from queues or queue partitions across silos. Hence, pulling agents on each silo read events from a subset of queues/partitions. Stateless worker grains help optimize processing of events by guaranteeing that they will not need to be sent over the network to another silo. For a stateless grain on another silo to receive the same messages, they would either need to be pulled again by a pulling agent on that silo, which defeats the purpose of distributing queues/partitions across silos, or would require events to be sent over the network, which doesn't work for stateless worker grains, as they aren't remotely addressable.
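The division of labor described above (each silo's pulling agent reads a disjoint subset of partitions and hands events only to local stateless worker activations) can be sketched as a simulation. This is illustrative Python, not Orleans code: `Silo`, `assign_partitions`, and `pump` are hypothetical, the round-robin assignment is an assumption made for simplicity, and the random choice of activation only models "an unpredictable activation".

```python
import random


class Silo:
    """Stands in for one silo hosting local stateless worker activations."""

    def __init__(self, name, max_activations=3):
        self.name = name
        self.max_activations = max_activations
        self.processed = []  # list of (activation index, event)

    def deliver_locally(self, event, rng):
        # The event goes to some local activation; which one is unpredictable,
        # so no ordering across events can be assumed.
        activation = rng.randrange(self.max_activations)
        self.processed.append((activation, event))


def assign_partitions(partitions, silos):
    # Assumption: simple round-robin; each partition has exactly one owner.
    return {p: silos[i % len(silos)] for i, p in enumerate(partitions)}


def pump(queue, assignment, rng):
    # Each (partition, event) pair is pulled once, by the owning silo only,
    # and never forwarded to another silo.
    for partition, event in queue:
        assignment[partition].deliver_locally(event, rng)
```

In this model every event is processed on exactly one silo, and two silos never see the same event. Getting the same event to a second silo would require either a second pull or a cross-silo send, which are the two options ruled out in the comment above.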

I am looking into an opportunity to use Orleans Streams on top of Kafka. I don't care about ordering. I am concerned just about delivery guarantees.

Then stateless worker grains should be a good fit. Pulling of events from Kafka partitions will be distributed across silos. Each event will be pulled only once and processed locally within the same process (silo). Delivery guarantee is at-least-once. If a grain activation returns an error to pulling agent or the call times out, pulling agent will retry delivery of that message up to the configurable limit of retries.
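The at-least-once behavior with a bounded number of retries can be illustrated as follows. This is a Python sketch under stated assumptions, not the pulling agent's actual logic: `deliver` and `FlakyHandler` are hypothetical, and re-raising once the retries are exhausted is an assumption, since the thread does not say what happens to the event at that point.

```python
def deliver(event, handler, max_retries):
    """Call handler(event) once, then retry up to max_retries times on error.

    Returns the number of attempts made. Re-raises the last error when the
    retry limit is exhausted (an assumption made for this sketch).
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            handler(event)
            return attempts
        except Exception:
            if attempts > max_retries:
                raise


class FlakyHandler:
    """Consumer that observes the event, then fails a fixed number of times."""

    def __init__(self, failures):
        self.failures = failures
        self.seen = []

    def __call__(self, event):
        # The side effect happens before the failure, so a retried event is
        # observed more than once: at-least-once, not exactly-once.
        self.seen.append(event)
        if self.failures > 0:
            self.failures -= 1
            raise RuntimeError("simulated processing failure")
```

With a handler that fails twice, the same event is seen three times before delivery succeeds, which is why consumers under at-least-once delivery are usually written to be idempotent.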

Dobby007 commented 3 years ago

Thanks, @sergeybykov. It made a lot of sense. Still I believe it's better to add this into the documentation as suggested by @mxplusb.

Only a regular grain can. Because unlike stateless worker grains, regular grains are remotely addressable.

This question was rather about subscribing to the same stream (with the same StreamIdentity) on different silos. I believe that for stateless grains it doesn't matter what ID you pass into the GetStream method – all messages received by the pulling agents will be processed locally as long as there is a subscription to that stream on that silo. The implication here is that if a subscription does not exist, the silo won't be able to process the incoming messages with that StreamId at all. And it's actually better to use the same StreamId if I just want to scale out the processing of the messages across silos and not divide processing into different streams (at least in the case with Kafka partitions). Am I right in assuming that?

By the way, if there are many small streams with various IDs it becomes much harder to subscribe to them with stateless grains. Because in that case you would need to subscribe to every possible ID in the same stateless grain or create a grain per each StreamId out there.

danielleiszen commented 2 years ago

I would like to use Orleans for similar purpose as @Dobby007 but on top of NATS Streaming. I see this infrastructure extremely helpful with ES architecture so I am looking forward to experimenting with it. Is there a roadmap for this feature or some fork I can give it a try with? Thx

sheeri185adidam commented 2 years ago

Looking at the documentation of stateless worker usage with streams, I see that it says here, "This is not yet implemented. An attempt to subscribe to a stream from a StatelessWorker grain will result in undefined behavior. "

I wanted to get confirmation of whether this is still the case. From the discussions above, it sounds more like it is supported in some way? FWIW, I was able to add an implicit subscription to a stateless worker grain on a single-silo cluster locally on my machine. I have not tried it on a multi-silo cluster yet, hence the question.

benjaminpetit commented 2 years ago

Still not supported. With only one silo and one stateless worker, it will work, but will not once you have multiple workers in the cluster.

sheeri185adidam commented 2 years ago

Still not supported. With only one silo and one stateless worker, it will work, but will not once you have multiple workers in the cluster.

Thanks.

feitzi commented 1 year ago

It seems that the plan for stateless automatically scaled-out processing has been on the backlog for a few years, and it is mentioned in the official documentation.

Do you plan to implement and support this in the near future? If not, it should be removed from the documentation.

benjaminpetit commented 1 year ago

I wasn't aware it was in the docs.

On one side, I think that's a good idea, but on the other side, I don't think it really fits with the streaming model we have. There are too many chances to use it incorrectly.

I think we should remove it from the docs for now and reevaluate later if there is demand for it.