sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License

[question] How to consume messages from Cloud Pub Sub? #26

Open ErikSaunier opened 5 years ago

ErikSaunier commented 5 years ago

My message/command CreateStore is properly sent to Cloud Pub/Sub like this:

use App\Message\Command\Store\CreateStore;
use Enqueue\MessengerAdapter\EnvelopeItem\TransportConfiguration;
use Symfony\Component\Messenger\Envelope;

$command = new CreateStore();
$this->commandBus->dispatch((new Envelope($command))->with(new TransportConfiguration(
    ['topic' => 'enqueue.commands']
)));

All messages/commands are in my Cloud Pub/Sub queue. I can see them (gcloud pubsub subscriptions pull enqueue.commands) and acknowledge them manually via the gcloud command-line tool.

Now I am trying to consume my messages by running bin/console messenger:consume-messages enqueue. The consumer runs but nothing happens.

What am I missing to consume my messages?

Here are my config files:

service.yaml

framework:
    messenger:
        transports:
            default: 'amqp://guest:guest@localhost:5672/%2f/messages'
            enqueue: 'enqueue://gps'
        default_bus: messenger.bus.commands
        buses:
            messenger.bus.commands: ~
            messenger.bus.events: ~
        routing:
            # Route your messages to the transports
            'App\Message\Command\Store\CreateStore': enqueue

enqueue.yaml

enqueue:
    transport:
        default: 'gps'
        gps:
            projectId: '%env(GOOGLE_PROJECT_ID)%'
            keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'

makasim commented 5 years ago

This might be related: https://github.com/php-enqueue/enqueue-dev/issues/236

As I recall, Google Pub/Sub needs a long enough connection time (in other words, timeout) to properly schedule messages among consumers. If a consumer connects for a short period and then disconnects shortly after, the broker is not able to prepare messages for it.

makasim commented 5 years ago

Could you please try the enqueue:transport:consume command with https://github.com/php-enqueue/messenger-adapter/blob/master/MessageBusProcessor.php and a large receive timeout, e.g. --receive-timeout=300000?

ErikSaunier commented 5 years ago

By running bin/console messenger:consume-messages enqueue I found that Cloud Pub/Sub was returning a lot of 404 responses, without any error message being shown.

Indeed, the called URL was https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/messages:pull instead of https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/enqueue.commands:pull

I will look into how to pass my Cloud Pub/Sub subscription name so that enqueue/messenger-adapter works properly when consuming messages from Cloud Pub/Sub, as I think it should be possible. If it's not possible yet, I will try to make a PR.


However, I just tried your solution and made a Processor class like this:

<?php

namespace App\Message\CommandProcessor;

use Enqueue\Consumption\QueueSubscriberInterface;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrProcessor;

class GpsCommandProcessor implements PsrProcessor, QueueSubscriberInterface
{
    public function process(PsrMessage $message, PsrContext $session)
    {
        dump($message->getBody());
        dump($message->getHeader('type'));

        return self::ACK;
    }

    public static function getSubscribedQueues()
    {
        return [
            'enqueue.commands'
        ];
    }
}

#services.yaml
services:
    app.gps_command_processor:
        public: true
        class: 'App\Message\CommandProcessor\GpsCommandProcessor'
        tags:
            - { name: 'enqueue.client.processor' }

#enqueue.yaml
enqueue:
    transport:
        default: 'gps'
        gps:
            projectId: '%env(GOOGLE_PROJECT_ID)%'
            keyFilePath: '%env(GOOGLE_APPLICATION_CREDENTIALS)%'
    consumption:
        receive_timeout: 300000

Then I ran the consumer with bin/console enqueue:transport:consume app.gps_command_processor.

This solution works perfectly, thank you @makasim.

makasim commented 5 years ago

@ErikSaunier is there any news?

ErikSaunier commented 5 years ago

I did not have time to check how the command bin/console messenger:consume-messages enqueue could use a custom pull URL, replacing https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/messages:pull with https://pubsub.googleapis.com/v1/projects/{project_id}/subscriptions/{subscription_name}:pull . I'm sorry, @makasim. I would like to investigate, but I can't right now.

SevenOfSpades commented 5 years ago

Declaring the DSN in the Symfony Messenger config as enqueue://gps?queue[name]=subscription-name&topic[name]=topic-name&receiveTimeout=300000 allowed me to consume messages from GPS using messenger:consume. The rest of the configuration is exactly as in the first post.
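
For readers following along, here is a minimal sketch of how that DSN might sit in the Messenger config from the first post. The file path in the comment is an assumption, and subscription-name and topic-name are placeholders to replace with your own Pub/Sub subscription and topic. Only the transport line differs from the original config; this has not been verified against current versions of the adapter.

```yaml
# Assumed location: config/packages/messenger.yaml (the first post calls this file service.yaml)
framework:
    messenger:
        transports:
            # Placeholders: replace subscription-name and topic-name with your own
            # Pub/Sub subscription and topic (the first post uses 'enqueue.commands').
            # receiveTimeout is the large timeout suggested earlier in the thread.
            enqueue: 'enqueue://gps?queue[name]=subscription-name&topic[name]=topic-name&receiveTimeout=300000'
        routing:
            # Route the command to the enqueue transport, as in the first post
            'App\Message\Command\Store\CreateStore': enqueue
```

With queue[name] set, the consumer should pull from the named subscription rather than the default messages one that produced the 404 responses reported above.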

Gyalomalom commented 11 months ago

In the current version, how do I configure Enqueue to use GPS as the transport? The config provided in this thread no longer works. The documentation doesn't specify how to configure Pub/Sub either.