Open arunprakashn opened 2 months ago
```csharp
using Confluent.Kafka;
using System.Diagnostics.Tracing;
using System.Text;

Console.WriteLine("Hello, World!");

string apiKey = "WorkingKey";
string secret = "WorkingSecret";
string bootstrapUrl = "dummy.azure.confluent.cloud:9092";

var config = new ProducerConfig
{
    BootstrapServers = bootstrapUrl,
    SaslMechanism = SaslMechanism.Plain,
    SecurityProtocol = SecurityProtocol.SaslSsl,
    SaslUsername = apiKey,
    SaslPassword = secret,
    RetryBackoffMaxMs = 2000,
    MessageTimeoutMs = 10000,
};

var producerBuilder = new ProducerBuilder<string, byte[]>(config);

var producer = producerBuilder
    .SetErrorHandler((p, error) =>
    {
        if (error.IsFatal)
        {
            Console.WriteLine($"Confluent Kafka Producer Error Handler : FATAL : Error Code: {error.Code} {error.Reason}", EventLevel.Critical);
        }
        else
        {
            Console.WriteLine($"Confluent Kafka Producer Error Handler : Error Code: {error.Code} {error.Reason}", EventLevel.LogAlways);
        }
    })
    .SetLogHandler((p, logHandler) =>
    {
        Console.WriteLine(
            $"Confluent Kafka Producer Log Handler : {logHandler.Level.ToString().ToUpper()}|{DateTime.UtcNow}|{logHandler.Facility}|{logHandler.Name}|{logHandler.Message}",
            EventLevel.Verbose);
    })
    .Build();

// Step 1: wrong credentials before any connection exists.
producer.SetSaslCredentials("hello", "hello");
for (int i = 0; i < 1; i++) // This won't work, as expected.
{
    try
    {
        var result = producer.ProduceAsync("arun.poc", new Message<string, byte[]> { Key = (string)(object)"key", Value = (byte[])(object)Encoding.UTF8.GetBytes("value") }).GetAwaiter().GetResult();
        Console.WriteLine($"Message sent to Partition: {result.Partition} with Offset: {result.Offset}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}

// Step 2: switch to the working credentials.
producer.SetSaslCredentials(apiKey, secret);
for (int i = 0; i < 10; i++) // This will work, as expected.
{
    var result = producer.ProduceAsync("arun.poc", new Message<string, byte[]> { Key = (string)(object)"key", Value = (byte[])(object)Encoding.UTF8.GetBytes("value") }).GetAwaiter().GetResult();
    Console.WriteLine($"Message sent to Partition: {result.Partition} with Offset: {result.Offset}");
}

// Step 3: switch back to the wrong credentials.
producer.SetSaslCredentials("hello", "hello");
for (int i = 0; i < 1; i++) // This should not work, because the credentials are set to "hello", which is wrong.
{
    try
    {
        var result = producer.ProduceAsync("arun.poc", new Message<string, byte[]> { Key = (string)(object)"key", Value = (byte[])(object)Encoding.UTF8.GetBytes("value") }).GetAwaiter().GetResult();
        Console.WriteLine($"Message sent to Partition: {result.Partition} with Offset: {result.Offset}");
    }
    catch (Exception ex)
    {
        Console.WriteLine($"Exception: {ex.Message}");
    }
}
```
Once authentication has succeeded, the producer does not switch credentials again, even if you call `producer.SetSaslCredentials`.
Currently, authentication is only required when connecting to a broker. Once a connection is established, changing the credentials to something incorrect afterward does not cause any issue as long as the connection stays intact (connections are expected to be persistent).
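If the goal is for new credentials to take effect immediately, one option is to build a fresh producer so that new connections are opened and authenticated. This is only a sketch of that idea, not something the library prescribes. The `ProducerFactory` class and its `Build` method are hypothetical names introduced here for illustration. The config values mirror the reproduction code.

```csharp
using System;
using Confluent.Kafka;

// Hypothetical helper (not part of Confluent.Kafka): builds a producer
// whose connections will authenticate with the given credentials.
public static class ProducerFactory
{
    public static IProducer<string, byte[]> Build(
        string bootstrapServers, string username, string password)
    {
        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            SaslMechanism = SaslMechanism.Plain,
            SecurityProtocol = SecurityProtocol.SaslSsl,
            SaslUsername = username,
            SaslPassword = password,
        };

        // A new client instance opens its own broker connections, so the
        // SASL handshake runs again with the credentials set above.
        return new ProducerBuilder<string, byte[]>(config).Build();
    }
}
```

A caller would typically call `producer.Flush(TimeSpan.FromSeconds(10))` and `producer.Dispose()` on the old instance before replacing it with `ProducerFactory.Build(bootstrapUrl, newKey, newSecret)`. Here `newKey` and `newSecret` are placeholders for the rotated credentials.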
Depending on where you are running the broker, you can check the broker property `connections.max.reauth.ms`, which requires established connections to reauthenticate within that duration.
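As a rough illustration, on a self-managed broker the property could be set in `server.properties` along these lines. The one-hour value is an arbitrary example, not a recommendation, and managed services may not expose this setting.

```properties
# Assumed example value: require SASL connections to re-authenticate
# at least once per hour (3,600,000 ms). The default of 0 disables
# re-authentication.
connections.max.reauth.ms=3600000
```

With a positive value, the broker closes connections that have not re-authenticated within the configured session lifetime.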
Description
I am trying to use the new `SetSaslCredentials` enhancement, and I think it is not working as expected.
How to reproduce