dotnet / orleans

Cloud Native application framework for .NET
https://docs.microsoft.com/dotnet/orleans
MIT License

await stream.SubscribeAsync(observer) fails with outdated client producer #982

Closed onionhammer closed 6 years ago

onionhammer commented 8 years ago

For an explicit stream subscription (a grain implementing IAsyncObserver subscribing to a stream), we are occasionally seeing the error below, typically shortly after the silo starts.

StreamId: e9f1969d-3192-4a26-b801-9721322b417d, Error: System.AggregateException: One or more errors occurred. ---> Orleans.Runtime.OrleansException:  System.Collections.Generic.KeyNotFoundException: No activation for client *cli/004e9b88
   at Orleans.Runtime.Placement.PlacementDirectorsManager.<SelectOrAddActivation>d__0.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Orleans.Runtime.Dispatcher.<AddressMessage>d__17.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Orleans.Runtime.Dispatcher.<AsyncSendMessage>d__14.MoveNext()
   at Orleans.Streams.PubSubRendezvousGrain.<RegisterConsumer>d__1b.MoveNext()
   --- End of inner exception stack trace ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Orleans.Runtime.GrainReference.<InvokeMethodAsync>d__0`1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Orleans.Streams.StreamConsumer`1.<SubscribeAsync>d__0.MoveNext()
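For reference, the explicit-subscription pattern described above is roughly the following (an illustrative sketch, not the actual code from this issue; the provider name, stream identity, namespace, event type, and grain interface are placeholders):

using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public interface IConsumerGrain : IGrainWithGuidKey { }

// A grain that acts as the observer and explicitly subscribes itself to the stream.
public class ConsumerGrain : Grain, IConsumerGrain, IAsyncObserver<int>
{
    public override async Task OnActivateAsync()
    {
        var stream = GetStreamProvider("SMSProvider")
            .GetStream<int>(this.GetPrimaryKey(), "MyNamespace");

        // This is the call that intermittently fails shortly after the silo starts.
        await stream.SubscribeAsync(this);
    }

    public Task OnNextAsync(int item, StreamSequenceToken token = null) => Task.CompletedTask;
    public Task OnCompletedAsync() => Task.CompletedTask;
    public Task OnErrorAsync(Exception ex) => Task.CompletedTask;
}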
onionhammer commented 8 years ago

@gabikliot mentioned it may be due to the silo not shutting down gracefully.

gabikliot commented 8 years ago

A couple of questions:

1) In this case, do you have a client restarted some time after it subscribes to this stream?
2) Is the client a producer or a consumer in this case? I think it is a producer, but just verifying.
3) Do you have a copy of the pub sub table in JSON format? I am only interested in the row for this stream.
4) Any chance you can create a test reproducing this case? I think it will involve a killed/failed client.

onionhammer commented 8 years ago

I will get you #3, but briefly:

1) Typically we redeploy and swap our cloud service, which hosts the silos in a worker role with 2 instances. We definitely don't go out of our way to get a graceful shutdown.
2) We have another worker role sending information into the streams from an event hub (roughly the pattern sketched below), and the grain is the consumer of the stream; the error is occurring on the consumer end (according to the stack trace), when a grain is subscribing to the stream.
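For reference, the producer side in that other worker role looks roughly like this (an illustrative sketch; the provider name, stream namespace, and event type are placeholders, and GrainClient is assumed to already be initialized at role startup):

using System;
using System.Threading.Tasks;
using Orleans;
using Orleans.Streams;

public static class EventHubPump
{
    // Forwards one event hub message into an Orleans stream from a plain client process.
    public static Task ForwardAsync(Guid streamId, string payload)
    {
        IStreamProvider provider = GrainClient.GetStreamProvider("SMSProvider");
        IAsyncStream<string> stream = provider.GetStream<string>(streamId, "MyNamespace");

        // With SMS, producing on the stream registers this client as a producer in the
        // pub sub table; that registration is what can go stale when the role is torn
        // down without a graceful client shutdown.
        return stream.OnNextAsync(payload);
    }
}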

I will see about adding a test, but my time will be limited until next week.

onionhammer commented 8 years ago

Here's a link to the data for the three stream subscriptions

https://gist.github.com/onionhammer/0072b77383a6f53b9481

gabikliot commented 8 years ago

The test will really help make sure the problem is indeed what we think it is, and also help validate any future fix. I suspect (but cannot be 100% sure) that what happens in this case is the following:

1) The client (a stream producer in your case, but the same would happen if it were a consumer) is registered in the persistent pub sub. This is done so the consumer grain can later find it and the producer can send messages to the consumer.
2) When you swap out, you just kill the client (the worker role hosting the client).
3) This client id stays in the pub sub table.
4) When the new cluster starts, the new client(s) are added as producers, but the old one is still there.
5) When the consumer subscribes, it tries to do a handshake with ALL producers in the pub sub, and since this one is down, the subscribe call fails.

The way to mimic this in a test is to basically re-create this scenario:

a) Create a setup with a silo and a subscribing grain, plus a client producing. Host the client in a separate app domain.
b) Kill the silo.
c) Unload the app domain. Do NOT gracefully shut down the client (do NOT call GrainClient.Uninitialize).
d) Start a new silo and try to subscribe again. The subscribe call in the grain should throw.

Potential fixes/mitigations:

1) An easy mitigation is to make sure you call GrainClient.Uninitialize when the client is shut down. In the test you can do it from within the app domain unload event. In the real service you can do it from the worker role OnStop call (like is done here https://github.com/dotnet/orleans/blob/master/Samples/AzureWebSample/OrleansAzureSilos/WorkerRole.cs#L95 for the silo, but you have to do it for the client); a rough sketch follows below.

2) Use an Azure Queue stream instead. It does not have this problem at all, since the client that produces the events only enqueues directly to AQ and does not talk to the grain. Of course, that is a different stream provider, so I am not sure if it is appropriate in your case.

Both of the above are mitigations. They are not the real fix, since in the general case the client can fail without Azure PaaS invoking OnStop. But in practice it does in most failure cases, so this will probably handle the vast majority of cases.
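For mitigation 1, a minimal sketch of the client-side OnStop hook, assuming an Azure worker role that hosts the Orleans client (the role class name and config file are placeholders; GrainClient.Uninitialize is the relevant call):

using Microsoft.WindowsAzure.ServiceRuntime;
using Orleans;

public class ClientWorkerRole : RoleEntryPoint
{
    public override bool OnStart()
    {
        // However the client is already initialized today.
        GrainClient.Initialize("ClientConfiguration.xml");
        return base.OnStart();
    }

    public override void OnStop()
    {
        // Gracefully shut the client down so the cluster can clean up its
        // registrations (including the stream pub sub entry for this client).
        if (GrainClient.IsInitialized)
        {
            GrainClient.Uninitialize();
        }
        base.OnStop();
    }
}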

The real fix requires the streaming infrastructure to properly deal with that scenario. We need to detect client crashes and remove those clients from the pub sub. This is not easy currently, since the cluster does not track client liveness (only silos). Just reacting naively to client unreachability by treating the client as dead would be wrong, since the client may have just disconnected temporarily. If we treat it as dead and remove it from the pub sub, we may violate streaming semantics.

I do have some ideas for how this can be fixed. They need to be brainstormed and thought through.

I suggest implementing the test and making sure the suggested mitigation indeed solves the problem. If it does, it will validate my suspicion that this is indeed the bug. At that point we can discuss in more detail how this can be fixed.

onionhammer commented 8 years ago

Thanks for the suggestions. I will work on a test PR next week

onionhammer commented 8 years ago

Hmm, so I started in on this, but it's going to be difficult to build this into the current tester/testsilohost mechanism that exists in the Orleans solution, given that it has to manipulate silos and appdomains in a single test method. What if I built this as a separate SLN on GitHub and pointed to it?

I'm open to suggestions though.

gabikliot commented 8 years ago

What if you just don't use UnitTestHost, but still do it in the Test project? That way you have all the freedom to manipulate silos and clients as you wish, and the test will still be runnable in the same project.

sergeybykov commented 7 years ago

@jason-bragg, hasn't this been fixed already?

jason-bragg commented 7 years ago

Yes.

shlomiw commented 6 years ago

Hi, unfortunately I've encountered a similar case: using an SMSProvider stream with in-memory storage (it also happens with SQL storage). Orleans 1.5.0.

Digging with verbose logs, I found out that once the client (which is both a consumer and a producer) restarts, it doesn't get cleaned from the PubSubRendezvousGrain producers list. Then, when it tries to subscribe to the stream, the PubSubRendezvousGrain tries to notify the producer about the new consumer and gets stuck, because the producer doesn't exist anymore.

If I do a graceful shutdown of the client, it works fine. But in reality I cannot always shut down gracefully.

Attached verbose logs which I cleaned up. logs.txt

jason-bragg commented 6 years ago

Hi @shlomiw, by "stuck", what do you mean? Is the client subscription call never returning? Timing out? Throwing some other error?

I'm having difficulty mapping your description of the scenario to what I see in the log. Can you be more explicit about the sequence of log entries that led you to your conclusions?

shlomiw commented 6 years ago

Hi @jason-bragg, thanks for picking this one up. It's really important to have SMSProvider be as reliable as possible, because we're relying on it for fast streams which don't require event persistence.

Silo: I've updated the silo logs with internal comments (lines starting with //): logs_with_comments.txt

I've added comments there to show the order of subscribers and publishers of the stream. You can see that the last PubSubRendezvousGrain log entry is trying to notify the producer about the new consumer, but the producer is dead. It is probably stuck there.

Client: After restarting the client, without gracefully shutting it down, when the client tries to subscribe to the stream it receives the following logs:

LINQPad.UserQuery.exe Warning: 0 : [2017-09-29 20:28:33.698 GMT     6  WARNING  100157  CallbackData  192.168.100.63:0]  Response did not arrive on time in 00:00:30 for message: Request *cli/f8985383@73361599->S127.0.0.1:30000:0*grn/716E8E94/00000000+SMSProvider_Cache #4: global::Orleans.Streams.IPubSubRendezvousGrain:RegisterConsumer(). Target History is: <S127.0.0.1:30000:0:*grn/716E8E94/00000000+SMSProvider_Cache:>. About to break its promise.  
[2017-09-29 20:28:33.698 GMT     6  WARNING  100157  CallbackData  192.168.100.63:0]  Response did not arrive on time in 00:00:30 for message: Request *cli/f8985383@73361599->S127.0.0.1:30000:0*grn/716E8E94/00000000+SMSProvider_Cache #4: global::Orleans.Streams.IPubSubRendezvousGrain:RegisterConsumer(). Target History is: <S127.0.0.1:30000:0:*grn/716E8E94/00000000+SMSProvider_Cache:>. About to break its promise.  

And then the TimeOutException:

Response did not arrive on time in 00:00:30 for message: Request *cli/f8985383@73361599->S127.0.0.1:30000:0*grn/716E8E94/00000000+SMSProvider_Cache #4: global::Orleans.Streams.IPubSubRendezvousGrain:RegisterConsumer(). Target History is: <S127.0.0.1:30000:0:*grn/716E8E94/00000000+SMSProvider_Cache:>.

Please let me know what other information or scenarios would help you better understand. Thanks in advance!

jason-bragg commented 6 years ago

The final line in the pubsub log, "Notifying 1 existing producer(s) about new consumer ObserverId....", indicates that the call was made to the producer, but does not report an error. So the call may be stuck, or it may have simply succeeded. Unfortunately we don't log success of that operation, so the absence of further logging in that call would normally indicate success, not 'stuck'.

For diagnosis, it is important to know whether this operation succeeded or never returned. 'Never returned' is highly unexpected. One way of determining this is to try more subscribe calls on the stream after the timeout period. In your client logic, on subscription attempt timeouts, if you could log and retry a couple of times (logging each retry), we should be able to verify the unexpected behavior. What we should see in that case is that the pubsub grain reports further subscribe calls corresponding to the client retries. If the pubsub grain is stuck, however, we'd see no logs corresponding to the client retries.
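A minimal sketch of the kind of log-and-retry probe being asked for (an illustrative helper; it assumes the client subscribes with an IAsyncObserver and that the broken promise surfaces on the caller as a TimeoutException, as shown in the client log above):

using System;
using System.Threading.Tasks;
using Orleans.Streams;

public static class SubscribeProbe
{
    // Retries the subscribe call a few times, logging each retry so it can be
    // correlated with PubSubRendezvousGrain entries in the silo log.
    public static async Task<StreamSubscriptionHandle<T>> SubscribeWithRetriesAsync<T>(
        IAsyncStream<T> stream, IAsyncObserver<T> observer, int maxAttempts = 3)
    {
        for (int attempt = 1; ; attempt++)
        {
            try
            {
                return await stream.SubscribeAsync(observer);
            }
            catch (TimeoutException ex) when (attempt < maxAttempts)
            {
                Console.WriteLine($"Subscribe attempt {attempt} timed out: {ex.Message}. Retrying...");
            }
        }
    }
}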

We have testing around these behaviors. If you can take a quick look and identify any difference between these scenarios and your case, that would be very helpful.

https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/SMSClientStreamTests.cs https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/ClientStreamTestRunner.cs

Some questions to help us repro this:

1) How many silos are in your cluster?
2) How many clients are connected to the cluster?
3) Is each silo a gateway? If not, how many gateways are configured?
4) Is the same client the producer and consumer of the stream?
5) After a client crashes, how long does it take for the client to come back up and reconnect?

shlomiw commented 6 years ago

Hi, I've tried many times to retry, wait, retry again, etc. I did another test with logs. The silo log: silo.log

Again I've cleaned it up and added comments about what's going on (indicated with //). You'll see what happens there (with only 1 silo and 1 client).

Then I killed it again and tried this process yet again, with same results.

Attaching the client log after restarting it (it's LINQPad output in HTML): client.rename_to_html.log

Answering your questions: this is a simple dev environment with only 1 silo and this 1 client, using the defaults of in-memory storage. The 1 silo is the only gateway. The same client is the producer and consumer of the stream. The silo also subscribes to this stream; note that if I remove the silo subscription, it doesn't affect the behavior of the client. After the client crash, I waited for more than a minute to reconnect.

I've seen this issue also happen in our QA environment (we're not live yet with a Prod env :))

Now I'll take a look at your tests and let you know if there's a similar one.

Thanks!!

shlomiw commented 6 years ago

We have testing around these behaviors. If you can take a quick look and identify any difference between these scenarios and your case, that would be very helpful. https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/SMSClientStreamTests.cs https://github.com/dotnet/orleans/blob/master/test/Tester/StreamingTests/ClientStreamTestRunner.cs

The scenario is almost the same. One of the differences is that, as I understand it, when you want to kill the client (ungracefully), you simply dispose the ClusterClient without disconnecting, BUT you continue the test in the same process. In my case I'm completely killing the client's process, and when the silo tries to notify the 'old' client, whose process doesn't exist anymore, it gets 'stuck' (at least it looks like it). Could that be the reason?

jason-bragg commented 6 years ago

Ok, so here is what I have, and what I'm seeing.

Using 1.5.1, I have a very simple standalone single silo with the SMS stream provider configured and a memory storage provider for pubsub. I have a very simple standalone single client that subscribes to a stream of integers and produces on it, so a single client is both producer and consumer. The subscribe call is retried on error and the error is logged. Upon success of the subscribe call, one integer is published to the stream every 100ms. When integer events are received back on the stream, they are written to the console.

I start the silo. I start the client. I see integer events written to the console, meaning we're generating and receiving events. I hard kill the client. I restart the client. The client times out on subscribing to the stream. It continues to fail to subscribe until after 1 minute (the default ClientDropTimeout). After the 1 minute of retries, the subscribe succeeds and the client begins producing and consuming events again.

This is expected behavior. I am not seeing anything getting 'stuck', only timeouts trying to communicate with the dead client while the silo has not given up on it. If I reduce the ClientDropTimeout to 5 seconds, the client encounters a single subscription timeout call, then starts working again.

Client

class Program
{
    static void Main(string[] args)
    {
        // Then configure and connect a client.
        var clientConfig = ClientConfiguration.LocalhostSilo();
        clientConfig.AddSimpleMessageStreamProvider("SMS");
        var client = new ClientBuilder().UseConfiguration(clientConfig).Build();
        Run(client).Wait();
    }

    private static async Task Run(IClusterClient client)
    { 
        await client.Connect();
        Console.WriteLine("Client connected.");

        var stream = client.GetStreamProvider("SMS").GetStream<int>(new Guid(1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0), "Blarg");
        StreamSubscriptionHandle<int> handle = null;
        do
        {
            try
            {
                handle = await stream.SubscribeAsync(OnNext);
            }
            catch (Exception ex)
            {
                Console.WriteLine($"Subscribe failed with errro: {ex}.");
                handle = null;
            }
        } while (handle == null);
        Console.WriteLine("Subscribed to stream.");

        int i = 0;
        while(true)
        {
            await Task.Delay(100);
            await stream.OnNextAsync(i++);
        }
    }

    private static Task OnNext(int i, StreamSequenceToken token)
    {
        Console.WriteLine($"Received {i}.");
        return Task.CompletedTask;
    }
}

Silo

class Program
{
    static void Main(string[] args)
    {
        var siloConfig = ClusterConfiguration.LocalhostPrimarySilo();
        siloConfig.AddSimpleMessageStreamProvider("SMS");
        siloConfig.AddMemoryStorageProvider("PubSubStore");
        //siloConfig.Globals.ClientDropTimeout = TimeSpan.FromSeconds(5); 
        var silo = new SiloHost("StreamingSilo", siloConfig);
        silo.InitializeOrleansSilo();
        silo.StartOrleansSilo();

        Console.WriteLine("Silo started.");
        Console.WriteLine("\nPress Enter to terminate...");
        Console.ReadLine();

        // Shut down
        silo.ShutdownOrleansSilo();
    }
}

Does this deviate from what you are seeing?

shlomiw commented 6 years ago

@jason-bragg thank you very, very much for your efforts! It's not trivial at all. I'm going to check your code and compare it to mine to see what's different.

I still have a question though: why would you get a timeout when trying to re-subscribe to the stream? Since you restarted it (after the hard kill), it's a new client, so shouldn't it subscribe immediately? Is this really the expected behavior also for other clients that try to subscribe after you kill one of the clients?

Anyway, I'll let you know about my findings.

Thanks again!

jason-bragg commented 6 years ago

@shlomiw

Why would you get a timeout when trying to re-subscribe to the stream

When using SMS and explicit subscriptions, every subscribe call fans out to all of the producers to tell them about the new subscription. Since Orleans does not immediately drop a client when its connection is lost, as it may be a transient networking issue, the previous client is still understood by the system to be an active producer, which needs to be notified about the new subscription. That notification call times out, causing the subscribe call to time out.

The MemoryStreamProvider does not have this issue. The main behavioral difference between the memory stream provider and SMS is that when producing on a memory stream, events are written to an in-memory queue and then delivered, while SMS delivers and processes the event in all subscribers before OnNextAsync returns. This pattern of sending an event to the queue while consumers read events from the queue decouples production from consumption, allowing for a system that is slightly more recoverable from client failures. The trade-off is that when an OnNextAsync call succeeds while using SMS, one can be confident that the event was processed, while success of OnNextAsync on a memory stream means only that the event is available for consumption by subscribers. If a silo crashes between the time OnNextAsync succeeds and the event is consumed, events may be lost.

shlomiw commented 6 years ago

@jason-bragg I've struggled with your code for a while; for some weird reason it only works if I run the processes outside the Visual Studio environment. I can confirm that your code also works against my bloated silo :)

I'm starting to better understand how SMS works, and I guess it doesn't fit my streaming requirements. It's unacceptable for us that if a producer is down, all other clients that try to subscribe can hang in a timeout (until the producer client is dropped). And it's not only that: if you try to publish an event (from another client) during this time, you can't; you'll also get a timeout.

One important conclusion - we should retry the subscribe operation. Makes sense.

I didn't know about MemoryStreamProvider. It might better suit our needs, as it decouples producers from consumers. Though its summary says:

    /// <summary>
    /// This is a persistent stream provider that uses in-memory grain to queue the events.
    /// This is primarily for test purposes.
    /// </summary> 

So I'm not sure how reliable it can be for production high-throughput streaming requirements. I will dig more.

Once again, many thanks for your efforts!