KonstantinCodes / messenger-kafka

Simple Kafka transport for Symfony Messenger.
MIT License

When an exception occurs while handling a message, should the message be acknowledged? #22

Open rdotter opened 4 years ago

rdotter commented 4 years ago

Hi @KonstantinCodes, we came across the following issue: when an exception is thrown while handling a Kafka message, $receiver->reject() is triggered (in Symfony\Component\Messenger\Worker::handleMessage).

public function reject(Envelope $envelope): void
{
    // Do nothing. auto commit should be set to false!
}

But if I'm correct, handling a 'reject' should also trigger the ack, so the message won't be handled again.

Shall I make a PR to change this, or do you have a reason why reject doesn't do anything? The Enqueue implementation also triggers an ACK when reject is called.

Or does setting enable.auto.commit: 'false' ensure that the rejected message won't be processed again?

KonstantinCodes commented 4 years ago

It really depends on your implementation. In your project you're free to write a Middleware, that catches the Exception and logs it.

Just keep in mind that some systems may have a specific order of messages and if we ack failed messages by default, we remove the option to retry a failed message later.

For example:

  1. CustomerSignedUp
  2. CustomerEmailConfirmed
  3. OrderPlaced

If you have an exception at message 1, then messages 2 and 3 will cause an exception as well.

Even worse would be:

  1. AddressChanged / {"name": "Old Name"}
  2. AddressChanged / {"name": "New Name"}

If message 1 fails with a DB network error but message 2 succeeds, and you then retry message 1 later, you will end up with the old name in your DB.
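The AddressChanged scenario above can be reproduced with a small, self-contained PHP sketch. It involves no Kafka and no Symfony Messenger: the in-memory `$db` array, the `handleAddressChanged()` helper, the customer id 42 and the `$retryLater` list are all hypothetical names made up for this illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical in-memory "database": customer id => name.
$db = [];

// The two events from the example above, in the order they were produced.
$messages = [
    1 => ['customerId' => 42, 'name' => 'Old Name'],
    2 => ['customerId' => 42, 'name' => 'New Name'],
];

/** Applies one AddressChanged event; $failing simulates a DB network error. */
function handleAddressChanged(array &$db, array $message, bool $failing): void
{
    if ($failing) {
        throw new RuntimeException('DB network error');
    }
    $db[$message['customerId']] = $message['name'];
}

$retryLater = [];

foreach ($messages as $number => $message) {
    try {
        // Only message 1 hits the simulated outage.
        handleAddressChanged($db, $message, $number === 1);
    } catch (RuntimeException $e) {
        // Acknowledge anyway and park the message for a later retry.
        $retryLater[] = $message;
    }
}

// The outage is over, so the parked message is replayed after message 2.
foreach ($retryLater as $message) {
    handleAddressChanged($db, $message, false);
}

echo $db[42], PHP_EOL; // prints "Old Name": the retried message 1 overwrote message 2
```

The stale write only happens because both events touch the same record and the failed one is replayed out of order. For unrelated messages, acknowledging and retrying later does not have this problem.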

rdotter commented 4 years ago

@KonstantinCodes okay, clear. But a Kafka topic is split into partitions, and normally you start consuming from the first message. So if you cannot ack the message and your consumer breaks on it, it will retry this message and start hammering (because by default you have it running via supervisord).

It is also possible that message 2 will be consumed before message 1, because it can be in a different partition when there are multiple consumers.

If you look at this, for example, you will see it always sends an ack: https://github.com/php-enqueue/rdkafka/blob/f63b08d35647aab31f8731293718931e08628b57/RdKafkaConsumer.php#L144

After that you can, for example, send the message to a Doctrine failed queue.

So maybe it would be nice to add a setting to acknowledge it when a failure occurs?

rdotter commented 4 years ago

@KonstantinCodes for now we chose to just ACK every Kafka message and dispatch the WorkerMessageFailedEvent ourselves. We have no related Kafka messages within a single topic.

$this->eventDispatcher->dispatch(new WorkerMessageFailedEvent(
   new Envelope($message), 'transport_name', $exception
));

But I would like to note that order is only guaranteed within a specific partition. So this has to be taken care of when producing a message; otherwise you can't expect messages to be received in the same order.
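To illustrate the point about per-partition ordering, here is a minimal sketch of key-based partitioning in plain PHP. The `partitionFor()` function is a simplified stand-in (CRC32 modulo the partition count), not the partitioner that any particular Kafka client uses. The partition count of 3 and the `customer-N` keys are assumptions made for the example.

```php
<?php
declare(strict_types=1);

/**
 * Simplified stand-in for a key-based partitioner: hash the key and take it
 * modulo the partition count. Real clients use their own hash functions.
 * Assumes 64-bit PHP, where crc32() never returns a negative number.
 */
function partitionFor(string $key, int $partitionCount): int
{
    return crc32($key) % $partitionCount;
}

$partitionCount = 3; // assumed topic layout
$partitions = array_fill(0, $partitionCount, []);

// Hypothetical events; the message key is the customer id.
$events = [
    ['key' => 'customer-1', 'type' => 'CustomerSignedUp'],
    ['key' => 'customer-2', 'type' => 'CustomerSignedUp'],
    ['key' => 'customer-1', 'type' => 'CustomerEmailConfirmed'],
    ['key' => 'customer-1', 'type' => 'OrderPlaced'],
];

// "Produce" each event into the partition chosen by its key.
foreach ($events as $event) {
    $partitions[partitionFor($event['key'], $partitionCount)][] = $event;
}

// All customer-1 events sit in one partition, in the order they were produced.
$customerOnePartition = $partitions[partitionFor('customer-1', $partitionCount)];
$customerOne = array_column(
    array_filter($customerOnePartition, fn (array $e): bool => $e['key'] === 'customer-1'),
    'type'
);

print_r($customerOne);
```

Because the partition depends only on the key, related messages keep their relative order as long as they share a key. Messages with different keys may land in different partitions and be consumed in any relative order.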

KonstantinCodes commented 4 years ago

@rdotter That's right. Maybe we should ack failed messages. I'm not sure how many people will want to use a Kafka topic as an event store, though (single partition).

What do you think about adding a flag to the config, like ackFailedMessages? Its default would be true, but the user can choose to set it to false.
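A rough sketch of how such a flag could behave, assuming PHP 8. This is a toy class, not the messenger-kafka receiver: `SketchReceiver`, its integer offsets and the `$committedOffsets` list are invented for illustration, and only the flag name `ackFailedMessages` comes from the proposal above.

```php
<?php
declare(strict_types=1);

/**
 * Toy receiver, NOT the messenger-kafka implementation. It only models the
 * proposed ackFailedMessages switch; "committing" just records the offset.
 */
final class SketchReceiver
{
    /** @var int[] Offsets that were committed. */
    public array $committedOffsets = [];

    public function __construct(private bool $ackFailedMessages = true)
    {
    }

    public function ack(int $offset): void
    {
        $this->committedOffsets[] = $offset;
    }

    public function reject(int $offset): void
    {
        if ($this->ackFailedMessages) {
            // Treat a failed message as done so it is not delivered again.
            $this->ack($offset);
        }
        // Otherwise do nothing: the offset stays uncommitted.
    }
}

// Default (true): a rejected message is committed and will not come back.
$default = new SketchReceiver();
$default->reject(7);

// Opt-out (false): reject is a no-op, which keeps the current behaviour.
$strict = new SketchReceiver(false);
$strict->reject(7);

var_dump($default->committedOffsets === [7]); // bool(true)
var_dump($strict->committedOffsets === []);   // bool(true)
```

With the flag set to false the sketch matches the empty reject() quoted at the top of the thread, so projects that rely on strict ordering would keep the option to retry.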

rdotter commented 4 years ago

@KonstantinCodes Sure that sounds like a good plan :-)