coingaming / coney

Consumer server for RabbitMQ with message publishing functionality.
MIT License

Application closes connection while still processing messages #20

Open ignaciogoldchluk-yolo opened 1 month ago

ignaciogoldchluk-yolo commented 1 month ago

Current behaviour

  1. Upon receiving any shutdown message from the BEAM, ConnectionServer.terminate/2 immediately closes the connection.
  2. RabbitMQ messages are still being processed successfully by the workers, but they cannot be acked because the connection to RabbitMQ was closed.
  3. RabbitMQ assumes the messages are lost after a specific timeout without a connection and redelivers them. See Automatic requeueing.
  4. After the connection is recovered, the messages are redelivered; if their handlers are not idempotent, this can cause errors and corrupted state.

Expected behaviour

This is one of the possible solutions:

  1. Coney stops processing new RabbitMQ messages upon receiving a shutdown message from the BEAM.
  2. Existing RabbitMQ messages are processed and acked.
  3. Once all RabbitMQ messages have been processed, Coney shuts down.
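The drain-then-shutdown idea in the three steps above could be sketched roughly as follows. This is a minimal sketch using the Elixir `amqp` library; the `consumer_tag`/`workers` bookkeeping and the 5-second wait are assumptions for illustration, not Coney's actual API:

```elixir
defmodule Coney.GracefulDrain do
  # Hypothetical sketch: on shutdown, stop new deliveries first,
  # then give in-flight workers time to finish and ack.
  use GenServer

  # Trap exits so terminate/2 runs when the supervisor shuts us down.
  def init(state) do
    Process.flag(:trap_exit, true)
    {:ok, state}
  end

  def terminate(_reason, %{channel: chan, consumer_tag: tag, workers: workers}) do
    # 1. Stop receiving new messages (basic.cancel).
    AMQP.Basic.cancel(chan, tag)

    # 2. Wait (bounded) for in-flight workers to finish and ack.
    Enum.each(workers, fn pid ->
      ref = Process.monitor(pid)

      receive do
        {:DOWN, ^ref, :process, ^pid, _} -> :ok
      after
        5_000 -> :timeout
      end
    end)

    # 3. Only after this is it safe for ConnectionServer to close the connection.
    :ok
  end
end
```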

Another solution could be to leave it up to the application code (workers) to deal with redelivered messages. Increasing the heartbeat timeout is not a solution, since with rolling deployments the connection might be reopened from another host.
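Leaving redeliveries to the workers is workable because AMQP marks requeued messages: each delivery carries a `redelivered` flag in its metadata. A sketch of a worker checking that flag (the handler and helper names here are hypothetical, not Coney's API):

```elixir
defmodule MyApp.OrderWorker do
  # Hypothetical worker sketch: deduplicate messages that RabbitMQ
  # has redelivered after a connection loss.
  def handle_message(payload, %{redelivered: true} = _meta) do
    # Possibly already processed before the connection dropped;
    # consult an idempotency store before acting again.
    if already_processed?(payload), do: :ack, else: process(payload)
  end

  def handle_message(payload, _meta), do: process(payload)

  defp process(_payload) do
    # ... application logic ...
    :ack
  end

  defp already_processed?(_payload), do: false
end
```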

Technical information

  1. The child order in ApplicationSupervisor is reversed. ConnectionServer should start first, followed by ConsumerSupervisor. Since children are terminated in reverse start order, ConsumerSupervisor would then terminate all the ConsumerServer processes first, and finally ConnectionServer could close the connection.
  2. ConsumerSupervisor does not have to be a DynamicSupervisor since we already know ahead of time the full list of consumers. It could be a regular Supervisor.
  3. ConnectionServer can keep a map of {consumer, channel} pairs so that ConsumerServer does not hold any connection (channel) state. That way, when ConnectionServer receives a {:DOWN, _, _, _} message, it only has to update its {consumer, channel} map, and all ConsumerServer processes are unaffected. ConsumerServer would then be responsible only for processing messages and producing the response, while ConnectionServer communicates with RabbitMQ.
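Points 1 and 2 together suggest a supervision tree along these lines. This is a sketch under the assumptions above, not the actual Coney source: a plain Supervisor that starts ConnectionServer first, so that on shutdown the consumers are stopped before the connection is closed.

```elixir
defmodule Coney.ApplicationSupervisor do
  use Supervisor

  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    children = [
      # Started first => terminated last, so the connection
      # outlives the consumers during shutdown.
      {Coney.ConnectionServer, opts},
      # A regular Supervisor suffices here: the full list of
      # consumers is known ahead of time.
      {Coney.ConsumerSupervisor, opts}
    ]

    Supervisor.init(children, strategy: :one_for_one)
  end
end
```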

llxff commented 3 weeks ago

> Coney stops processing new RabbitMQ messages upon receiving a shutdown message from the BEAM.

I agree with this; it looks like the correct approach 👍

> Existing RabbitMQ messages are processed and acked. Once all RabbitMQ messages have been processed, Coney shuts down.

This would be an ideal and valid solution; however, I lean more towards the alternative approach, and here is why:

  1. It is difficult to guarantee that all workers will complete processing the received messages before the consumer stops, because the app can always be force-killed, the machine can die, the network can fail, etc.
  2. Consumers can detect when a message is redelivered, allowing the application to handle such scenarios appropriately.

I believe a correct shutdown process is a must-have, while handling idempotency issues should be the responsibility of the application, because it can handle such issues better.

Also, it could be useful to support passing additional options to the consume function, like no_ack. This should give applications more flexibility in message processing.
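For reference, the underlying AMQP client already accepts such options at subscribe time. With the Elixir `amqp` library, `no_ack: true` makes the broker treat messages as acknowledged as soon as they are delivered; how Coney would expose this option is an open design question, and the queue name below is just an example:

```elixir
# With no_ack: true the broker does not expect basic.ack;
# messages are considered acknowledged on delivery.
{:ok, consumer_tag} =
  AMQP.Basic.consume(channel, "my_queue", self(), no_ack: true)
```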

What do you think?

ignaciogoldchluk-yolo commented 3 weeks ago

You are correct. We ended up going with the alternative approach. The main issue was that ApplicationSupervisor started ConsumerSupervisor (with all the ConsumerServer processes) first, and then ConnectionServer. Since processes are terminated in reverse start order, ConnectionServer was closed first, which meant the ConsumerServer processes were still processing messages and had acks to deliver but no connection/channel to send them through, raising errors when the application was shut down.

It will be up to the application to handle redeliveries when using Coney.