confluentinc / confluent-kafka-dotnet

Confluent's Apache Kafka .NET client
https://github.com/confluentinc/confluent-kafka-dotnet/wiki
Apache License 2.0

OAuth callbacks, Dispose(), and thread safety #2217

Open j2jensen opened 1 month ago

j2jensen commented 1 month ago

Description

I'm looking for guidance on how to handle Dispose() when I have registered an OAuth handler with SetOAuthBearerTokenRefreshHandler.

This comment notes:

After the Dispose method returns, delivery callbacks will not be triggered.
Delivery callbacks are called on a thread (the "poll thread") managed by the producer, which as you point out is different to the thread on which you call Dispose. It's worth explicitly noting then that the dispose method blocks on the poll thread finishing (i.e. no delivery callbacks being processed) before rendering the underlying producer instance unusable, so it's safe to make use of the producer instance in your callbacks without any locking.

Is this the behavior we should expect from the OAuth Bearer Token Refresh Handler, too?

The reason I ask is that in integration testing I'm finding strange race conditions which can lead to a variety of unexpected behaviors. I'm trying to determine whether that's a bug, or if there's a recommended set of patterns that can help me avoid these issues.

When my application is shutting down, I'm careful to call Flush() and then Dispose() on my Producers. I've been assuming that if I call Flush() it will make sure everything in-progress completes, and then it's safe to call Dispose().

However, sometimes Flush() appears to wait for the OAuth token handler to complete, and other times it doesn't. Same with Dispose(). In several of my integration tests I'm able to consistently get into a state where:

As a side note, the reason Flush is getting called twice relates to WebApplicationFactory calling IHostedService.StopAsync twice. I can address this by making my IHostedService.StopAsync() only execute its underlying logic once, returning the same Task instance on subsequent calls--that way the second call doesn't complete, and Dispose() doesn't get called, until the first (blocking) call to Flush() is done. But I want to understand whether I'm just addressing one symptom of an underlying problem.
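For reference, that StopAsync-once pattern looks roughly like this (my own sketch; KafkaProducerHostedService and its fields are illustrative names, not from any library):

```csharp
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Extensions.Hosting;

// Sketch: guard against WebApplicationFactory invoking StopAsync twice.
// Every call after the first returns the same Task, so Flush()/Dispose()
// run exactly once and later callers simply wait for them to finish.
public sealed class KafkaProducerHostedService : IHostedService
{
    private readonly IProducer<Null, Null> _producer; // built elsewhere
    private readonly object _stopLock = new();
    private Task? _stopTask;

    public KafkaProducerHostedService(IProducer<Null, Null> producer)
        => _producer = producer;

    public Task StartAsync(CancellationToken cancellationToken) => Task.CompletedTask;

    public Task StopAsync(CancellationToken cancellationToken)
    {
        lock (_stopLock)
        {
            // First caller starts the shutdown; everyone else gets the same Task.
            _stopTask ??= Task.Run(() =>
            {
                _producer.Flush(cancellationToken);
                _producer.Dispose();
            });
            return _stopTask;
        }
    }
}
```

This keeps the second StopAsync from racing ahead to Dispose(), but as noted above it may only be masking the underlying ordering problem.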

What additional logic do I need to add around the OAuth handler to make sure I don't end up with threading issues?
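For concreteness, here is the kind of best-effort guard I'm experimenting with (purely my own sketch; the disposing flag and FetchTokenSomehow are illustrative names, and I'm not sure whether OAuthBearerSetToken even throws ObjectDisposedException in this situation--that's part of the question):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Sketch: a flag set before shutdown plus a defensive catch, so the refresh
// handler avoids touching a destroyed handle. This narrows the race but does
// not eliminate it unless Dispose() is guaranteed to block on in-flight
// callbacks -- which is exactly the guarantee I'm asking about.
int disposing = 0;

// Placeholder for real token retrieval (e.g. an OAuth client call).
static (string Value, long ExpiryMs, string Principal) FetchTokenSomehow()
    => ("token", DateTimeOffset.UtcNow.AddMinutes(5).ToUnixTimeMilliseconds(), "principal");

var builder = new ProducerBuilder<Null, Null>(new ProducerConfig
    {
        BootstrapServers = "localhost:9092", // hypothetical
        SaslMechanism = SaslMechanism.OAuthBearer,
        SecurityProtocol = SecurityProtocol.SaslSsl,
    })
    .SetOAuthBearerTokenRefreshHandler((producer, _) =>
    {
        try
        {
            var token = FetchTokenSomehow();
            if (Volatile.Read(ref disposing) == 1) return; // shutdown started; skip
            producer.OAuthBearerSetToken(token.Value, token.ExpiryMs, token.Principal);
        }
        catch (ObjectDisposedException)
        {
            // Handle destroyed between the flag check and the call; nothing to do.
        }
        catch (Exception ex)
        {
            producer.OAuthBearerSetTokenFailure(ex.Message);
        }
    });

using var producer = builder.Build();
// ...at shutdown:
Volatile.Write(ref disposing, 1);
producer.Flush(TimeSpan.FromSeconds(10));
```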

How to reproduce

Using Confluent.Kafka v2.3.0.

I am still trying to find good minimal reproduction steps, but the nondeterministic nature of the behavior makes this difficult. For example, the following test shows Flush() blocking on the refresh handler when I Run it in Visual Studio, but not when I Debug it:

   [Fact]
   public async Task Dispose_Flush_Test5_Logging()
   {
       var builder = new ProducerBuilder<Null, Null>(new ProducerConfig
       {
           BootstrapServers = Guid.NewGuid().ToString(),
           SaslMechanism = SaslMechanism.OAuthBearer,
           SecurityProtocol = SecurityProtocol.SaslSsl,
           Debug = "all"
       });
       var refreshHandlerCalled = new TaskCompletionSource();
       var tokenRetrievalSimulated = new TaskCompletionSource();
       var setBearerTokenError = new TaskCompletionSource<Exception?>(TaskCreationOptions.RunContinuationsAsynchronously);

       builder.SetLogHandler((producer, message) =>
       {
           _testOutputHelper.WriteLine($"{message.Level}|{message.Name}:{message.Message}");
       });

       builder.SetOAuthBearerTokenRefreshHandler((producer, _) =>
       {
           refreshHandlerCalled.SetResult();
           tokenRetrievalSimulated.Task.GetAwaiter().GetResult();
           try
           {
               producer.OAuthBearerSetTokenFailure("Foo");
               setBearerTokenError.SetResult(null);
           }
           catch (Exception ex)
           {
               setBearerTokenError.SetResult(ex);
           }
       });

       var producer = builder.Build();
       var tokenRetrievalTask = Task.Run(async () =>
       {
           await Task.Delay(100);
           tokenRetrievalSimulated.SetResult();
       });
       producer.Flush();
       _testOutputHelper.WriteLine("Flush done. Token retrieval: " + tokenRetrievalTask.Status);

       producer.Dispose();
       _testOutputHelper.WriteLine("Dispose done. Token retrieval: " + tokenRetrievalTask.Status);
       await refreshHandlerCalled.Task.ConfigureAwait(false);
       var error = await setBearerTokenError.Task;
       Assert.Null(error);
   }
Test output

Output when Running:

```
Debug|rdkafka#producer-1:[thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
Debug|rdkafka#producer-1:[thrd:app]: Using OpenSSL version OpenSSL 3.0.8 7 Feb 2023 (0x30000080, librdkafka built with 0x30000080)
Debug|rdkafka#producer-1:[thrd:app]: 64 certificate(s) successfully added from Windows Certificate Root store, 0 failed
Debug|rdkafka#producer-1:[thrd:app]: sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap: Added new broker with NodeId -1
Debug|rdkafka#producer-1:[thrd:app]: sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
Debug|rdkafka#producer-1:[thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0xfffff)
Debug|rdkafka#producer-1:[thrd:app]: Client configuration:
Debug|rdkafka#producer-1:[thrd:app]: client.software.name = confluent-kafka-dotnet
Debug|rdkafka#producer-1:[thrd:app]: client.software.version = 2.3.0
Debug|rdkafka#producer-1:[thrd:app]: metadata.broker.list = c89476f2-b667-4b7e-8f3f-63543b347f73
Debug|rdkafka#producer-1:[thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
Debug|rdkafka#producer-1:[thrd:app]: log_cb = 00007FFBA6713464
Debug|rdkafka#producer-1:[thrd:app]: security.protocol = sasl_ssl
Debug|rdkafka#producer-1:[thrd:app]: sasl.mechanisms = OAUTHBEARER
Debug|rdkafka#producer-1:[thrd:app]: oauthbearer_token_refresh_cb = 00007FFBA67134A4
Debug|rdkafka#producer-1:[thrd:app]: dr_msg_cb = 00007FFBA6713424
Debug|rdkafka#producer-1:[thrd:sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap]: sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap: Enter main broker thread
Debug|rdkafka#producer-1:[thrd:sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap]: sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap: Received CONNECT op
Debug|rdkafka#producer-1:[thrd:sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap]: sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
Debug|rdkafka#producer-1:[thrd:sasl_ssl://c89476f2-b667-4b7e-8f3f-63543b347f73:9092/bootstrap]: Broadcasting state change
Debug|rdkafka#producer-1:[thrd::0/internal]: :0/internal: Enter main broker thread
Flush done. Token retrieval: WaitingForActivation
Error|rdkafka#producer-1:[thrd:app]: Failed to acquire SASL OAUTHBEARER token: Foo
Dispose done. Token retrieval: RanToCompletion
```

Output when Testing:

```
Debug|rdkafka#producer-1:[thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
Debug|rdkafka#producer-1:[thrd:app]: Using OpenSSL version OpenSSL 3.0.8 7 Feb 2023 (0x30000080, librdkafka built with 0x30000080)
Debug|rdkafka#producer-1:[thrd:app]: 64 certificate(s) successfully added from Windows Certificate Root store, 0 failed
Debug|rdkafka#producer-1:[thrd:app]: sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap: Added new broker with NodeId -1
Debug|rdkafka#producer-1:[thrd:app]: sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
Debug|rdkafka#producer-1:[thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-1 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0xfffff)
Debug|rdkafka#producer-1:[thrd:app]: Client configuration:
Debug|rdkafka#producer-1:[thrd:app]: client.software.name = confluent-kafka-dotnet
Debug|rdkafka#producer-1:[thrd:app]: client.software.version = 2.3.0
Debug|rdkafka#producer-1:[thrd:app]: metadata.broker.list = bcee0cf1-581c-460c-98a1-2cc654381c80
Debug|rdkafka#producer-1:[thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
Debug|rdkafka#producer-1:[thrd:app]: log_cb = 00007FFBA66F3464
Debug|rdkafka#producer-1:[thrd:app]: security.protocol = sasl_ssl
Debug|rdkafka#producer-1:[thrd:app]: sasl.mechanisms = OAUTHBEARER
Debug|rdkafka#producer-1:[thrd:app]: oauthbearer_token_refresh_cb = 00007FFBA66F34A4
Debug|rdkafka#producer-1:[thrd:app]: dr_msg_cb = 00007FFBA66F3424
Debug|rdkafka#producer-1:[thrd::0/internal]: :0/internal: Enter main broker thread
Debug|rdkafka#producer-1:[thrd:sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap]: sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap: Enter main broker thread
Debug|rdkafka#producer-1:[thrd:sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap]: sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap: Received CONNECT op
Debug|rdkafka#producer-1:[thrd:sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap]: sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
Debug|rdkafka#producer-1:[thrd:sasl_ssl://bcee0cf1-581c-460c-98a1-2cc654381c80:9092/bootstrap]: Broadcasting state change
Error|rdkafka#producer-1:[thrd:app]: Failed to acquire SASL OAUTHBEARER token: Foo
Flush done. Token retrieval: RanToCompletion
Dispose done. Token retrieval: RanToCompletion
```

Here's another test that seems to fail intermittently when I Debug it:

    [Theory]
    [InlineData(1)]
    [InlineData(10)]
    [InlineData(20)]
    [InlineData(50)]
    [InlineData(100)]
    [InlineData(200)]
    [InlineData(500)]
    public async Task Dispose_Flush_Test8_Logging(int delay)
    {
        var builder = new ProducerBuilder<Null, Null>(new ProducerConfig
        {
            BootstrapServers = Guid.NewGuid().ToString(),
            SaslMechanism = SaslMechanism.OAuthBearer,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            Debug = "all"
        });
        var refreshHandlerCalled = new TaskCompletionSource();
        var tokenRetrievalSimulated = new TaskCompletionSource();
        var setBearerTokenError = new TaskCompletionSource<Exception?>(TaskCreationOptions.RunContinuationsAsynchronously);

        builder.SetLogHandler((producer, message) =>
        {
            _testOutputHelper.WriteLine($"{message.Level}|{message.Name}:{message.Message}");
        });

        builder.SetOAuthBearerTokenRefreshHandler((producer, _) =>
        {
            refreshHandlerCalled.SetResult();
            _testOutputHelper.WriteLine($"Getting token");
            tokenRetrievalSimulated.Task.GetAwaiter().GetResult();
            _testOutputHelper.WriteLine($"Token Retrieved");
            try
            {
                producer.OAuthBearerSetTokenFailure("Foo");
                _testOutputHelper.WriteLine($"Called OAuthBearerSetTokenFailure");
                setBearerTokenError.SetResult(null);
            }
            catch (Exception ex)
            {
                _testOutputHelper.WriteLine($"Exception: " + ex);
                setBearerTokenError.SetResult(ex);
            }
        });

        var producer = builder.Build();
        var tokenRetrievalTask = Task.Run(async () =>
        {
            await Task.Delay(100);
            tokenRetrievalSimulated.SetResult();
        });
        var backgroundFlushAndDisposeTask = Task.Run(async () =>
        {
            await Task.Delay(delay);
            _testOutputHelper.WriteLine($"Calling flush in the background");
            producer.Flush();
            _testOutputHelper.WriteLine($"Background Flush done.");

            producer.Dispose();
            _testOutputHelper.WriteLine($"Dispose done.");
        });
        _testOutputHelper.WriteLine($"Calling flush");
        producer.Flush();
        _testOutputHelper.WriteLine($"Flush done.");
        await refreshHandlerCalled.Task.ConfigureAwait(false);
        var error = await setBearerTokenError.Task;
        Assert.Null(error);
        await backgroundFlushAndDisposeTask.ConfigureAwait(false);
    }
Test output

Output when successful:

```
Kafka.WebApi.Tests.FlushDisposeBugRepro.Dispose_Flush_Test8_Logging(delay: 100)
Source: KafkaControllerTests.cs line 463
Duration: 257 ms
Standard Output:
Debug|rdkafka#producer-7:[thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
Debug|rdkafka#producer-7:[thrd:app]: Using OpenSSL version OpenSSL 3.0.8 7 Feb 2023 (0x30000080, librdkafka built with 0x30000080)
Debug|rdkafka#producer-7:[thrd:app]: 64 certificate(s) successfully added from Windows Certificate Root store, 0 failed
Debug|rdkafka#producer-7:[thrd:app]: sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap: Added new broker with NodeId -1
Debug|rdkafka#producer-7:[thrd:app]: sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
Debug|rdkafka#producer-7:[thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-7 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0xfffff)
Debug|rdkafka#producer-7:[thrd:app]: Client configuration:
Debug|rdkafka#producer-7:[thrd:app]: client.software.name = confluent-kafka-dotnet
Debug|rdkafka#producer-7:[thrd:app]: client.software.version = 2.3.0
Debug|rdkafka#producer-7:[thrd:app]: metadata.broker.list = 3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523
Debug|rdkafka#producer-7:[thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
Debug|rdkafka#producer-7:[thrd:app]: log_cb = 00007FFB65F138E4
Debug|rdkafka#producer-7:[thrd:app]: security.protocol = sasl_ssl
Debug|rdkafka#producer-7:[thrd:app]: sasl.mechanisms = OAUTHBEARER
Debug|rdkafka#producer-7:[thrd:app]: oauthbearer_token_refresh_cb = 00007FFB65F13924
Debug|rdkafka#producer-7:[thrd:app]: dr_msg_cb = 00007FFB65F138A4
Debug|rdkafka#producer-7:[thrd:sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap]: sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap: Enter main broker thread
Debug|rdkafka#producer-7:[thrd:sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap]: sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap: Received CONNECT op
Debug|rdkafka#producer-7:[thrd:sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap]: sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
Debug|rdkafka#producer-7:[thrd:sasl_ssl://3eb6a3af-f3cb-4f09-af21-7ed8f0e9c523:9092/bootstrap]: Broadcasting state change
Calling flush
Getting token
Debug|rdkafka#producer-7:[thrd::0/internal]: :0/internal: Enter main broker thread
Calling flush in the background
Background Flush done.
Token Retrieved
Error|rdkafka#producer-7:[thrd:app]: Failed to acquire SASL OAUTHBEARER token: Foo
Called OAuthBearerSetTokenFailure
Flush done.
Dispose done.
```

Output when failed:

```
Kafka.WebApi.Tests.FlushDisposeBugRepro.Dispose_Flush_Test8_Logging(delay: 20)
Source: KafkaControllerTests.cs line 463
Duration: 202 ms
Message:
System.ObjectDisposedException : handle is destroyed
Stack Trace:
SafeKafkaHandle.ThrowIfHandleClosed()
Producer`2.Flush(CancellationToken cancellationToken)
FlushDisposeBugRepro.Dispose_Flush_Test8_Logging(Int32 delay) line 517
--- End of stack trace from previous location ---
Standard Output:
Debug|rdkafka#producer-2:[thrd:app]: Selected provider OAUTHBEARER (builtin) for SASL mechanism OAUTHBEARER
Debug|rdkafka#producer-2:[thrd:app]: Using OpenSSL version OpenSSL 3.0.8 7 Feb 2023 (0x30000080, librdkafka built with 0x30000080)
Debug|rdkafka#producer-2:[thrd:app]: 64 certificate(s) successfully added from Windows Certificate Root store, 0 failed
Debug|rdkafka#producer-2:[thrd:app]: sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap: Added new broker with NodeId -1
Debug|rdkafka#producer-2:[thrd:app]: sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap: Selected for cluster connection: bootstrap servers added (broker has 0 connection attempt(s))
Debug|rdkafka#producer-2:[thrd:app]: librdkafka v2.3.0 (0x20300ff) rdkafka#producer-2 initialized (builtin.features gzip,snappy,ssl,sasl,regex,lz4,sasl_gssapi,sasl_plain,sasl_scram,plugins,zstd,sasl_oauthbearer,http,oidc, SSL ZLIB SNAPPY ZSTD CURL SASL_SCRAM SASL_OAUTHBEARER PLUGINS HDRHISTOGRAM, debug 0xfffff)
Debug|rdkafka#producer-2:[thrd:app]: Client configuration:
Debug|rdkafka#producer-2:[thrd:app]: client.software.name = confluent-kafka-dotnet
Debug|rdkafka#producer-2:[thrd:app]: client.software.version = 2.3.0
Debug|rdkafka#producer-2:[thrd:app]: metadata.broker.list = 0922ba93-542b-406a-aaf7-33eb63bc6a5c
Debug|rdkafka#producer-2:[thrd:app]: debug = generic,broker,topic,metadata,feature,queue,msg,protocol,cgrp,security,fetch,interceptor,plugin,consumer,admin,eos,mock,assignor,conf,all
Debug|rdkafka#producer-2:[thrd:app]: log_cb = 00007FFB65F13524
Debug|rdkafka#producer-2:[thrd:app]: security.protocol = sasl_ssl
Debug|rdkafka#producer-2:[thrd:app]: sasl.mechanisms = OAUTHBEARER
Debug|rdkafka#producer-2:[thrd:app]: oauthbearer_token_refresh_cb = 00007FFB65F13564
Debug|rdkafka#producer-2:[thrd:app]: dr_msg_cb = 00007FFB65F134E4
Debug|rdkafka#producer-2:[thrd::0/internal]: :0/internal: Enter main broker thread
Debug|rdkafka#producer-2:[thrd:sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap]: sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap: Enter main broker thread
Calling flush
Debug|rdkafka#producer-2:[thrd:sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap]: sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap: Received CONNECT op
Debug|rdkafka#producer-2:[thrd:sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap]: sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap: Broker changed state INIT -> TRY_CONNECT
Getting token
Debug|rdkafka#producer-2:[thrd:sasl_ssl://0922ba93-542b-406a-aaf7-33eb63bc6a5c:9092/bootstrap]: Broadcasting state change
Calling flush in the background
Background Flush done.
Token Retrieved
Called OAuthBearerSetTokenFailure
Dispose done.
```

This shows that it's possible for one call to Flush() to be blocked on the OAuth callback while another succeeds. Even though Dispose() blocks on the callback's completion, the blocked call to Flush() can still try to access the closed handle after Dispose() has finished.

I have not yet been able to find a minimal repro test to show Dispose() completing before the callback handler completes, even though this happens consistently in my integration tests.
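In the meantime, the workaround I'm leaning toward is serializing every Flush()/Dispose() call through a single gate, roughly like this (my own sketch, not an official pattern; SerializedProducerShutdown and its members are illustrative names):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Sketch: funnel all Flush()/Dispose() calls through one SemaphoreSlim so
// two shutdown paths can never interleave and observe a destroyed handle.
public sealed class SerializedProducerShutdown : IDisposable
{
    private readonly IProducer<Null, Null> _producer;
    private readonly SemaphoreSlim _gate = new(1, 1);
    private bool _disposed;

    public SerializedProducerShutdown(IProducer<Null, Null> producer)
        => _producer = producer;

    public void Flush()
    {
        _gate.Wait();
        try
        {
            if (_disposed) return; // a racing Dispose() already won; skip
            _producer.Flush(TimeSpan.FromSeconds(30));
        }
        finally { _gate.Release(); }
    }

    public void Dispose()
    {
        _gate.Wait();
        try
        {
            if (_disposed) return;
            _disposed = true;
            _producer.Dispose();
        }
        finally { _gate.Release(); }
    }
}
```

This avoids the ObjectDisposedException from the failing test above, but it feels like something the client should either guarantee or document, which is why I'm asking.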

Checklist

Please provide the following information: