dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

Extend MayInterleave predicate #2862

Closed (yevhen closed this issue 7 years ago)

yevhen commented 7 years ago

While converting our TPL DataFlow based pipeline to Orleans, I realized that extending the MayInterleave predicate with the Grain instance could be incredibly helpful for expressing DataFlow's bounded capacity feature and user-level backpressure in general.

For example, DataFlow's ActionBlock could be expressed with a Grain (this is a no-brainer). By default, blocks process messages one by one, just as Orleans grains do. By default, blocks also work in greedy mode, accepting any incoming message into an internal buffer. To apply backpressure the block could be configured to have bounded buffer capacity. Grains don't have any kind of internal buffer (like in Akka), but that behavior could easily be simulated by making the grain [Reentrant] and hosting a queue (buffer) (List<TaskCompletionSource>) inside the grain. That's a well-known Orleans pattern (AsyncBatcher) for acknowledged processing.

Unfortunately, it looks like there is no way to stop accepting requests when the queue is at max capacity. If the Grain instance were passed along with the InvokeMethodRequest to the MayInterleave predicate, it might be possible to express this constraint and move blocking to the scheduler level, i.e. the runtime won't interleave incoming requests (it will enqueue them) until the next turn.

What do you think? Does this make sense? Are there any other ways to solve this?

gabikliot commented 7 years ago

An alternative could be to have 2 queues inside the grain: currently parallel and blocked. It sounds like it would be easier to express any logic about how the 2 queues interact with each other by having them both in the grain and not relying on the framework.

yevhen commented 7 years ago

I'm not sure I understood the proposed solution :(

Example:

[Reentrant]
public class Batcher : Grain
{
    const int BatchSize = 512;

    readonly List<BufferItem> buffer = new List<BufferItem>();
    ITargetGrain target;

    // Orleans' Grain base class exposes OnActivateAsync as the activation hook.
    public override Task OnActivateAsync()
    {
        target = GrainFactory.GetGrain<ITargetGrain>(Id);
        return base.OnActivateAsync();
    }

    public async Task<Status> Process(Message msg)
    {
        var item = new BufferItem(msg);
        buffer.Add(item);

        if (buffer.Count == BatchSize)
            await Flush();

        return await item.Completion();
    }

    async Task Flush()
    {
        var items = buffer.ToList(); // snapshot
        buffer.Clear();

        try
        {
            var batch = new Batch(items.Select(x => x.Message));

            BatchResult result = await target.Save(batch);

            items.ForEach(item => item.CompleteResult(result[item.Message.Id]));
        }
        catch (Exception ex)
        {
            items.ForEach(item => item.CompleteError(ex));
        }
    }

    class BufferItem
    {
        // Completes with the per-message Status that Process returns to the caller.
        readonly TaskCompletionSource<Status> completion;

        public Task<Status> Completion() => completion.Task;
        public readonly Message Message;

        public BufferItem(Message msg)
        {
            completion = new TaskCompletionSource<Status>();
            Message = msg;
        }

        public void CompleteResult(Status status) => completion.SetResult(status);
        public void CompleteError(Exception exception) => completion.SetException(exception);
    }
}

How do I prevent the runtime from interleaving Process(Message msg) requests while a flush is in progress?

gabikliot commented 7 years ago

If I understood your setup correctly, you want to allow limited concurrency: allow up to X requests to interleave. But if there are more than X, queue them. When one of the executing requests finishes, pipe in another one from the queue.

Did I understand it correctly?

yevhen commented 7 years ago

Exactly

gabikliot commented 7 years ago

Ok, cool. So it sounds to me like this is something that can be done relatively easily in the grain space. You can write a helper class that has a pending queue and a currently executing set. When the set size is below X, you add to the set and start executing. When the set is full, you put the request in the queue and don't execute it. Once any currently executing request finishes (wrap execution inside your own execution wrapper), dequeue from the queue, add to the set and execute. You can even do it without an explicit set, just a counter.

Alternatively, one can extend the MayInterleave predicate, but you asked about other options.
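
The counter-based helper described above could look roughly like the following. This is only a sketch: ConcurrencyLimiter and its members are hypothetical names, not part of Orleans, and the code assumes it is only ever touched from a single logical thread (as inside a grain's turn-based scheduler), so it takes no locks.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper (not an Orleans type): admits at most `limit`
// operations at a time and parks the rest in a FIFO queue.
// Assumes single-threaded, turn-based access; it is not thread-safe.
public sealed class ConcurrencyLimiter
{
    readonly int limit;
    readonly Queue<TaskCompletionSource<bool>> pending =
        new Queue<TaskCompletionSource<bool>>();
    int executing;

    public ConcurrencyLimiter(int limit)
    {
        if (limit < 1) throw new ArgumentOutOfRangeException(nameof(limit));
        this.limit = limit;
    }

    public int Executing => executing;
    public int Pending => pending.Count;

    public async Task<T> Run<T>(Func<Task<T>> operation)
    {
        if (executing >= limit)
        {
            // Full: park until a finishing operation hands over its slot.
            var gate = new TaskCompletionSource<bool>();
            pending.Enqueue(gate);
            await gate.Task;
        }
        else
        {
            executing++;
        }

        try
        {
            return await operation();
        }
        finally
        {
            if (pending.Count > 0)
                pending.Dequeue().SetResult(true); // slot passes to the next waiter
            else
                executing--;                       // nobody waiting: free the slot
        }
    }
}
```

In a [Reentrant] grain, each request would call something like limiter.Run(() => DoWork(msg)). Note that the runtime still admits every request into the grain; the limiter only delays when the work starts.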

yevhen commented 7 years ago

Sounds like the AsyncPipeline I've seen somewhere in the Orleans codebase, used to limit the work in progress. But the runtime will still interleave requests on the first await, and I can't prevent that with the suggested design :(

gabikliot commented 7 years ago

I am not sure I follow. I don't see why this cannot be done completely in grain space.

sergeybykov commented 7 years ago

@yevhen

> To apply backpressure the block could be configured to have bounded buffer capacity.

I'm trying to understand how you want to apply backpressure here. If you keep accepting messages to the grain, even if only to add them to the 'non-reentrant' queue, then you aren't applying any pressure to the producer, are you? If I understood your idea correctly, the grain would have a limited 'executing' buffer of requests being processed with interleaving. But it would also have an 'unlimited' (there's a configurable runtime limit for that) buffer of incoming requests with no indication to the producer that the 'executing' buffer is full. Did I misunderstand?

gabikliot commented 7 years ago

Do you think it's even possible to do something different here Sergey? With an async interface, what backpressure can one apply, aside from executing slower? Slowing the execution, as opposed to any explicit backpressure, is how I understood Yevhen's scenario.

The only other option I see is an explicit handshake protocol with explicit flow control, encoded in the grain interface methods.

sergeybykov commented 7 years ago

> Do you think it's even possible to do something different here Sergey?

That's kind of my question. I don't see how backpressure can be applied here without an explicit flow control protocol. For example, each call to the grain could return the number of available slots in its internal buffer. But then, alternatively, the caller could constrain the number of in-flight requests with an AsyncPipeline or something like that. A grain reporting available slots in its buffer will probably work better for a varying number of producers. But if the number of producers to the grain is fixed, then the pipeline approach is likely simpler.

I don't understand how we can slow execution here. If requests are already admitted into the internal queue, they will execute as fast as possible. So it's all about admission control, isn't it?
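
The "return the number of available slots" idea could be sketched as below. BoundedInbox and AcceptResult are hypothetical names made up for illustration and do not exist in Orleans; a grain method would wrap Offer and return its result to the producer, which would then decide whether to keep sending.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical reply type: tells the producer whether the item was taken
// and how many more items the receiver can currently accept.
public sealed class AcceptResult
{
    public AcceptResult(bool accepted, int availableSlots)
    {
        Accepted = accepted;
        AvailableSlots = availableSlots;
    }

    public bool Accepted { get; }
    public int AvailableSlots { get; }
}

// Hypothetical bounded buffer with explicit admission control.
public sealed class BoundedInbox<T>
{
    readonly int capacity;
    readonly Queue<T> items = new Queue<T>();

    public BoundedInbox(int capacity)
    {
        if (capacity < 1) throw new ArgumentOutOfRangeException(nameof(capacity));
        this.capacity = capacity;
    }

    // Rejects the item when the buffer is full; otherwise stores it and
    // reports the remaining free slots.
    public AcceptResult Offer(T item)
    {
        if (items.Count >= capacity)
            return new AcceptResult(false, 0);

        items.Enqueue(item);
        return new AcceptResult(true, capacity - items.Count);
    }

    // Removes and returns everything buffered so far, freeing all slots.
    public List<T> Drain()
    {
        var batch = new List<T>(items);
        items.Clear();
        return batch;
    }
}
```

A producer that sees AvailableSlots drop to 0 would pause until a later reply reports free slots again, which is the explicit handshake discussed above.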

gabikliot commented 7 years ago

By slow execution I meant starting to work on it later. Admission control, yes.

sergeybykov commented 7 years ago

@yevhen What do you think about the feedback?

yevhen commented 7 years ago

> So it's all about admission control, isn't it?

ye, I came to the same conclusion.

yevhen commented 7 years ago

I should say more about my use case.

I have a single Azure queue and a number of pulling agents, one per silo (i.e. the Competing Consumers pattern). The queue contains messages targeted at multiple receivers (tens of thousands). There could be multiple messages for an individual receiver, and I'd like to batch the processing of those messages inside the receiver, so that it holds a small buffer which is processed on a timer.

What I want is to:

  1. delete message from the queue only after it was actually processed by receiver, i.e. receiver's buffer was flushed to disk.
  2. don't overpull from queue so that there is a limited number of messages in-flight, i.e. make pulling agents aware of the current progress and pull only when needed

I was trying to reuse the existing Azure queue provider infrastructure, but it is built on the assumption that once a subscriber has received a message from the stream, the message can be deleted from the queue, which means we may lose uncommitted messages residing in the receiver's buffer.

I've switched to a different task but I'll return to this very soon. Any suggestions on how we can model this with Orleans/AQP?

What do you think of using Reactive Streams (Akka) protocol, since it's all about admission control?

sergeybykov commented 7 years ago

> What do you think of using Reactive Streams (Akka) protocol, since it's all about admission control?

I'm not sure. It seems not conceptually different from what we do today inside pulling agents - one cursor per subscriber, deliver next message only after the previous one is processed. So it's like every subscriber requesting 1 message at a time.

Maybe auto-batching would work here. What I mean by that is that a pulling agent would be configured to wait for n items per consumer or time interval t before sending a batch of events to the consumer. Having processed the batch, the consumer acknowledges it, and the cursor for that consumer jumps forward, and messages sent in the batch get deleted from the queue (if needed).

Would that make any sense? I wonder what @jason-bragg, @xiazen, @gabikliot, and everybody else think.
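
The "n items or time interval t" trigger could be sketched as follows. BatchAccumulator is a hypothetical name, not an Orleans type, and the sketch covers only the batching decision for one consumer; acknowledgement, cursor movement and queue deletion are left out. The current time is passed in explicitly so the logic stays deterministic.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical per-consumer accumulator: releases a batch when either
// maxItems have been buffered or the oldest item has waited maxDelay.
public sealed class BatchAccumulator<T>
{
    readonly int maxItems;
    readonly TimeSpan maxDelay;
    readonly List<T> buffer = new List<T>();
    DateTime firstItemAt;

    public BatchAccumulator(int maxItems, TimeSpan maxDelay)
    {
        if (maxItems < 1) throw new ArgumentOutOfRangeException(nameof(maxItems));
        this.maxItems = maxItems;
        this.maxDelay = maxDelay;
    }

    // Buffers an item; returns a full batch once the size threshold is
    // reached, otherwise null.
    public IReadOnlyList<T> Add(T item, DateTime now)
    {
        if (buffer.Count == 0) firstItemAt = now;
        buffer.Add(item);
        return buffer.Count >= maxItems ? Take() : null;
    }

    // Meant to be called periodically (for example from a timer); returns a
    // partial batch once the oldest buffered item has waited maxDelay.
    public IReadOnlyList<T> Poll(DateTime now)
    {
        return buffer.Count > 0 && now - firstItemAt >= maxDelay ? Take() : null;
    }

    IReadOnlyList<T> Take()
    {
        var batch = buffer.ToArray();
        buffer.Clear();
        return batch;
    }
}
```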

xiazen commented 7 years ago

Batch delivery of streaming messages has been discussed before and has also been requested by our users. I would say we should schedule some time to design and build proper support for it.

> delete message from the queue only after it was actually processed by receiver, i.e. receiver's buffer was flushed to disk.

I think this is one solution to the problem of losing messages during batch processing. As an alternative, Orleans has RecoverableStreamProvider implementations, such as EventHubStreamProvider, which can greatly mitigate this problem. With a RecoverableStreamProvider, you can configure how long you want messages to stay in the cache. So if there's an unexpected failure on the consumer side, the consumer can rewind to a previous message as long as that message is still in the cache.

> don't overpull from queue so that there is a limited number of messages in-flight, i.e. make pulling agents aware of the current progress and pull only when needed

EventHubStreamProvider also has a backpressure algorithm built in to regulate how pulling agents pull from queues. The current algorithm stops a pulling agent from pulling messages when there are relatively slow consumers on the queue. It may not be exactly what you wanted, but I believe it will greatly mitigate your problem. You can also plug your own custom backpressure logic into EHStreamProvider as of 1.5.

I brought it up because it is already supported in Orleans, so you can use it now. Have you considered switching to EHStreamProvider?

sergeybykov commented 7 years ago

Seems to me we can close this one and use https://github.com/dotnet/orleans/issues/2685 for tracking the issue of batching APIs not being implemented. Need to update the title of it.

yevhen commented 7 years ago

Basically, it was about controlling the length of the grain inbox queue at the application level.

sergeybykov commented 7 years ago

We have a crude kind of control there - via configuration, so that LimitExceededException is thrown when more than X requests are enqueued for an activation. I suspect you mean a more sophisticated mechanism for controlling those limits at run time.

yevhen commented 7 years ago

Yes. At a minimum, controllable per grain type. What is your current recommendation for implementing request throttling in Orleans? Am I missing some known Orleans design pattern?

sergeybykov commented 7 years ago

> What is your current recommendation for implementing request throttling in Orleans?

At the moment, there are only top-level controls.

  1. Load shedding option that is configured by max CPU utilization.
  2. Max queue size per activation. There are hard (rejection) and soft (warning) limits. Should be relatively easy to add support for per grain class limits. We could (not necessarily should) even allow per activation overrides of the limits at run time.

However, with both of these mechanisms the caller receives an exception when the limits are reached. I wonder (not sure) if reactive queries (#1883) prototyped last summer would be a better option if implemented.
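
To make the hard/soft limit idea with per grain class overrides concrete, here is a rough sketch. QueueLimit, QueueLimitPolicy and Admission are hypothetical names and do not reflect how the Orleans runtime implements or configures its limits; the sketch returns a decision instead of throwing, and only shows the lookup and comparison.

```csharp
using System;
using System.Collections.Generic;

public enum Admission { Accept, AcceptWithWarning, Reject }

// Hypothetical pair of thresholds: soft triggers a warning, hard a rejection.
public sealed class QueueLimit
{
    public QueueLimit(int soft, int hard)
    {
        if (soft < 0 || hard < soft)
            throw new ArgumentException("Expected 0 <= soft <= hard.");
        Soft = soft;
        Hard = hard;
    }

    public int Soft { get; }
    public int Hard { get; }
}

// Hypothetical policy: a default limit plus optional per grain class overrides.
public sealed class QueueLimitPolicy
{
    readonly QueueLimit fallback;
    readonly Dictionary<string, QueueLimit> overrides =
        new Dictionary<string, QueueLimit>();

    public QueueLimitPolicy(QueueLimit fallback)
    {
        this.fallback = fallback ?? throw new ArgumentNullException(nameof(fallback));
    }

    public void Override(string grainClass, QueueLimit limit) =>
        overrides[grainClass] = limit;

    // queuedRequests is the number already waiting before the new request
    // is admitted, so reaching Hard means one more would exceed the limit.
    public Admission Check(string grainClass, int queuedRequests)
    {
        var limit = overrides.TryGetValue(grainClass, out var custom) ? custom : fallback;
        if (queuedRequests >= limit.Hard) return Admission.Reject;
        if (queuedRequests >= limit.Soft) return Admission.AcceptWithWarning;
        return Admission.Accept;
    }
}
```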