dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License
10.08k stars 2.03k forks source link

Evenly distributing work between all active silos #1428

Closed Eldar1205 closed 8 years ago

Eldar1205 commented 8 years ago

Scenario: In my would-be Orleans based Azure cloud service, there would be a worker role with 200 instances that should execute 50K periodic jobs every 1 minute. The periodic job is peeking an Azure event hub partition and check its last message sequence number - simple O(1) operation in terms of CPU, memory & networking, an operation that there is no problem with it running occasionally on two silos at once. There are 50K of those partitions, most of the time silent without new messages.

Thoughts about solution: Reminders can't be used for that, since 1 minute interval is too short for them, and it's very important that the job executes every 1 minute even if some silos are shut down, which means internal timers alone won't help as well. Since those jobs are O(1), the optimal solution would be to find a way to distribute them between all active silos evenly. Orleans does that internally for internal jobs (e.g. reading from stream queues, executing reminders) using consistent hashing which adapts to silos joining/leaving the cluster. Is there a way to access this functionally? I'd like something like a stateless worker per silo that every 1 minute "queries" current silo which 50K/N jobs should current silo perform, where N is number of current active silos, and ask local placement grains to do those jobs, without incurring any additional messaging between different silos and relying on what Orleans already does for me. Think of it as "querying" the consistent hash ring maintained by all active silos and periodically adjusted with current active silos with sub-minute intervals. Since it's OK to peek the event hub partition from two silos at once, there are no worries about the hash rings not consistent between all active silos, as long as it's eventually consistent and work is eventually distributed evenly.

Notes:

  1. I know having 50K event hub partitions that are mostly silent is an extreme over-abuse of event hubs, but that is a contract with external partners that is very hard to break right now - might as well embrace it and solve the problem elegantly and efficiently.
  2. I have alternative designs that use work distribution mechanisms external to Orleans to distribute the work of peeking event hub partitions around instances in a cluster, but all other functionality in the cloud service would really benefit from Orleans virtual actor model so if possible I'd like to do all with Orleans, especially given it already comes with a very good work distribution mechanism.
  3. Examples for benefits from virtual actor model for those who aren't convinced: a. Users request creation of entities that sometimes causes a creation of above event hubs. This happens on average once every few minutes, so a grain to handle requests from specific user saves me concurrency checks, helps with persisting state, can be GC when user is inactive, etc. b. if after peeking the event hub partition I detect there are enough pending messages, I'd like to ask a consumer grain to start consuming, and it's very good for me that it's garbage collected if it's not asked to consume messages after enough time. That way the hard work of consuming messages, that takes much more memory, CPU and networking compared to peeking the partition is executed by grains that are activated interactively like Orleans grains should be activated to benefit from the awesomeness of virtual actors model while having SOLID design - the responsibility of consuming messages from event hub partition is separated from the concern to check if it's actually required, saving the resources on checking that by trying to consume after buffers were allocated and then get nothing back.
gabikliot commented 8 years ago

Do you need EXACTKLY 50K, or 50K on average?

I presume on average. Then the solution is very simple, and you don't need access to any Orleans internals like ring. 1) On every silo bootstrap provider calls into X different local placed grains (not stateless workers, regular grain with PreferLocal). 2) each grain asks to never be deactivated - DelayDeactivation http://dotnet.github.io/orleans/Advanced-Concepts/Activation-Garbage-Collection 3) each grain starts a regular timer (not reminder) to tick every Y milliseconds. Inside the timer, do your operation. Use random start time for timers to ensure the ticks are jittered (not tick all at the same time). 4) Figure out X and Y based on the number of silos and target ops (50K).

That is it. This is a standard, by-the-book Orleans pattern. Supported natively by the combination of the public mechanisms, no need for internals.

If you also want to support a changing number of silo and/or a changing number of target ops (which is a more advanced scenario than you originally described), it can still be done with the public abstractions, with a bit more complexity, but not much really. I can provide details on how, if you need to.

Eldar1205 commented 8 years ago

I do have a need for varying number of active silos (elasticity) and the number 50K is not an exact one, it's based on user requests, but it should change on an hourly basis, and that's if our service is as successful as it could be. 50K is an approximation of max event hub partitions that'll be in the system for time to come, and my design has to meet this to keep up with tough SLA requirements. Three notes:

  1. Please provide details how, I'd really appreciate it. Tapping into the consistent hash ring Orleans maintains using public abstractions is great option to explore versus the alternative at Note 2.
  2. I spoke with Sergey Bykov about a simplified solution of having each silo run an always active stateless worker that every 30 seconds asks the management grain for all active silos, and then that stateless worker can figure out the X periodic jobs current silo should run. We talked about having those periodic jobs also stateless workers that aren't always active, which is preferable to [PreferLocalPlacement] because I don't mind having two silos probing the same event hub partitions for 30 seconds until the stateless workers re-balance the periodic jobs and, correct me if I'm wrong, [StatelessWorker] placement is more efficient (e.g. doesn't incur silo-to-silo communication) than [PreferLocalPlacement] which verifies the grain isn't already activated. The downside here the periodic jobs won't be distributed based on the consistent hash ring, but based on naive modulus, since using management grain I can only access list of active silos, not their projection onto the hash ring.
  3. Sergey mentioned implementing a custom stream adapter as way to tap in the consistent hash ring Orleans uses to balance streams, and we didn't talk about it too much because the management grain option sounded simpler and easier to reason about for me. If that's what you think about regarding how the consistent distribution could be done using public abstractions than I'd be even more happy to hear about it.
gabikliot commented 8 years ago

1) If you want to support changing number of silos, you can indeed periodically (infrequently, every 5 minutes) ask the management grain to # silos and adjust your math of X and Y. Better adjust only Y - the tick time and do not change the # grains. Much easier. 2) If you want to change target ops - just have another global rendezvous grain which you either periodically ask for target ops, or subscribe to this global rendezvous grain and let it notify all worker grains if target ops changes. Can use a stream for that as well, but does not have to. Can also just write it in known table in Azure Storage and periodically read it. 3) you don't need to worry about cost of activating PreferLocalPlacement grain vs. StatelessWorker grain, since you are going to activate a small number of those (like probably on the order of number of cores per silo) ONLY ONCE, at startup, so the cost is not even a consideration. The benefits are tremendous in our case: StatelessWorker is not individually addressable, you cant notify them, its hard to control the number of them in the silo. It is simply the wrong abstraction in your case. 4) I am of a strong opinion that you don't need the internal ring. Not just that it is not needed in our case, it is actually a wrong primitive/abstraction for you. You can calculate EXACTLY the X and Y, while with ring you will have approximately.

Just keep it simple. No need to complicate.

The only thing that I see that can be improved on this design, is for us to support notifications from the management grain when new silo is added/removed. That will save the cost of pinging it every 5 minutes. That is easy: can have a global SMS stream and mgmt grain publishes to it.

Again - you don't need "consistent distribution" (which is what I previously called a ring). You have a much better and exact option: X and Y based on #silos and target ops. Consistent ring is needed in a different case, when the cost of transferring ownership on something is high and you want to minimize it in case if churn. In your case there is no ownership, just a simple work distribution.

gabikliot commented 8 years ago

Ohh, maybe what you meant in your problem is that you want to pin Azure Hubs to silos/grains? That is not what you wrote initially. You wrote just to do 50K ops. If you need to pin hubs to grains ("the cost of transferring ownership on something is high and you want to minimize it in case if churn), then its a different problem.

Eldar1205 commented 8 years ago

Gabi, if you have time, I'd appreciate talking over Skype or WhatsApp call. My mail is eldar1205@gmail.com so contact me if you'd like to do that. If not please read on.

  1. I can't allow my stateless workers to query the management grain every 5 minutes. My SLA must ensure that even in case of a silo ungraceful shutdown, the periodic jobs he executed will be executed again after 1-2 minutes at most. Reduce the time it takes for Orleans to detect a silo shutdown under recommended configurations, and it leaves about 1 minute to re-activate the periodic jobs on another silo.
  2. It's fine that the same periodic jobs is transiently running at two silos at once given its stateless nature. Given there should be tens of thousands of those, isn't it better to put them as [StatelessWorker] instead of '[PreferLocalPlacement]` and not incur extra overhead of ensuring single activation?
  3. Please describe again how would you achieve the following, I didn't fully understand your idea: Given N(t) active silos (t is time parameter, silos are joining/leaving cluster every now and then), and M(t) "ping" stateless periodic jobs such that M >> N (assume M ~ 50K for evaluation), distribute the jobs between the active silos such that: a. Periodic job can be delayed for 1-2 minutes, but not more. b. The distribution is uniform (as much as possible) between silos for elasticity of executing same stateless workload and avoiding silos that become "ping" hotspots c. It's fine that same job is duplicated between silos, but duplications should be upper bounded over and not increasing over time to avoid wasting resources (networking/CPU/memory). d. Design should be simple as much as the requirements allow, preferably using Orleans abstractions.

Thank you very much!

jason-bragg commented 8 years ago

@Eldar1205

The periodic job is peeking an Azure event hub partition and check its last message sequence number

So you're not interested in reading the event hub events, you just need to check the last message's sequence number per partition?

It's quite possible there is an eventhub api that allows for this, but I am unaware of it. Only way I know to acquire an event hub partition's latest sequence number is to receive events from that partition.

Can you please elaborate on what you had in mind for partition management. How does the service know which partitions to read from? If new partitions can show up, something must be notifying the service of this, right?

What is reading from these hubs? If there is something reading from these hubs that can periodically notify your cluster of a partition's sequence number, that would simplify your service greatly.

How much activity do you expect on these partitions?

Each silo run an always active stateless worker that every 30 seconds asks....

I like this idea as well, but I'd modify it a bit. If there is something in the system that knows which partitions need to be checked (call it a partition tracker), each silo can have a set of stateless workers (10? 20?) which periodically start requesting buckets of partitions to check from the partition tracker. These grains periodically start requesting buckets of partitions, checks them, then repeat until partition tracker returns no more partitions. The partition tracker serves buckets of partitions to be checked, then moves those buckets to a list for the next check period. Once all of it's partitions have been checked, it returns nothing until it's time for the next window. If we're worried about the tracker dying, we can distribute the partitions among a number of trackers, and workers can round robin between them until none have work. Basically we don't divide up the work, we just periodically spin up workers and let them consume work until there is no more work to do.

200 silos, 50k partitions, 25 workers per silo, bucket size of 5. once a minute 5000 grains wake up 5000 grains request buckets consisting of a total of 25k partitions Each grain checks 5 partitions 5000 grains request buckets consisting of 25k partitions Each grain checks 5 partitions 5000 grains request buckets and get no more partitions. 5000 Grains go to sleep

200 silos, 50k partitions, 25 workers per silo, bucket size of 5. 10 silos die, 10k more partitions show up. 190 silos, 60k partitions once a minute 4750 grains wake up 4750 grains request buckets consisting of a total of ~23k partitions Each grain checks 5 partitions 4750 grains request buckets consisting of a total of ~23k partitions Each grain checks 5 partitions 2800 grains request buckets consisting of a total of ~14k partitions, rest get nothing 2800 grains checks 5 partitions, rest go to sleep. 2800 grains request buckets and get no more partitions. 2800 Grains go to sleep

Eldar1205 commented 8 years ago
  1. There is an API to probe an event hub partition for new messages. Assuming you track last processed message offset, you can ask the partition about the last enqueued offset and compare with last processed ( https://msdn.microsoft.com/en-us/library/azure/microsoft.servicebus.messaging.eventhubclient.getpartitionruntimeinformationasync.aspx )
  2. Regarding partitions management, event hub partitions are created every hour (on avg.) based on user operation. I planned to have an Orleans client ask a grain to create an event hub, which in turns asks a "probe" grain to start probing from that event hub every 10-15 seconds using above API, and when it has messages ask a "consumer" grain to start consuming and report back new last processed offset. That way the "consumer" grains that do the heavy work are active only when needed, and the "probe" grains that are always active require very low O(1) memory/networking/CPU resources every 10 seconds. The question was how to keep the "probe" grains alive, such that even on silo shutdown the "probe" grains would be re-activated on another silo after 1 minute at most. That question was answered by reading reminder service code and witnessing the tick period can be 10 seconds and have same performance as Orleans timers - the difference between reminders and timers in terms of performance is regarding re-registering them, but if you only create them then reminders can have sub-minute periods as efficient as timers, and I get the sub-minute resiliency to silo shutdown for free since Orleans distributes reminders based on the consistent hash ring maintained by all silos and reacts efficiently to silos shutting down. This should be documented better, there is a great difference between the interval of reminder ticks and the interval of reminder modifications.
  3. From the event hub partitions I expect to be silent most of the time, and every now and then have bursts of hundreds of messages per second. The timing can't be expected up front since it represents a security alert, which is why Orleans is so appealing to handle this, virtual actor model is great solution for interactive workloads such as these.
  4. Your buckets idea is very interesting, and is something I had in mind in another variation, but I didn't go that route because it's hard to reason about. I'm no cloud/distribution expert, and have to work on the project with people less proficient with me in those areas. I'd rather have an Azure work queue to distribute work evenly in order to be resilient to silos shutdown and maintain sub-minute "probe" times even in presence of failures. Fortunately, Orleans reminders do that much better in terms of simplicity and efficiency all using my cloud service and storage account managed (almost) fully by Orleans. I wonder what other gems could be discovered by reading the code and comparing it with documentation.

2016-02-12 1:37 GMT+02:00 Jason Bragg notifications@github.com:

@Eldar1205 https://github.com/Eldar1205

The periodic job is peeking an Azure event hub partition and check its last message sequence number

So you're not interested in reading the event hub events, you just need to check the last message's sequence number per partition?

It's quite possible there is an eventhub api that allows for this, but I am unaware of it. Only way I know to acquire an event hub partition's latest sequence number is to receive events from that partition.

Can you please elaborate on what you had in mind for partition management. How does the service know which partitions to read from? If new partitions can show up, something must be notifying the service of this, right?

What is reading from these hubs? If there is something reading from these hubs that can periodically notify your cluster of a partition's sequence number, that would simplify your service greatly.

How much activity do you expect on these partitions?

Each silo run an always active stateless worker that every 30 seconds asks....

I like this idea as well, but I'd modify it a bit. If there is something in the system that knows which partitions need to be checked (call it a partition tracker), each silo can have a set of stateless workers (10? 20?) which periodically start requesting buckets of partitions to check from the partition tracker. These grains periodically start requesting buckets of partitions, checks them, then repeat until partition tracker returns no more partitions. The partition tracker serves buckets of partitions to be checked, then moves those buckets to a list for the next check period. Once all of it's partitions have been checked, it returns nothing until it's time for the next window. If we're worried about the tracker dying, we can distribute the partitions among a number of trackers, and workers can round robin between them until none have work. Basically we don't divide up the work, we just periodically spin up workers and let them consume work until there is no more work to do.

200 silos, 50k partitions, 25 workers per silo, bucket size of 5. once a minute 5000 grains wake up 5000 grains request buckets consisting of a total of 25k partitions Each grain checks 5 partitions 5000 grains request buckets consisting of 25k partitions Each grain checks 5 partitions 5000 grains request buckets and get no more partitions. 5000 Grains go to sleep

200 silos, 50k partitions, 25 workers per silo, bucket size of 5. 10 silos die, 10k more partitions show up. 190 silos, 60k partitions once a minute 4750 grains wake up 4750 grains request buckets consisting of a total of ~23k partitions Each grain checks 5 partitions 4750 grains request buckets consisting of a total of ~23k partitions Each grain checks 5 partitions 2800 grains request buckets consisting of a total of ~14k partitions, rest get nothing 2800 grains checks 5 partitions, rest go to sleep. 2800 grains request buckets and get no more partitions. 2800 Grains go to sleep

— Reply to this email directly or view it on GitHub https://github.com/dotnet/orleans/issues/1428#issuecomment-183107973.

jason-bragg commented 8 years ago

There is an API to probe an event hub partition for new messages.

Nifty! Thanks!

I planned to have an Orleans client ask a grain to create an event hub... From the event hub partitions I expect to be silent most of the time, and every now and then have bursts of hundreds of messages per second

Please understand that I've been working with eventhub since before its release, and its performance characteristics have improved over time, so some of what I've experienced may no longer be the case. Having said that, from my experience, event hub acts quite poorly for sparse data flows. Much of it's performance comes from bulk operations, and without regular data flow, I've seen very poor performance, as well as common timeouts and other such unexpected behaviors. Is there a reason you need a single customer per hub/partition? Event hub has very high throughput, so at hundreds of events per second per user, you should be able to load 30-100 users onto a single partition, and many partitions per hub, depending on message size of course. At 50 users per partition, that would mean you'd only need 1k partitions.

Also, pre-allocating a set of hubs and partitions in advance, rather than creating them on the fly would allow you to use the streaming infrastructure, and avoid partition checks all together, as agents read as long as there is data, but when their is not, they delay checking for more data for a configured period of time. This means you can configure them to delay ~30 seconds whenever they read no data, which is, as I understand it, the behavior you want, correct? See GetQueueMsgsTimerPeriod in PersistentStreamProviderConfig.

  1. Your buckets idea is ...

Distributing data using silo availability creates a dependency on that information, and will only be as reliable as that information. In distributed systems, dumber algorithms that require less information tent to be more reliable. Stupid code breaks less often than smart code. :) The advantage of this type of 'hungry hippo' like algorithm is that it requires no coordination. Each grain just eats as long as there is food, sleeps, then does it again.

Eldar1205 commented 8 years ago
  1. The reason there are many event hub partitions is partially due to historical lack of understanding of how event hubs throttle senders, and partially due to security considerations I can't specify.
  2. It's not a good idea to have multiple consumers for single event hub partition, they'll read same messages and there'll be unwanted duplicates in the system. With event hubs there should be a single consumer per partition at a given point of time.
  3. If I had the option to make drastic changes I'd use service bus queues instead of event hubs, but event hubs is an API with partners that I can't break right now, and probably won't be able to in the future
  4. I realized Orleans reminders can be used with 10-15 seconds tick periods without incurring overhead that isn't required for keeping HA of probing event hub partitions. I choose to use them to remind probe grains to probe the event hubs, and the probe grains will be stateless workers with few minutes deactivation age so the reminders won't cause cross-silo messaging and when reminders are re-distributed among silos redundant probe grains will be collected after few minutes. Since they only probe event hubs and don't consume from it, I don't mind having probe grains duplicated between silos for few minutes, and that shouldn't happen too much because silos shouldn't shut down at short intervals and when they shut down only adjacent silos in the consistent hash ring are affected and assume ownership of orphaned reminders, which saves the need to have all active silos periodically query for work to be done and/or query for other active silos.

Orleans is simple awesome! I wish the reminders true performance characteristics would be documented, since they are much more useful than what documentation suggests.

gabikliot commented 8 years ago

So just to close on my answer from before: it appears that what you are actually asking is about resource allocation/job assignment, and not about work distribution (as it appeared initially in your question). The difference is that in job assignment you need to assign a certain job to a certain worker. Same with resource allocation - it is important where and to whom you allocate the resource. In your case the "resource" you are allocating is "responsibility to pull from event hub i".

As opposite to that, (simple) work distribution is when you have N identical jobs and you just need to do them in parallel, scale up their execution, without any restriction of pinning or assignment or allocation being "sticky".

My answer was how to do work distribution, as that what was implied you are asking about in your initial question. My answer does not hold for job assignment.

Eldar1205 commented 8 years ago

My apologies for not explaining myself better, but I do talk about work distribution. Probing the event hub partitions is simple work distribution: ~50K identical jobs, parallel, scaled, no restriction, as you said. Once a partition is detected to have messages it should be consumed using standard single activation grain, to benefit from Orleans interactive workload handling. Your answer did apply to work distribution, but you suggested reminders with 5 minutes period, which means it takes up to 6 minutes to recover from silo failure, which isn't acceptable according to imposed SLA. The recovery time should be 60-90 seconds. By reading reminders service code I deduced it can be used with sub-minute periods and gain all the benefits as long as the probe grains are stateless workers that are always activated locally by the reminder. That way by relying on reminders balancing and their efficient reliable implementation I can reliably balance my probe grains as required.

gabikliot commented 8 years ago

I did not suggest to use reminders. There is no need for them at all, in this simple pattern.

Eldar1205 commented 8 years ago

I saw you suggested reminders with 5 minutes period. To my knowledge that's the Orleans feature to run perodic work on the cluster surviving silo failure בתאריך 15 בפבר' 2016 0:42,‏ "Gabriel Kliot" notifications@github.com כתב:

I did not suggest to use reminders. There is no need for them at all, in this simple pattern.

— Reply to this email directly or view it on GitHub https://github.com/dotnet/orleans/issues/1428#issuecomment-183995492.