[QUESTION] Can't consume messages from third-party GPS publisher #130

Open Axel29 opened 1 month ago

Axel29 commented 1 month ago

Hi, I'm trying to set up Google Pub/Sub (GPS) with Shopify and Symfony 6.4. Messages do appear to be published to GPS (I can see them in the Google Cloud Console), but no matter which configuration I try, they are never consumed.

I've installed the following packages:

  1. sroze/messenger-enqueue-transport (version 0.8.0)
  2. enqueue/gps (version 0.10.19)

Here are my configuration files:

    default_bus: default

    failure_transport: failed
        dsn: "%env(MESSENGER_TRANSPORT_DSN)%failed"
          max_retries: 4
          delay: 1000
          check_delayed_interval: 0

        dsn: "enqueue://default"

      App\Message\ShopifyWebhookMessage: shopify_webhook_queue
        transport: '%env(ENQUEUE_DSN)%'
        client: ~
            receive_timeout: 300000

Then I run the command `php bin/console messenger:consume shopify_webhook_queue -vvv` and nothing happens.

Additional information

  1. If I try to run the command `php bin/console enqueue:consume --setup-broker -vvv`, I get the following error:

    cURL error 7: Failed to connect to localhost port 8900 after 0 ms: Couldn't connect to server (see for http://localhost:8900/v1/projects/vtwonen/topics/enqu  

    ... but I'm trying to connect to a real Google Pub/Sub project for my tests, not the emulator.

  2. I've also tried creating a "Processor" class like the one below, but it didn't seem to change anything:


    namespace App\Queue;

    use Enqueue\Client\TopicSubscriberInterface;
    use Interop\Queue\Context;
    use Interop\Queue\Message;
    use Interop\Queue\Processor;
    use Psr\Log\LoggerInterface;

    class ShopifyWebhookProcessor implements Processor, TopicSubscriberInterface
    {
        public function __construct(protected LoggerInterface $logger) {}

        public function process(Message $message, Context $context)
        {
            $this->logger->critical(__METHOD__ . '(' . __LINE__ . ')');
        }

        public static function getSubscribedTopics()
        {
            return ['enqueue.topics']; // Also tried to set the topic name from Google Cloud Console but didn't change anything.
        }
    }


  3. I'm unsure which subscription name to use: is it the one auto-generated by Google (the topic name suffixed with `-sub`), or is it the same as the topic name? For now, I've been using the name shown in the Google Cloud Console.
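
Regarding the cURL error in point 1: a request going to `localhost:8900` suggests that an emulator host is configured somewhere. The snippet below is a minimal sketch of how one might check for that. It assumes the DSN option is called `emulatorHost` (as I understand the enqueue/gps options) and that the Google client reads the `PUBSUB_EMULATOR_HOST` environment variable. The function name `findEmulatorSettings` and the sample DSN are hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Lists the places where an emulator host appears to be configured.
 *
 * $dsn is the transport DSN string; $env is an environment map such as getenv().
 * The option name "emulatorHost" is an assumption, not verified against the library.
 */
function findEmulatorSettings(string $dsn, array $env): array
{
    $found = [];

    // Take everything after the first "?" as the query string of the DSN.
    $pos   = strpos($dsn, '?');
    $query = $pos === false ? '' : substr($dsn, $pos + 1);
    parse_str($query, $params);

    if (!empty($params['emulatorHost'])) {
        $found['dsn emulatorHost'] = $params['emulatorHost'];
    }

    if (!empty($env['PUBSUB_EMULATOR_HOST'])) {
        $found['env PUBSUB_EMULATOR_HOST'] = $env['PUBSUB_EMULATOR_HOST'];
    }

    return $found;
}

// Hypothetical DSN, only for illustration.
var_export(findEmulatorSettings('gps:?projectId=my-project&emulatorHost=http://localhost:8900', getenv()));
```

If the returned array is empty, the emulator address presumably comes from somewhere else, such as a default inside the bundle configuration.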


Thank you for your help.

Axel29 commented 2 weeks ago

Hi again,

I've managed to make the Consumer work but ran into multiple issues.

The first one is that the body is not a serialized string but JSON this time, so I had to create a custom Serializer in order to consume the messages.

The second one is that, for some reason, I don't see any headers in the envelope, although Shopify does seem to send them in the "nativeMessage" -> "messages" -> "attributes" entry of the `$interopMessage` variable (in `\Enqueue\MessengerAdapter\QueueInteropTransport::get`). As a result, there's no way to properly dispatch my messages, since I don't have any headers (and Enqueue throws errors because of that).

Does anyone have any idea what I should do next to have the headers properly interpreted?
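
To make the question concrete, the mapping I'm after could be sketched roughly as follows. This is only an illustration under assumptions: `buildEncodedEnvelope` is a hypothetical helper, not part of Enqueue or Messenger, and the attribute names `X-Shopify-Topic` and `X-Shopify-Shop-Domain` are examples of what the publisher might send. It turns a raw body plus Pub/Sub-style attributes into the `['body' => ..., 'headers' => ...]` array that a Messenger serializer's `decode()` expects.

```php
<?php
declare(strict_types=1);

/**
 * Builds the array shape passed to a Messenger serializer's decode()
 * from a raw message body and a map of Pub/Sub-style attributes.
 */
function buildEncodedEnvelope(string $body, array $attributes): array
{
    $headers = [];
    foreach ($attributes as $name => $value) {
        // Lower-case the names so lookups do not depend on the publisher's casing.
        $headers[strtolower((string) $name)] = (string) $value;
    }

    return ['body' => $body, 'headers' => $headers];
}

// Example attributes; the names are assumptions for illustration only.
$encoded = buildEncodedEnvelope(
    '{"id": 42}',
    ['X-Shopify-Topic' => 'orders/create', 'X-Shopify-Shop-Domain' => 'example.myshopify.com']
);

echo $encoded['headers']['x-shopify-topic'], PHP_EOL; // orders/create
```

What remains open is where such a mapping would be hooked in, since the attributes are only visible on the native message and not in the headers handed to the serializer.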

Here is my code that works so far:

    default_bus: default
        dsn: 'enqueue://gps?queue[name]=%env(resolve:PUBSUB_SUBSCRIPTION_NAME)%&topic[name]=%env(resolve:PUBSUB_TOPIC_NAME)%'
        serializer: App\Serializer\MessengerJsonSerializer
      App\Message\Shopify\PubSubMessage: pubsub_queue

    namespace App\Message\Shopify;

    class PubSubMessage
    {
        private array $payload;

        public function __construct(array $payload)
        {
            $this->payload = $payload;
        }

        public function getPayload(): array
        {
            return $this->payload;
        }
    }

    namespace App\MessageHandler\Shopify;

    use App\Message\Shopify\PubSubMessage;
    use Symfony\Component\Messenger\Attribute\AsMessageHandler;

    #[AsMessageHandler]
    final class PubSubMessageHandler
    {
        public function __invoke(PubSubMessage $message): void
        {
            $payload = $message->getPayload();
        }
    }

    namespace App\Serializer;

    use App\Message\Shopify\PubSubMessage;
    use Symfony\Component\Messenger\Envelope;
    use Symfony\Component\Messenger\Exception\MessageDecodingFailedException;
    use Symfony\Component\Messenger\Stamp\SerializedMessageStamp;
    use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
    use Symfony\Component\Serializer\SerializerInterface as SymfonySerializerInterface;
    use function json_validate;

    class MessengerJsonSerializer implements SerializerInterface
    {
        private SymfonySerializerInterface $serializer;
        private string                     $format;

        public function __construct(SymfonySerializerInterface $serializer, string $format = 'json')
        {
            $this->serializer = $serializer;
            $this->format     = $format;
        }

        public function decode(array $encodedEnvelope): Envelope
        {
            if (empty($encodedEnvelope['body'])) {
                throw new MessageDecodingFailedException('Encoded envelope should have at least a "body", or maybe you should implement your own serializer.');
            }

            $body = $encodedEnvelope['body'];

            if (!json_validate($body)) {
                throw new MessageDecodingFailedException('Invalid JSON data: ' . json_last_error_msg());
            }

            $message = new PubSubMessage(json_decode($body, true));

            return new Envelope($message, []);
        }

        public function encode(Envelope $envelope): array
        {
            $message = $envelope->getMessage();
            $headers = ['type' => get_class($message)];

            $body = $this->serializer->serialize($message, $this->format);

            return [
                'body'    => $body,
                'headers' => $headers,
            ];
        }
    }

Thanks for your help.