sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License
191 stars 55 forks source link

Messages sent twice using Messenger component when using multiple routes #54

Closed Steveb-p closed 5 years ago

Steveb-p commented 5 years ago

Initially thought it's an issue with Messenger itself.

I'm using enqueue with Kafka and messenger adapter to send messages to external services. Here's my messenger.yaml:

framework:
  messenger:
    transports:
    # Uncomment the following line to enable a transport named "amqp"
      amqp: '%env(MESSENGER_TRANSPORT_DSN)%'
      view_events: '%env(MESSENGER_TRANSPORT_DSN)%?topic[name]=view_events&queue[name]=view_events'

    routing:
      # Route your messages to the transports
      'App\Message\DisplayNotification':
        senders: [ view_events ]
        send_and_handle: true
      '*':
        senders: [ amqp ]
        send_and_handle: true

Notice: send_and_handle are only for debugging atm.

Related packages (and Symfony 4.2):

enqueue/amqp-tools                0.9.4    Message Queue Amqp Tools
enqueue/dsn                       0.9.2    Parse DSN
enqueue/enqueue                   0.9.6    Message Queue Library
enqueue/enqueue-bundle            0.9.3    Message Queue Bundle
enqueue/messenger-adapter         0.2.0    Enqueue adapter for Symfony Messenger component
enqueue/null                      0.9.2    Enqueue Null transport
enqueue/rdkafka                   0.9.2    Message Queue Kafka Transport

And enqueue.yaml:

enqueue:
    default:
        transport:
            dsn: "kafka:"
            global:
                metadata.broker.list: '%env(KAFKA_SERVERS)%'
            topic:
                auto.offset.reset: beginning
            commit_async: true
        client: ~

I'm emitting messages using:

public function addEvent(Request $request, string $hash, $selectedEntry): void
{
    $event = DisplayNotification::createFromRequest($request, $hash, $selectedEntry);
    $this->bus->dispatch($event);
}

This causes messages to be received twice. Once for amqp and once for view_events transports. When looking for the reason why this is happening I added send_and_handle settings and realized:

  1. Second message wraps the first one.
  2. I cannot use normal handlers, because Envelope returned from SendMessageMiddleware contains RdKafkaMessage instead of expected DisplayNotification.

Looking through SenderInterface I noticed that it's implementation QueueInteropTransport expects to receive and return an Envelope. However, envelope returned is a new instance of Envelope wrapping previous Envelope as message. I believe it should be the previous instance instead?

send method exerpt below:

public function send(Envelope $message): Envelope
{
   ...
   $encodedMessage = $this->serializer->encode($message);

   $originalMessage = $message;
   $message = $context->createMessage(
   ...

   return $message; // Should be $originalMessage?

When $originalMessage is returned, Messenger component behaves as expected: send message only once and handlers receive expected objects.

I'll create pull request in a moment, but do you see any issues with this change? Will it break anything in enqueue itself?

Steveb-p commented 5 years ago

Looks like it's fixed on master. But not yet released (db82dd6dfb9e09dd57cee6c81895b190ad5d3409).

Steveb-p commented 5 years ago

Actually, messages is also sent to another route when '*' is matched and route has a different sender. I don't know if it should be considered a Messenger bug, what do you guys think?

weaverryan commented 5 years ago

Is it better now that the fix has been released? Not sure about the * routing part :)

Steveb-p commented 5 years ago

@weaverryan Our testing when using 0.9 shown that the issue is no more :)