dotnetcore / CAP

A distributed transaction solution for microservices based on eventual consistency, and also an event bus with the Outbox pattern
http://cap.dotnetcore.xyz
MIT License

Why not pull consumer for NATS? #1573

Open geekox86 opened 1 month ago

geekox86 commented 1 month ago

Greetings,

Out of curiosity, why did you not use a pull consumer for NATS, since it has better flow control and failure handling, and is more efficient for horizontal scaling?

Regards,

Mohannad

yang-xiaodong commented 1 month ago

Pull consumers do not support queue groups.

geekox86 commented 1 month ago

Hi. AFAIK, pull consumers support load balancing across multiple clients consuming from the same consumer, which essentially gives the same Competing Consumers pattern that a queue group provides.

I also asked in the NATS .NET (v2) client library repo, and one of the maintainers mentioned that all their consumers are pull-based now by default.

yang-xiaodong commented 1 month ago

Hi @geekox86 ,

Do you know how to achieve the equivalent behavior of the line of code js.PushSubscribeAsync(subject, groupName, SubscriptionMessageHandler, false, pso); using js.PullSubscribeAsync?

I couldn't find a queue parameter to pass in the PullSubscribeAsync method.

geekox86 commented 1 month ago

The new API of the NATS clients, including the .NET client, goes like this:

await using var nats = new NatsConnection();
var js = new NatsJSContext(nats);

// Assuming the pull consumer was created previously like this:
//     var consumer = await js.CreateOrUpdateConsumerAsync(stream: "orders", new ConsumerConfig
//     {
//         Name = "order_processor",
//         DurableName = "order_processor",
//     });
// Running this code in multiple processes will automatically make them competing consumers in NATS.
// I got this information from https://www.youtube.com/watch?v=_CN1OO7yN0I&list=PLgqCaaYodvKZ0JDTEOryCoJDeLVNvMWpj

var consumer = await js.GetConsumerAsync(stream: "orders", consumer: "order_processor");

await foreach (var msg in consumer.ConsumeAsync<Order>(serializer: orderSerializer).WithCancellation(cancellationToken))
{
    // Process message here

    await msg.AckAsync();
}
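
The competing-consumers behavior described in the comments above can be illustrated without a NATS server. The following is only an in-memory analogy, not the NATS API: a single System.Threading.Channels channel stands in for the shared durable consumer, and several workers pull from it. The names shared, handledBy, and RunWorkerAsync are made up for this sketch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

// In-memory stand-in for one shared durable consumer (an assumption for
// illustration; a real setup would pull from NATS JetStream instead).
var shared = Channel.CreateUnbounded<int>();

// Records which worker handled each message.
var handledBy = new ConcurrentDictionary<int, string>();

// Each worker pulls from the same channel. A message read by one worker
// is never handed to another, which is the competing-consumers property.
async Task RunWorkerAsync(string name)
{
    await foreach (var msg in shared.Reader.ReadAllAsync())
    {
        if (!handledBy.TryAdd(msg, name))
            throw new InvalidOperationException($"Message {msg} was delivered twice.");

        await Task.Yield(); // let other workers get a turn at pulling
    }
}

// Start three workers that compete for messages.
var workers = new[] { "worker-a", "worker-b", "worker-c" }
    .Select(name => RunWorkerAsync(name))
    .ToArray();

// Publish 100 messages, then signal that no more will arrive.
for (var i = 0; i < 100; i++)
    await shared.Writer.WriteAsync(i);
shared.Writer.Complete();

await Task.WhenAll(workers);

Console.WriteLine($"Handled {handledBy.Count} messages exactly once.");
foreach (var group in handledBy.GroupBy(kv => kv.Value).OrderBy(g => g.Key))
    Console.WriteLine($"{group.Key}: {group.Count()}");
```

How the messages are split between the workers varies from run to run. The point is only that every message is handled by exactly one worker, without any queue-group parameter.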

Would you consider pull-based consumers?

geekox86 commented 3 weeks ago

@yang-xiaodong Hi again, just checking in. What do you think?

yang-xiaodong commented 3 weeks ago

@geekox86 This requires migrating the NATS client library to v2 first. Would you be willing to submit a PR?