FoundatioFx / Foundatio

Pluggable foundation blocks for building distributed apps.
Apache License 2.0

Kafka consumer throughput. Usage of KafkaMessageBusOptions.DefaultConsumerThreadCount #265

Closed jojopeter closed 2 years ago

jojopeter commented 3 years ago

Hi, this is a question about Kafka consumer throughput when using Foundatio in my application.

We subscribe to messages in the following manner.

Namespace: Foundatio.Messaging
Class: static class MessageBusExtensions
Method:

      public static Task SubscribeAsync<T>(this IMessageSubscriber subscriber, Func<T, Task> handler, CancellationToken cancellationToken = default) where T : class
      {
        return subscriber.SubscribeAsync<T>((msg, token) => handler(msg), cancellationToken);
      }

We use the method above, and our implementation is as follows:

      messageBus.SubscribeAsync<message>(
       m => this.HandleMessageAsync(m)).GetAwaiter().GetResult();
      var options = new KafkaMessageBusOptions
      {
        DefaultConsumerThreadCount = 4,
        ManualCommitBatch = 10,
        ProducerConfig = new ProducerConfig
        {
          BootstrapServers = bootstrapServers,
          Acks = Acks.Leader,
          Partitioner = Partitioner.ConsistentRandom,
          SecurityProtocol = securityProtocol,
          Debug = "security"
        },
        ConsumerConfig = new ConsumerConfig
        {
          GroupId = consumerGroupId,
          BootstrapServers = bootstrapServers,
          AutoOffsetReset = AutoOffsetReset.Latest,
          EnableAutoCommit = true,
          AutoCommitIntervalMs = 5000,
          CancellationDelayMaxMs = 1000,
          SessionTimeoutMs = 100000,
          SecurityProtocol = securityProtocol,
          Debug = "security"
        }
      };
      services.AddFoundatioKafkaMessageBus(options);

We noticed that consumer throughput is very low and are trying to optimize it. As part of this, I changed DefaultConsumerThreadCount from 4 to 8, but that made no difference to consumer throughput. Can you please let us know what 'DefaultConsumerThreadCount' is intended to do? Is there any other setting I can change to increase consumer throughput?

Thanks in advance, Jojo Peter

niemyjski commented 3 years ago

We don't ship a Kafka message bus implementation. What NuGet package are you using? We also try to avoid calling GetAwaiter().GetResult() in an async code path.
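To illustrate the point about not blocking on a task, here is a minimal sketch of awaiting a subscription instead of calling GetAwaiter().GetResult(). The IDemoSubscriber interface, the InMemorySubscriber class, and the Demo class are hypothetical stand-ins written for this example; they are not Foundatio or Aix.KafkaMessageBus types, and they only mimic the shape of the SubscribeAsync call quoted in the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for a message subscriber; not the Foundatio interface.
public interface IDemoSubscriber
{
    Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler,
                           CancellationToken cancellationToken = default) where T : class;
}

// Hypothetical in-memory implementation, used only to keep the sketch runnable.
public sealed class InMemorySubscriber : IDemoSubscriber
{
    private readonly List<Func<object, CancellationToken, Task>> _handlers =
        new List<Func<object, CancellationToken, Task>>();

    public Task SubscribeAsync<T>(Func<T, CancellationToken, Task> handler,
                                  CancellationToken cancellationToken = default) where T : class
    {
        // Only invoke the handler for messages of the subscribed type.
        _handlers.Add((msg, token) => msg is T typed ? handler(typed, token) : Task.CompletedTask);
        return Task.CompletedTask;
    }

    public async Task PublishAsync(object message, CancellationToken cancellationToken = default)
    {
        foreach (var handler in _handlers)
            await handler(message, cancellationToken);
    }
}

public static class Demo
{
    public static async Task<List<string>> RunAsync()
    {
        var received = new List<string>();
        var bus = new InMemorySubscriber();

        // Awaited, rather than blocked on with GetAwaiter().GetResult().
        await bus.SubscribeAsync<string>((msg, token) =>
        {
            received.Add(msg);
            return Task.CompletedTask;
        });

        await bus.PublishAsync("hello");
        await bus.PublishAsync(42); // ignored: not a string
        return received;
    }

    public static async Task Main()
    {
        var received = await RunAsync();
        Console.WriteLine(string.Join(",", received));
    }
}
```

In a real application the awaited call would usually live in an async startup method or a hosted service, so that no thread is blocked while the subscription is being set up.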

jojopeter commented 3 years ago

Thanks for your reply. Sorry, I posted in the wrong repository. BTW, I am using the 'Aix.KafkaMessageBus' package.

Thanks.

niemyjski commented 2 years ago

No problem, I'm going to close this as we don't own that code. I'd recommend posting on their project repository. If you have any feedback on the core libraries we would love to hear it.