ruby-amqp / bunny

Bunny is a popular, easy to use, mature Ruby client for RabbitMQ
Other
1.39k stars 303 forks source link

`basic_cancel` makes other channels wait #685

Closed route closed 2 months ago

route commented 4 months ago

RabbitMQ 3.12.13 Erlang 25.3.2.12 bunny 2.22.0

Scenario - I need to cancel a few consumers from one or a few channels, I call:

channels.each { |ch| ch.basic_cancel(consumer_tag) }

It should do the job, but there's a call inside basic_cancel:

@work_pool.shutdown(true) unless self.any_consumers?

which makes it wait (if that was the last consumer) until current job finishes before going to the next channel, which makes next channel accept incoming messages again and again, which makes cancellation not as efficient as it could be. I wish there was a way to do the same thing in an async way, so that cancel is cancel @connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false)) and nothing more fancy.

In general I'd like to give some time to current jobs to be processed, stop accepting new jobs thus cancelling all consumers, and when current jobs are finished close connection and channels gracefully. I don't think it's currently possible with one simple call in the lib.

michaelklishin commented 2 months ago

The method does not do anything "fancy", it starts a continuation. If it blocks consumer operations then you must have very high consumer churn which is a terrible idea.

The standard way of not waiting for a response in the protocol is the nowait field that synchronous methods (the ones that have a corresponding response frame, such as basic.cancel and basic.cancel-ok) usually support.

They are extremely rarely necessary. A new option, :nowait (IIRC we use this exact name in a couple of places, namely around declarations of queues and exchanges).

However, even the Java client does not expose this option in its API. RabbitMQ's own (generated) AMQP 0-9-1 parser suggests it is supported:

is_method_synchronous(#'basic.consume'{nowait = NoWait}) -> not(NoWait);
is_method_synchronous(#'basic.consume_ok'{}) -> false;
is_method_synchronous(#'basic.cancel'{nowait = NoWait}) -> not(NoWait);
is_method_synchronous(#'basic.cancel_ok'{}) -> false;
decode_method_fields('basic.cancel', <<F0Len:8/unsigned, F0:F0Len/binary, F1Bits:8>>) ->
  rabbit_binary_parser:assert_utf8(F0),
  F1 = ((F1Bits band 1) /= 0),
  #'basic.cancel'{consumer_tag = F0, nowait = F1};

Therefore it should be possible to at least try adding support for nowait for basic.cancel but in my 14 years around RabbitMQ, this may be the first time when someone needs this option.

michaelklishin commented 2 months ago

The final argument in

AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false)

is the nowait flag. If set to true, RabbitMQ won't send a basic.cancel-ok response so there would be no need to wait for one.

route commented 1 month ago

Whoa it's so awesome! @michaelklishin thanks for the fix and for explanation! Love it!