dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

SimpleMessageStreamProvider Response did not arrive on time in 00:00:30 for message #795

Closed Merroy closed 9 years ago

Merroy commented 9 years ago

Faced with a problem when using SimpleMessageStreamProvider, I receive this error:

[WARNING 100157 CallbackData 127.0.0.1:11111] Response did not arrive on time in 00:00:30 for message: Request S127.0.0.1:11111:180031794*grn/C51B0919/00000000@6d85f7b3->S127.0.0.1:11111:180031794*grn/F6BA0771/00000000+NEK@d1adabb3 #101: Orleans.Streams.IStreamConsumerExtension:DeliverItem(). Target History is: S127.0.0.1:11111:180031794:*grn/F6BA0771/00000000+NEK:@d1adabb3. About to break its promise.

When this error occurs, the contents of the stream are cleared and the subscriber continues to receive only the new data that the producer sends to the stream. What could the problem be?

Here is my scenario: a single grain (the producer, the only entry point) receives tasks and, depending on the task, routes them to one of several streams, with exactly one subscriber (consumer) per stream. The consumer processes each message (which may take a while) and hands the result to another grain, strictly preserving FIFO order. In other words, on the client -> producer side I need to "get rid of" the await, to unblock the producer grain as quickly as possible. On the producer -> consumer side, waiting is acceptable, because processing a message takes time and FIFO order must be followed.

gabikliot commented 9 years ago

This is a question about backpressure in SMS streams. Your producer is overwhelming the consumer by sending too many events in parallel; the consumer can't keep up processing the events, and therefore events eventually start timing out.

In Persistent streams you get backpressure for free by using the persistent queue in the middle (plus the built-in Orleans backpressure between the agents pulling from the queues and delivering to consumers, as explained here). But in SMS streams it is up to the producer to regulate it. SMS streams are just a streaming API on top of Orleans messaging; there is no queue in the middle. The same way one can in theory send too many messages from one grain to another, one can send too many stream events from one producer (client or grain) to a consumer. There are a number of options one can use to deal with backpressure in SMS streams:

1) Scale up the consumer: make it a Reentrant grain.

2) Scale out the consumer and the number of streams: if possible, split one big stream into multiple streams and subscribe to each from a different consumer grain.

3) Regulate the production on the producer side:

a. OnNext on the producer returns a Task that represents the processing of this event (if configured with FireAndForget = false, which is the default). The producer can simply await this Task before producing the next event.

b. If you want to produce at a high rate rather than one by one, you can still do so, but you need to control how many events you are throwing at the consumer in parallel. A good approach is to maintain a window of produced events that have not been processed yet (a buffer of Tasks for events that were sent to the stream but whose Task has not resolved yet). The buffer has a limit: if the buffer is full (all items in the buffer have been sent but not acknowledged yet), wait a bit (await a delay) and try again; as items are acknowledged, remove them from the buffer. One can easily build such a helper buffer class with Tasks and make it a reusable component in your app.
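A minimal sketch of option 3b (the class name, window size, and flush method are illustrative, not part of Orleans; only IAsyncStream<T>.OnNextAsync is the real API):

```csharp
// Hypothetical helper: keeps at most 'windowSize' unacknowledged OnNextAsync
// calls in flight, so a fast producer cannot overwhelm an SMS consumer.
public class StreamProducerWindow<T>
{
    private readonly IAsyncStream<T> _stream;
    private readonly int _windowSize;
    private readonly List<Task> _inFlight = new List<Task>();

    public StreamProducerWindow(IAsyncStream<T> stream, int windowSize)
    {
        _stream = stream;
        _windowSize = windowSize;
    }

    public async Task ProduceAsync(T item)
    {
        // Drop tasks whose events the consumer has already processed.
        _inFlight.RemoveAll(t => t.IsCompleted);

        // If the window is full, wait until at least one event completes.
        while (_inFlight.Count >= _windowSize)
        {
            await Task.WhenAny(_inFlight);
            _inFlight.RemoveAll(t => t.IsCompleted);
        }

        // With FireAndForget = false (the default), this Task resolves only
        // after the consumer's OnNextAsync has run for this event.
        _inFlight.Add(_stream.OnNextAsync(item));
    }

    // Call when done producing, to observe any delivery failures.
    public Task FlushAsync() => Task.WhenAll(_inFlight);
}
```

Note that FIFO is preserved only within what a single non-reentrant producer sends; the window merely bounds how far the producer can run ahead of the consumer.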

Merroy commented 9 years ago

If I understood everything correctly, options 1 and 2 would violate FIFO, and with option 3 the client that contacted the producer will have to wait until the producer has waited for the consumer to finish.

Merroy commented 9 years ago

The only solution I currently see is to implement my own stream provider, or to use SimpleAzureQueueStreamProvider.

gabikliot commented 9 years ago

with option 3, the client that contacted the producer will wait until the producer has waited for the consumer to finish

How are your client and your producer written now? I do not understand your explanation above after "In other words". Can you please describe exactly what the client is doing and what it awaits, and what the producer is doing and what it awaits? Pseudocode would help.

Merroy commented 9 years ago

There is a single entry point into the producer:

var inputGrain = GrainClient.GrainFactory.GetGrain<IInputProducerGrain>(0);

Different clients send data, or a whole queue of data, at different times:

await inputGrain.SendData(data);      // SendData(Data data)
await inputGrain.SendData(dataQueue); // SendData(Queue<Data> data)

For the client it is enough to know that the data was successfully sent (await), and then forget about it. The data has a property public string Code { get; set; } by which items should be grouped; within each group they must be processed in the order in which they were sent.

public Task SendData(Data data) // public async Task SendData(Data data)
{
 …
_stream[data.Code].PushData(data); // await _stream[data.Code].PushData(data);
 …
}

Correspondingly, for parallelization there is a stream per code:

private async Task<IAsyncStream<Data>> GetStream(string code)
{
var streamProvider = GetStreamProvider(PROCIDER_NAME);
var consumerGrain = GrainFactory.GetGrain<IConsumerGrain>(code);
await consumerGrain.SubscribeAsync();
return streamProvider.GetStream<Data>(Guid.Parse(GUID_STREAM), code);
}

And a consumer:

public override Task OnActivateAsync()
{
var streamProvider = GetStreamProvider(PROCIDER_NAME);
_asyncStream = streamProvider.GetStream<Data>(Guid.Parse(GUID_STREAM), this.GetPrimaryKeyString());
return TaskDone.Done;
}
public async Task SubscribeAsync()
{
var subscriptionHandles = await _asyncStream.GetAllSubscriptionHandles();
if (subscriptionHandles.IsNullOrEmpty())
await _asyncStream.SubscribeAsync(OnNextAsync);
}
public async Task OnNextAsync(Data data, StreamSequenceToken token = null)
{
await Task.Delay(10000); // long work
}

The consumer should process tasks as fast as it can while respecting FIFO. In other words, the producer call should be quick: parallelize by the grouping key and put the work into the processing queue, and the consumer must process the data in queue order. The queue is allowed to grow large, and processing one data packet can take several hours.

gabikliot commented 9 years ago

1) One producer grain? Is it a Stateless worker? If not, it will not scale.
2) As I understood, the client awaits the call but the producer does not. Is that correct?
3) If yes to 2, then I don't understand the purpose of this logic. Why is the client awaiting?
4) I am not saying the client should not await; I am saying both the client and the producer should await.
5) "and processing of one data packet can reach several hours" Processing can take hours? Is the processing a CPU-intensive task? I don't think Orleans as is, without writing an extra layer of reliability and persistence, is a good fit for a task that takes hours to process. Orleans is mostly for interactive services, not for long background jobs.

Merroy commented 9 years ago

1) I admit that the single entry point is a bottleneck. I need to strictly follow FIFO, so it will not scale, but everything downstream of the consumer's processing is scaled.
2) Almost: with FireAndForget = true the producer does not wait for a response from the consumer, and vice versa. The client can still get a response from the producer, and the producer receives a response from the stream that the task is in the queue; at that point both the producer and the client are relieved of responsibility for this task.

gabikliot commented 9 years ago

"the task is in the queue" What queue? There are no queues in SMS. There is a "queue" in front of every grain (for non-reentrant grains), but that queue is not persistent, it is in memory, so putting this message in that queue does not buy you anything in terms of reliability guarantees.

Merroy commented 9 years ago

3) If the client does not await, what guarantees are there that the second message sent to the stream will arrive second and not first?
4) Again, if we consider the SMS provider with FireAndForget = false, then the producer would not need to wait for the consumer.
5) Why not? If I understand correctly, I can split the work into smaller tasks, and Orleans allows me to spread them across different silos. Or am I mistaken?

gabikliot commented 9 years ago

If you split and spread, then yes, it can work. But that is not what you originally wrote: "processing of one data packet can reach several hours". One packet is not "split and spread".

Merroy commented 9 years ago

At the moment I have found a workaround which, to be honest, I don't particularly like. I abandoned the stream provider and created a grain whose state holds a queue persisted through a StorageProvider; a timer takes one message at a time from the queue and sends it to the consumer.
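The workaround described above might look roughly like this (the grain, state, and consumer interfaces are illustrative; Grain<TState>, RegisterTimer, WriteStateAsync, and TaskDone.Done match the Orleans API of that era):

```csharp
// Illustrative sketch: a grain that persists a queue in its state and uses a
// timer to forward one message at a time to the consumer, preserving FIFO.
public class QueueGrainState : GrainState
{
    public Queue<Data> Pending { get; set; }
}

public class QueueGrain : Grain<QueueGrainState>, IQueueGrain
{
    private IDisposable _timer;

    public override Task OnActivateAsync()
    {
        if (State.Pending == null) State.Pending = new Queue<Data>();
        // Check the queue once per second; each tick forwards at most one item.
        _timer = RegisterTimer(Drain, null,
            TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1));
        return TaskDone.Done;
    }

    public async Task Enqueue(Data data)
    {
        State.Pending.Enqueue(data);
        // Persisted state survives a silo failure, unlike an in-memory SMS "queue".
        await WriteStateAsync();
    }

    private async Task Drain(object _)
    {
        if (State.Pending.Count == 0) return;
        var item = State.Pending.Peek();
        var consumer = GrainFactory.GetGrain<IConsumerGrain>(item.Code);
        await consumer.Process(item); // hypothetical method; FIFO: finish before dequeuing
        State.Pending.Dequeue();
        await WriteStateAsync();
    }
}
```

One caveat with this shape: timer callbacks are interleaved with grain calls, so for strict ordering the drain logic must not overlap itself (e.g. guard against a tick firing while a previous Process is still awaiting).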

I am also considering an option that iterates through all of the message queues in parallel, records the outcomes in a results table, and has a watcher that picks up finished results in FIFO order as soon as they appear. But I assume that this is just another crutch :)

sergeybykov commented 9 years ago

Maybe I misunderstood something here, but isn't the simplest solution for the requirements to use a persistent queue provider instead of SMS, e.g. ServiceBus or EventHub? A producer gets a confirmation quickly that the event got successfully written to the queue. FIFO is guaranteed. Consumer consumes at the rate it is capable of. Why use SMS to begin with?

Merroy commented 9 years ago

I started with the easy one :) Do I understand correctly that I need to implement a PersistentStreamProvider? And in the event of a failure of the service, what is the probability of successful operation of the grain and the silo?

gabikliot commented 9 years ago

No, you don't need to implement PersistentStreamProvider. PersistentStreamProvider is a base class that is already implemented. You need to implement a queue adapter if you want to use a queue different from Azure Queue. http://dotnet.github.io/orleans/Orleans-Streams/Streams-Extensibility

In the event of failure of the service what is the probability of successful operation the grain and the silo

Not sure what you mean by "a service". Which part of it, and what kind of failure?

Merroy commented 9 years ago

Sorry, I got a little confused by the names; of course I meant QueueAdapterFactory and QueueAdapter.

gabikliot commented 9 years ago

Yes, you need to implement a QueueAdapterFactory and QueueAdapter if you want to use a queue other than Azure Queue (for which we already have a QueueAdapter).

sergeybykov commented 9 years ago

Also, @riccardobecker created https://github.com/OrleansContrib/Orleans.StreamProviders for a ServiceBus adapter. But he didn't seem to push any code there. Maybe it's worth asking him about it.

Merroy commented 9 years ago

I mean: what will the behavior of the grain and the silo be if the service is unavailable for some time, for whatever reason? I want to account for fault tolerance. At the moment I'm working with a StorageProvider, and it's easier for me to control, because I control its operation and implementation, and it always lives with the grain and the silo.

gabikliot commented 9 years ago

If a silo goes down, the grains that used to be active on that silo will automatically be recreated on another silo upon first use. Thus, there will be close to zero impact. Some individual messages may still get lost during the short period before the silo is detected as dead. Beyond that, you don't need to worry about grain fault tolerance; it is automatically provided by Orleans. Storage/persistent-queue fault tolerance is beyond Orleans's scope.

Merroy commented 9 years ago

That is why I'm wary of situations like the one with the SMS provider (the situation is different, but I mean in general terms), where part of the queue is lost yet the grain and the stream continue working.

Merroy commented 9 years ago

Overall I learned/heard/got the answers I needed. Thanks again for the advice; you can close the topic. If it is interesting and makes sense, I can share the result soon.

gabikliot commented 9 years ago

Great! Glad we could help. Sure, please do share the results.