sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License

Kafka: Message can not be ACKed because it gets lost in serialization. #74

Closed by KonstantinCodes 5 years ago

KonstantinCodes commented 5 years ago

The original Kafka message gets lost on the way from the consumer to the Messenger envelope.

It is still present in \Enqueue\RdKafka\RdKafkaConsumer::doReceive. It gets lost when the data is returned from \Enqueue\MessengerAdapter\QueueInteropTransport::get.

When it comes time to ACK the message, \Enqueue\RdKafka\RdKafkaConsumer::acknowledge throws a LogicException:

The message could not be acknowledged because it does not have kafka message set.

We might need an InteropMessageStamp that carries the original \Interop\Queue\Message object on the envelope, so it is still available when the message is acknowledged.
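A minimal sketch of what such a stamp could look like. It assumes Symfony Messenger 4.3 or later (for NonSendableStampInterface) and the queue-interop package. The InteropMessageStamp class is hypothetical: only the name comes from the suggestion above, and nothing like it exists in this package yet.

```php
<?php

declare(strict_types=1);

use Interop\Queue\Message;
use Symfony\Component\Messenger\Stamp\NonSendableStampInterface;

/**
 * Hypothetical stamp that keeps the original queue-interop message
 * attached to the envelope. It is marked non-sendable because the
 * transport-level message should not be serialized and re-sent.
 */
final class InteropMessageStamp implements NonSendableStampInterface
{
    /** @var Message */
    private $message;

    public function __construct(Message $message)
    {
        $this->message = $message;
    }

    public function getMessage(): Message
    {
        return $this->message;
    }
}
```

Under these assumptions, the transport could attach the stamp in get() with `$envelope->with(new InteropMessageStamp($interopMessage))` and read it back when acknowledging via `$envelope->last(InteropMessageStamp::class)`, then pass the recovered message to the consumer's acknowledge method.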

It might be even better to use \Symfony\Component\Messenger\Stamp\TransportMessageIdStamp and set the Kafka offset as the ID.