pardahlman / RawRabbit

A modern .NET framework for communication over RabbitMq
MIT License
746 stars 144 forks source link

Graceful shutdown in RawRabbit #361

Closed Hexraycode closed 6 years ago

Hexraycode commented 6 years ago

Hello. I'm using RawRabbit to consume messages from the queue and handle them. Initialization section:

_client = RawRabbitFactory.CreateSingleton(new RawRabbitOptions
{
     ClientConfiguration = config
});

So I'm thinking of graceful shutdown. I need to stop consuming messages when IApplicationLifetime.ApplicationStopped (CancellationToken from HostedService) is called. I just want to be sure that after this event (ApplicationStopped) no message will be consumed. How can I handle this?

Below I provided a sample code. Is it okay to pass cancellationToken this way? Or it leads to another behavior? If it leads to another behavior, what approach you'd recommend for my requirements?

Consuming section:

public async Task ReceiveAsync<T>(Func<T, Task> handler, **CancellationToken cancellationToken** <======= HERE #1)
{
////
await _client.SubscribeAsync<T>(async msg => {
                    var body = JsonConvert.SerializeObject(msg);
                    var type = typeof(T).Name;
                    try {
                        await handler.Invoke(msg);
                        return (Acknowledgement) new Ack();
                    }
                    catch (Exception ex) {
                        return (Acknowledgement) new Reject();
                    }
                }, ctx => ctx.UseSubscribeConfiguration(cfg => {
                    cfg.Consume(c => {
                        var builder = c.FromQueue(topology.QueueName)
                            .OnExchange(topology.ExchnangeName)
                            .WithRoutingKey(topology.RoutingKey);

                        if (!string.IsNullOrWhiteSpace(topology.ConsumerTag)) {
                            builder.WithConsumerTag(topology.ConsumerTag);
                        }
                    });
                }), **ct: cancellationToken**); <======= HERE #2
}

UPD: It also would be great if the system can correctly finish handling messages which have already been consumed before ApplicationStopped is called. So I'm not sure that disposing of the factory that you recommended here https://github.com/pardahlman/RawRabbit/issues/267 is a great solution.

Hexraycode commented 6 years ago

UPD: Yes, it's enough to stop consuming new messages. Probably, you can use CancellationToken this way to provide graceful shutdown for your consumers.