sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License
190 stars, 53 forks

[ActiveMQ][STOMP] Every second message skipped #117

Open · RafaelKr opened this issue 2 years ago

RafaelKr commented 2 years ago

I have an ActiveMQ setup where messages from an external system come in and should be consumed with Symfony Messenger. This basically works, in that some messages are handled. The problem is that every second message is skipped.

This is a long issue including the findings from my debugging session. I split it into these 3 sections:

  1. Which code paths are taken
  2. Visualization of what happens on the ActiveMQ side
  3. My current workaround (I would really appreciate it if someone with deeper knowledge could take a look at this and decide whether it is a viable general fix)

Which code paths are taken

I found out the following is happening when starting the Symfony messenger:consume command:

  1. The Symfony Messenger Worker calls the get method on the receiver. The receiver is an instance of Enqueue\MessengerAdapter\QueueInteropTransport.
  2. Inside the get method, getConsumer is called, which creates a new Enqueue\Stomp\StompConsumer instance: https://github.com/sroze/messenger-enqueue-transport/blob/83c30ede78a0ef5cc890d1ecab7a110d3e1c2aef/QueueInteropTransport.php#L75 https://github.com/sroze/messenger-enqueue-transport/blob/83c30ede78a0ef5cc890d1ecab7a110d3e1c2aef/QueueInteropTransport.php#L276-L283
    // Enqueue\Stomp\StompContext

    // https://github.com/php-enqueue/stomp/blob/25c8155a7b7e86f57e22ab0869eb7725888e095a/StompContext.php#L178-L185
    public function createConsumer(Destination $destination): Consumer
    {
        InvalidDestinationException::assertDestinationInstanceOf($destination, StompDestination::class);

        $this->transient = false;

        return new StompConsumer($this->getStomp(), $destination);
    }
  3. The receive method is called on this new StompConsumer instance, which in turn calls subscribe. When $this->stomp->sendFrame($frame); is called inside subscribe, it first sends a CONNECT frame and then a SUBSCRIBE frame. After the SUBSCRIBE I can see a new consumer client in ActiveMQ.
  4. The consumer now receives a message and the worker handles it. After handling it, the Symfony Worker calls $receiver->ack(). This creates a NEW StompConsumer instance: https://github.com/sroze/messenger-enqueue-transport/blob/83c30ede78a0ef5cc890d1ecab7a110d3e1c2aef/QueueInteropTransport.php#L106-L111 The acknowledge method is called on this new instance, which sends an ACK frame to ActiveMQ.
  5. Steps 1-4 are repeated. The only difference is that the CONNECT from step 3 is sent only the first time.
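The frame order described in the steps above can be summarized with a small PHP sketch. It lists only frame commands, not real headers, and the behavior of the cached case (a reused consumer sends SUBSCRIBE only once) is an assumption that has not been verified against Enqueue\Stomp\StompConsumer.

```php
<?php
/**
 * Lists the STOMP frame commands sent while the worker handles $messages messages.
 *
 * $cachedConsumer = false: every get() builds a new StompConsumer, which subscribes again.
 * $cachedConsumer = true:  ASSUMPTION: a reused consumer subscribes only once.
 */
function framesSent(int $messages, bool $cachedConsumer): array
{
    $frames = ['CONNECT']; // the connection is opened only on the first sendFrame()
    for ($i = 0; $i < $messages; $i++) {
        if (!$cachedConsumer || $i === 0) {
            $frames[] = 'SUBSCRIBE'; // receive() -> subscribe() on a fresh consumer
        }
        $frames[] = 'ACK'; // ack() after the message has been handled
    }
    return $frames;
}

echo implode(' ', framesSent(3, false)), PHP_EOL; // CONNECT SUBSCRIBE ACK SUBSCRIBE ACK SUBSCRIBE ACK
echo implode(' ', framesSent(3, true)), PHP_EOL;  // CONNECT SUBSCRIBE ACK ACK ACK
```

Every extra SUBSCRIBE opens another subscription on the broker, which is why a new consumer shows up in ActiveMQ for each get() call.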

Visualization of what happens on the ActiveMQ side

  1. Initial state: 9 messages and 0 consumers. [screenshot]
  2. CONNECT and SUBSCRIBE frames are sent. After the SUBSCRIBE a new consumer appears in ActiveMQ. Note that this consumer already has one enqueued message. [screenshot]
  3. The first message is handled and an ACK is sent for it, which removes it from the queue. [screenshot] The consumer now shows two enqueued messages but only one dequeued message, so ActiveMQ considers the second message delivered but not acknowledged (it is still visible in the queue). [screenshot]
  4. A SUBSCRIBE frame is sent again, and another new consumer appears in ActiveMQ. Note that the previous consumer still shows two enqueued messages but only one dequeued. [screenshot]
  5. An ACK frame is sent. Afterwards the third message has been acknowledged; the second message was never handled. [screenshot] The consumer overview now also shows two enqueued and one dequeued message for the new consumer. [screenshot]
  6. Steps 4 and 5 repeat until the following final state is reached: [screenshot] [screenshot]
  7. The Messenger worker keeps creating new consumers but never receives any further messages. Even new messages published to the queue are not received. [screenshot]

As you can see, every odd-numbered message (1st, 3rd, 5th, ...) was acknowledged and every even-numbered message (2nd, 4th, ...) was skipped. This leads me to assume that ActiveMQ already "pushes" the next message on every ACK, but because a new StompConsumer is always created, that message is never handled and is "forgotten" in a StompConsumer instance that was only created for an ACK command.
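The hypothesis above can be checked with a toy model in PHP. This is not ActiveMQ's dispatch code. It assumes that each subscription may hold at most one unacknowledged message (ActiveMQ calls this limit the prefetch; a limit of one matches the single enqueued message seen right after SUBSCRIBE) and that the broker refills a subscription as soon as one of its messages is acknowledged. The names ToyBroker and runWorker are hypothetical, and the model does not try to explain why new messages stop arriving in the final state.

```php
<?php
// Toy model of broker-side prefetch. ASSUMPTION: this mimics, but is not, ActiveMQ's logic.
const PREFETCH = 1; // max unacknowledged messages per subscription

final class ToyBroker
{
    /** @var int[] messages not yet dispatched to any subscription */
    private array $pending;
    /** @var array<int, int[]> subscription id => dispatched but unacknowledged messages */
    public array $inFlight = [];
    private int $lastSubscriptionId = 0;

    public function __construct(int $messageCount)
    {
        $this->pending = range(1, $messageCount);
    }

    // SUBSCRIBE: open a new subscription and immediately push up to PREFETCH messages to it.
    public function subscribe(): int
    {
        $id = ++$this->lastSubscriptionId;
        $this->inFlight[$id] = [];
        $this->dispatch($id);
        return $id;
    }

    // ACK: forget the message, then refill the subscription that held it.
    public function ack(int $subscriptionId, int $message): void
    {
        $this->inFlight[$subscriptionId] = array_values(
            array_diff($this->inFlight[$subscriptionId], [$message])
        );
        $this->dispatch($subscriptionId);
    }

    private function dispatch(int $subscriptionId): void
    {
        while (count($this->inFlight[$subscriptionId]) < PREFETCH && $this->pending !== []) {
            $this->inFlight[$subscriptionId][] = array_shift($this->pending);
        }
    }
}

// Runs $getCalls iterations of the worker loop: get(), handle, ack().
// $reuseConsumer = false mirrors a new StompConsumer (new subscription) per get();
// $reuseConsumer = true mirrors the cached consumer from the workaround.
function runWorker(int $messageCount, int $getCalls, bool $reuseConsumer): array
{
    $broker = new ToyBroker($messageCount);
    $handled = [];
    $subscriptionId = null;
    for ($i = 0; $i < $getCalls; $i++) {
        if ($subscriptionId === null || !$reuseConsumer) {
            $subscriptionId = $broker->subscribe();
        }
        $message = $broker->inFlight[$subscriptionId][0] ?? null;
        if ($message === null) {
            continue; // nothing was pushed to this subscription
        }
        $handled[] = $message;
        $broker->ack($subscriptionId, $message);
    }
    // Messages still reserved for subscriptions the worker no longer reads from.
    $stranded = array_merge([], ...array_values($broker->inFlight));
    return ['handled' => $handled, 'stranded' => $stranded];
}

foreach (['new consumer per get()' => false, 'cached consumer' => true] as $label => $reuse) {
    $result = runWorker(9, 12, $reuse);
    echo $label, ': handled [', implode(',', $result['handled']),
        '], stranded [', implode(',', $result['stranded']), ']', PHP_EOL;
}
```

With a new subscription per get(), the model handles messages 1, 3, 5, 7 and 9 and leaves 2, 4, 6 and 8 reserved for abandoned subscriptions, which is the same pattern as in the screenshots. With a reused subscription it handles all nine.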

Also, when I exit the Symfony messenger:consume command, the consumers are still shown in the ActiveMQ consumer overview. When I restart messenger:consume, it won't receive any messages either, because they are still reserved for the old consumers. The ActiveMQ server runs on a remote machine. Disconnecting my local machine from the network removes the consumers in ActiveMQ, and when I then restart messenger:consume it once again starts consuming every second message.

Edit: Maybe this should be looked into as well: the StompClient should probably disconnect when messenger:consume exits.


Workaround

My current workaround is to cache the StompConsumer instance in the QueueInteropTransport instance.

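use Interop\Queue\Consumer;
use Symfony\Component\Messenger\Transport\TransportInterface;

class WorkaroundQueueInteropTransport implements TransportInterface
{
    // [...]

    /** @var Consumer|null cached so that get() and ack() share one consumer (and one subscription) */
    private $consumer;

    // [...]

    private function getConsumer(): Consumer
    {
        // Reuse the consumer from the first call instead of creating a new one
        // (and sending a new SUBSCRIBE) every time.
        if (null !== $this->consumer) {
            return $this->consumer;
        }

        $context = $this->contextManager->context();
        $destination = $this->getDestination(null);
        $queue = $context->createQueue($destination['queue']);

        $this->consumer = $context->createConsumer($queue);

        return $this->consumer;
    }
}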

I'm currently not 100% sure whether this is a viable general fix, because I don't know whether a NEW consumer needs to be created on every getConsumer call. The only side effect of creating a new consumer I could find is that it sets $this->transient = false; on the StompContext. I don't know Symfony Messenger and this project well enough to tell whether a cached consumer instance breaks something else. So far it seems to work for me. I would really appreciate it if someone with deeper knowledge could take a look at all of this. @keulinho and @sroze, you implemented the getConsumer method in the first place. Can you tell whether this is a viable fix?
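The caching in the workaround could be combined with the disconnect suggested in the Edit above. The sketch below uses hypothetical FakeContext and FakeConsumer stand-ins so that it runs on its own. In the real transport the context would come from $this->contextManager->context(), and queue-interop's Context interface does declare a close() method, but whether a __destruct() hook on the transport is the right place to call it within Symfony Messenger is an assumption, not something verified here.

```php
<?php
// Hypothetical stand-ins for Interop\Queue\Context and Consumer, used only to count calls.
final class FakeConsumer
{
}

final class FakeContext
{
    public int $consumersCreated = 0;
    public bool $closed = false;

    public function createConsumer(): FakeConsumer
    {
        $this->consumersCreated++;
        return new FakeConsumer();
    }

    public function close(): void
    {
        $this->closed = true; // ASSUMPTION: a real StompContext would disconnect here
    }
}

// Sketch: create the consumer lazily once, and close the context on teardown.
final class CachingTransportSketch
{
    private FakeContext $context;
    private ?FakeConsumer $consumer = null;

    public function __construct(FakeContext $context)
    {
        $this->context = $context;
    }

    public function getConsumer(): FakeConsumer
    {
        if ($this->consumer === null) {
            $this->consumer = $this->context->createConsumer();
        }
        return $this->consumer;
    }

    // Runs when the object is released or at normal script shutdown,
    // not when the process is killed by a signal it does not handle.
    public function __destruct()
    {
        $this->consumer = null;
        $this->context->close();
    }
}

$context = new FakeContext();
$transport = new CachingTransportSketch($context);
$transport->getConsumer(); // get()
$transport->getConsumer(); // ack() reuses the same consumer
unset($transport);         // last reference gone, so __destruct() runs now
echo $context->consumersCreated, ' consumer(s) created, context closed: ',
    var_export($context->closed, true), PHP_EOL; // 1 consumer(s) created, context closed: true
```

Relying on a destructor keeps the sketch short; an explicit shutdown hook would be more predictable if the worker exposes one.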

ibenkhelil commented 1 year ago

Hello, any news regarding this issue?