zarusz / SlimMessageBus

Lightweight message bus interface for .NET (pub/sub and request-response) with transport plugins for popular message brokers.
Apache License 2.0
475 stars 78 forks

[Host.Kafka] Support for High-Throughput Publishing to Kafka (TODO Resolution) #331

Open MrGeorge2 opened 5 days ago

MrGeorge2 commented 5 days ago

Hello,

I’m interested in using SlimMessageBus for a high-throughput application where I need to publish data to Kafka in near real-time. I noticed this TODO in the KafkaMessageBus code that would need to be addressed for my use case. Do you have any plans to resolve this TODO soon?

Thank you!

zarusz commented 5 days ago

Hello,

That could be added as an option, and it would be a relatively easy change/feature. The only caveat is that Publish would not get any feedback if the broker were unreachable (the exception/error would be silent).
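
To make the caveat concrete, here is a minimal sketch in plain C#. It is not SlimMessageBus or Kafka client code: `FakeProducer`, `ProduceModes`, and the `onError` callback are hypothetical names used only for illustration, and the unreachable broker is simulated with a flag.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a transport producer (assumption, not a real API).
public class FakeProducer
{
    public bool BrokerReachable { get; set; } = true;

    public async Task ProduceAsync(string message)
    {
        await Task.Yield(); // simulate asynchronous I/O
        if (!BrokerReachable)
            throw new InvalidOperationException("Broker not reachable");
    }
}

public static class ProduceModes
{
    // Awaited mode: the caller waits for the outcome, so a failure
    // surfaces as an exception that can be handled right here.
    public static async Task<bool> PublishAwaited(FakeProducer producer, string message)
    {
        try
        {
            await producer.ProduceAsync(message);
            return true;
        }
        catch (InvalidOperationException)
        {
            return false;
        }
    }

    // Non-awaited mode: the caller returns immediately. A failure is only
    // visible if something observes the task later, here via a continuation
    // that runs when the task faults.
    public static void PublishFireAndForget(FakeProducer producer, string message, Action<Exception> onError)
    {
        _ = producer.ProduceAsync(message).ContinueWith(
            t => onError(t.Exception.GetBaseException()),
            TaskContinuationOptions.OnlyOnFaulted);
    }
}
```

Without the continuation in `PublishFireAndForget`, the exception would go unobserved, which is the silent failure described above. The trade-off is that the caller no longer pays the broker round-trip latency per message.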

Would that be the only thing you'd need to make it high throughput?

There is also an idea to allow batch message publishing. Essentially, it would work like this:

await bus.Publish<IEnumerable<MyMessage>>(messages)

It could also be combined with the non-awaited Produce, and it is something I could consider adding to the v3 release.

Let me know.

MrGeorge2 commented 5 days ago

Thank you for your prompt response.

Yes, this is the only requirement I have for high throughput. In my case, producing messages without awaiting would be a better fit than publishing a batch of messages, even at the cost of potentially losing some. However, I can see how publishing a collection of messages could also be an interesting option.

zarusz commented 5 days ago

Let me look into this.

zarusz commented 5 days ago

@MrGeorge2 I have a PR #332 that should address this by adding the extra setting and a non-awaited produce mode.

For example, you can disable the await at the individual producer level or for the entire bus:

    mbb.Produce<PingMessage>(x =>
    {
        x.DefaultTopic(topic);
        // Partition #0 for even counters
        // Partition #1 for odd counters
        x.PartitionProvider((m, t) => m.Counter % 2);
        x.EnableProduceAwait(false); // <----------------
    });
    // doc:fragment:ExampleCheckpointConfig
    mbb.Consume<PingMessage>(x =>
    {
        x.Topic(topic)
            .WithConsumer<PingConsumer>()
            .KafkaGroup("subscriber")
            .Instances(2)
            .CheckpointEvery(1000)
            .CheckpointAfter(TimeSpan.FromSeconds(600));
    });

    // or
    //mbb.EnableProduceAwait(enableProduceAwait);    // <----------------

Let me know what you think about this API/setting approach. The .EnableXxxx() naming is consistent with other transports. Await is enabled by default, as it's the safest setting and is backward compatible.

I can release an RC prerelease version if you want to try this out.

MrGeorge2 commented 5 days ago

This seems great. I’d appreciate an RC prerelease version. Thank you very much.

zarusz commented 3 days ago

@MrGeorge2 here is the pre-release: https://www.nuget.org/packages/SlimMessageBus.Host.Kafka/2.6.0-rc1 There is an open PR #332. Here is the relevant docs section to consult.

Please share your feedback in case we need to incorporate more into that feature.

Enjoy!