php-enqueue / enqueue-dev

Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
https://enqueue.forma-pro.com/
MIT License

GPS consumer not working correctly #236

Closed bwegrzyn closed 6 years ago

bwegrzyn commented 7 years ago

I've started using the Google Pub/Sub queue integration now, and it doesn't seem to work quite right.

It would seem that the message gets sent to a destination topic that is not subscribed to in the consumer. I added some extra logging below.

The weird thing is that there are times when it picks up the message, but only a long time later (10-15 minutes). This seems to occur randomly. Most of the time I can start the consumer and send a message with the producer, and it never arrives.

This is my config.yml file.

enqueue:
  transport:
    default:
      alias: gps
    redis: 
      host: redis
      vendor: predis
    gps:
      projectId: ...
      keyFilePath: ...
  client: 
    traceable_producer: true

And the produce/consume commands with extra logging I added.

$ ./bin/console enqueue:produce testtopic testmessage -vvv
02:36:35 DEBUG     [event] Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure". ["event" => "console.command","listener" => "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure"]
02:36:35 DEBUG     [event] Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DumpListener::configure". ["event" => "console.command","listener" => "Symfony\Component\HttpKernel\EventListener\DumpListener::configure"]
02:36:35 DEBUG     [event] Notified event "console.command" to listener "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand". ["event" => "console.command","listener" => "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand"]
02:36:35 DEBUG     [event] Notified event "console.command" to listener "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand". ["event" => "console.command","listener" => "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand"]
/app/vendor/enqueue/gps/GpsProducer.php:37:
string(35) "Sending message to: enqueue.default" <-- value of `$destination->getTopicName()`
Message is sent

$ ./bin/console enqueue:consume --setup-broker -vvv
02:40:31 DEBUG     [event] Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure". ["event" => "console.command","listener" => "Symfony\Component\HttpKernel\EventListener\DebugHandlersListener::configure"]
02:40:31 DEBUG     [event] Notified event "console.command" to listener "Symfony\Component\HttpKernel\EventListener\DumpListener::configure". ["event" => "console.command","listener" => "Symfony\Component\HttpKernel\EventListener\DumpListener::configure"]
02:40:31 DEBUG     [event] Notified event "console.command" to listener "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand". ["event" => "console.command","listener" => "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand"]
02:40:31 DEBUG     [event] Notified event "console.command" to listener "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand". ["event" => "console.command","listener" => "Symfony\Bridge\Monolog\Handler\ConsoleHandler::onCommand"]
[debug] Set context's logger Symfony\Component\Console\Logger\ConsoleLogger
[debug] [GpsDriver] Subscribe router topic to queue: enqueue.default -> enqueue.app.default
[debug] [GpsDriver] Subscribe processor topic to queue: enqueue.app.default -> enqueue.app.default
[info] Start consuming
/app/vendor/google/cloud-pubsub/Subscription.php:407:
string(92) "Pulling from GPS subscription: projects/.../subscriptions/enqueue.app.default"
/app/vendor/google/cloud-pubsub/Subscription.php:407:
string(92) "Pulling from GPS subscription: projects/.../subscriptions/enqueue.app.default"
/app/vendor/google/cloud-pubsub/Subscription.php:407:
string(92) "Pulling from GPS subscription: projects/.../subscriptions/enqueue.app.default"

So it seems like it sends to enqueue.default and pulls from enqueue.app.default.

After I switched to using Redis instead of GPS, everything worked as expected. I started a consumer, sent a message, and the consumer picked it up a split second later.
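For readers unfamiliar with Pub/Sub naming: publishers write to a topic, while consumers pull from a subscription attached to that topic, so seeing two different names is not in itself a mismatch. The [GpsDriver] debug line above shows the subscription enqueue.app.default being attached to the topic enqueue.default. The following is a minimal in-memory sketch of that relationship. The class InMemoryPubSub and its methods are hypothetical and are not part of Enqueue or google/cloud-pubsub.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory model of topics and subscriptions.
 * Not part of Enqueue or google/cloud-pubsub; for illustration only.
 */
final class InMemoryPubSub
{
    /** @var array<string, string[]> topic name => names of attached subscriptions */
    private array $topics = [];

    /** @var array<string, string[]> subscription name => pending message bodies */
    private array $subscriptions = [];

    /** Attach a subscription to a topic. */
    public function subscribe(string $topic, string $subscription): void
    {
        $this->topics[$topic][] = $subscription;
        $this->subscriptions[$subscription] ??= [];
    }

    /** Publishing copies the message into every subscription attached to the topic. */
    public function publish(string $topic, string $body): void
    {
        foreach ($this->topics[$topic] ?? [] as $subscription) {
            $this->subscriptions[$subscription][] = $body;
        }
    }

    /** Pulling removes and returns the oldest pending message, or null if there is none. */
    public function pull(string $subscription): ?string
    {
        if (empty($this->subscriptions[$subscription])) {
            return null;
        }

        return array_shift($this->subscriptions[$subscription]);
    }
}

$pubsub = new InMemoryPubSub();
// Mirrors the debug line "Subscribe router topic to queue: enqueue.default -> enqueue.app.default".
$pubsub->subscribe('enqueue.default', 'enqueue.app.default');
$pubsub->publish('enqueue.default', 'testmessage');

// The message published to the topic is pulled from the subscription.
$received = $pubsub->pull('enqueue.app.default');
echo $received, PHP_EOL;
```

In this model, a message published to a topic that has no subscription attached is simply dropped, which is why the set of subscriptions shown in the admin panel matters.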

ASKozienko commented 7 years ago

Hi, I think it happens only because the default receive timeout is too low for this transport. Try launching the consumer with the option --receive-timeout=10000. I don't think it is a GPS or Enqueue issue; it's just how GPS works.

bwegrzyn commented 7 years ago

@ASKozienko I was actually running it with a timeout of 5000, since the default (100, I think?) was causing Google to reject connections. Starting the consumer and then trying to send a message with the enqueue:produce command would error out, saying the connection was refused.

I just tried running it with a timeout of 10000 but the results are the same.

Switching to redis from gps with all other settings the same works just fine.

bwegrzyn commented 7 years ago

Another thing I noticed is that the consume command pulls from enqueue.app.default but it has no subscriber listed in the GPS admin panel. If it has no subscriber, it will never get any messages? Only enqueue.default has a subscriber.

makasim commented 7 years ago

Could you post the output of the debug:enqueue:queues command?

bwegrzyn commented 7 years ago

$ ./bin/console debug:enqueue:queues
Found 1 destinations

+-------------+---------------------+-----------------------------------------+
| Client Name | Transport Name      | processors                              |
+-------------+---------------------+-----------------------------------------+
| default     | enqueue.app.default | enqueue.client.router_processor         |
|             |                     | AppBundle\Queue\Processor\TestProcessor |
+-------------+---------------------+-----------------------------------------+

makasim commented 7 years ago

How exactly do you send a message (you mentioned you had used the enqueue:produce command)?

bwegrzyn commented 7 years ago

Yes, the command is in my original post along with output.

$ ./bin/console enqueue:produce testtopic testmessage -vvv

makasim commented 7 years ago

Looks good. Finally, what does the TestProcessor subscription look like?

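bwegrzyn commented 7 years ago

namespace AppBundle\Queue\Processor;

use Enqueue\Client\TopicSubscriberInterface;
use Interop\Queue\PsrContext;
use Interop\Queue\PsrMessage;
use Interop\Queue\PsrProcessor;

class TestProcessor implements PsrProcessor, TopicSubscriberInterface
{
    // Print the message body and acknowledge the message.
    public function process(PsrMessage $message, PsrContext $session)
    {
        echo $message->getBody();

        return self::ACK;
    }

    // Topics this processor is subscribed to.
    public static function getSubscribedTopics()
    {
        return ['testtopic'];
    }
}

services: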

  _defaults:
    autowire: true
    autoconfigure: true
    public: true

  AppBundle\Queue\Processor\:
    resource: '../../../Queue/Processor/*'
    tags:
      - { name: 'enqueue.client.processor' }

ASKozienko commented 7 years ago

Let's switch the router to a separate queue:

enqueue:
  client:
    router_topic: router
    router_queue: router

Remove the previous topics/subscriptions in the GPS admin panel, and run setup-broker again.

bwegrzyn commented 7 years ago

That seems to work more reliably, but it's really slow. With the timeout set to 10000, it takes anywhere from 1 to 2 minutes (sometimes longer) for the TestProcessor class to actually get the message.

First I see it get a message from enqueue.app.router maybe 1 minute after it was sent, and then a minute later it gets the message from enqueue.app.default and finally processes it.

Test 1 -> router gets message after 70s -> processor gets message after 165s
Test 2 -> router gets message after 95s -> processor gets message after 180s

ASKozienko commented 7 years ago

There is nothing you can do about that. GPS requires long polling, and only one queue per consumer. So you can run two consumers, one for the router and a second one for the processors. Alternatively, I hope you can set router_topic to something like 'app.default', so that you end up with the same topic/subscription names for both the router and the processors.
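To put a number on the round-robin behaviour described above: if a single consumer polls N subscriptions in turn and each empty pull blocks for the full receive timeout, a message that arrives just after its subscription was polled has to wait for the other N-1 pulls to time out. The sketch below computes that bound. It assumes that a pull returns a waiting message immediately, which is an idealisation, and the function name is hypothetical.

```php
<?php
declare(strict_types=1);

/**
 * Upper bound, in seconds, on how long a message waits before its
 * subscription is polled again by a single round-robin consumer.
 * Hypothetical helper: assumes every empty pull blocks for the full timeout
 * and that a pull returns a waiting message immediately.
 */
function worstCaseWaitSeconds(int $queueCount, float $receiveTimeoutSeconds): float
{
    if ($queueCount < 1) {
        throw new InvalidArgumentException('At least one queue is required.');
    }

    // The consumer must time out on every other queue before coming back.
    return ($queueCount - 1) * $receiveTimeoutSeconds;
}

// Two queues (router and default) with --receive-timeout=10000, i.e. 10 s.
$perHop = worstCaseWaitSeconds(2, 10.0);

// A message crosses two hops: producer -> router queue, router -> processor queue.
$endToEnd = 2 * $perHop;

printf("per hop: %.0f s, end to end: %.0f s\n", $perHop, $endToEnd);
```

Under this idealised model, two queues and a 10 s timeout give at most about 20 s end to end, so round-robin polling alone would not account for delays of one to two minutes.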

bwegrzyn commented 7 years ago

I realize that the messages won't be instant because it requires polling, but shouldn't the messages get picked up on the next poll? I don't think that it should take upwards of 2 minutes to process a message.

See the logs below with timestamps.

Message 1 took 24s to get to router and then another 58s to get to the processor. Message 2 took 68s to get to router and then another 86s to get to the processor.

It only seems to get worse after every message. Message 3 and onwards would take even longer.

$ ./bin/console enqueue:produce topic testmessage1 -vv
Message is sent
2017-10-18T16:15:43+00:00
$ ./bin/console enqueue:produce topic testmessage2 -vv
Message is sent
2017-10-18T16:17:41+00:00

$ ./bin/console enqueue:consume --setup-broker -vvv --receive-timeout=10000
[debug] Set context's logger Symfony\Component\Console\Logger\ConsoleLogger
[debug] [GpsDriver] Subscribe router topic to queue: enqueue.router -> enqueue.app.router
[debug] [GpsDriver] Subscribe processor topic to queue: enqueue.app.router -> enqueue.app.router
[debug] [GpsDriver] Subscribe processor topic to queue: enqueue.app.default -> enqueue.app.default
[info] Start consuming
2017-10-18T16:15:23+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:15:33+00:00 - Timeout reached
2017-10-18T16:15:33+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:15:43+00:00 - Timeout reached
2017-10-18T16:15:43+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:15:53+00:00 - Timeout reached
2017-10-18T16:15:53+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:16:04+00:00 - Timeout reached
2017-10-18T16:16:04+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:16:14+00:00 - Timeout reached
2017-10-18T16:16:14+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:16:24+00:00 - Timeout reached
2017-10-18T16:16:24+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
[info] Message received from the queue: enqueue.app.router
[debug] Headers: array (
  'message_id' => 'bf6c0682-7787-4a3e-9ba6-24ee9d72c56f',
  'timestamp' => 1508343342,
  'reply_to' => NULL,
  'correlation_id' => NULL,
)
[debug] Properties: array (
  'enqueue.topic_name' => 'topic',
)
[debug] Payload: 'testmessage1'
[info] Message processed: enqueue.ack
2017-10-18T16:16:25+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:16:35+00:00 - Timeout reached
2017-10-18T16:16:35+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:16:45+00:00 - Timeout reached
2017-10-18T16:16:45+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:16:56+00:00 - Timeout reached
2017-10-18T16:16:56+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:17:06+00:00 - Timeout reached
2017-10-18T16:17:06+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:17:16+00:00 - Timeout reached
2017-10-18T16:17:16+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:17:26+00:00 - Timeout reached
2017-10-18T16:17:26+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
[info] Message received from the queue: enqueue.app.default
[debug] Headers: array (
  'message_id' => 'bf6c0682-7787-4a3e-9ba6-24ee9d72c56f',
  'timestamp' => 1508343342,
  'reply_to' => NULL,
  'correlation_id' => NULL,
)
[debug] Properties: array (
  'enqueue.topic_name' => 'topic',
  'enqueue.processor_name' => 'AppBundle\\Queue\\Processor\\TestProcessor',
  'enqueue.processor_queue_name' => 'default',
)
[debug] Payload: 'testmessage1'
testmessage1[info] Message processed: enqueue.ack
2017-10-18T16:17:29+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:17:39+00:00 - Timeout reached
2017-10-18T16:17:39+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:17:49+00:00 - Timeout reached
2017-10-18T16:17:49+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:17:59+00:00 - Timeout reached
2017-10-18T16:17:59+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:18:09+00:00 - Timeout reached
2017-10-18T16:18:09+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:18:19+00:00 - Timeout reached
2017-10-18T16:18:19+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:18:29+00:00 - Timeout reached
2017-10-18T16:18:29+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:18:39+00:00 - Timeout reached
2017-10-18T16:18:39+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:18:49+00:00 - Timeout reached
2017-10-18T16:18:49+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
[info] Message received from the queue: enqueue.app.router
[debug] Headers: array (
  'message_id' => 'dc4ff5d8-ec75-4dab-8251-d4723f439f21',
  'timestamp' => 1508343461,
  'reply_to' => NULL,
  'correlation_id' => NULL,
)
[debug] Properties: array (
  'enqueue.topic_name' => 'topic',
)
[debug] Payload: 'testmessage2'
[info] Message processed: enqueue.ack
2017-10-18T16:18:55+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:19:05+00:00 - Timeout reached
2017-10-18T16:19:05+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:19:15+00:00 - Timeout reached
2017-10-18T16:19:15+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:19:25+00:00 - Timeout reached
2017-10-18T16:19:25+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:19:35+00:00 - Timeout reached
2017-10-18T16:19:35+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:19:45+00:00 - Timeout reached
2017-10-18T16:19:45+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:19:55+00:00 - Timeout reached
2017-10-18T16:19:55+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
2017-10-18T16:20:05+00:00 - Timeout reached
2017-10-18T16:20:05+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
2017-10-18T16:20:15+00:00 - Timeout reached
2017-10-18T16:20:15+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.default
[info] Message received from the queue: enqueue.app.default
[debug] Headers: array (
  'message_id' => 'dc4ff5d8-ec75-4dab-8251-d4723f439f21',
  'timestamp' => 1508343461,
  'reply_to' => NULL,
  'correlation_id' => NULL,
)
[debug] Properties: array (
  'enqueue.topic_name' => 'topic',
  'enqueue.processor_name' => 'AppBundle\\Queue\\Processor\\TestProcessor',
  'enqueue.processor_queue_name' => 'default',
)
[debug] Payload: 'testmessage2'
testmessage2[info] Message processed: enqueue.ack
2017-10-18T16:20:20+00:00 - Starting pull with timeout 10000
Pulling from projects/.../subscriptions/enqueue.app.router
^C% 

bwegrzyn commented 7 years ago

So I think I know what is going on...

If you set the timeout to 0 (so that the server ends the connection instead of the client terminating it), it times out at exactly 90 seconds every time. This way, the messages come through instantly (assuming it's pulling from the enqueue.app.router queue at the time). If you have multiple queues, it cycles through each queue with a 90s timeout, and only one at a time. So the message will not get picked up until the next time it is pulling from the enqueue.app.router queue. This sucks because if you have multiple queues, it will cycle through each queue with a 90s timeout until it gets back to the enqueue.app.router queue to pick up any new messages.

It would appear the reason why the messages don't get picked up (and only randomly at some point later) if you set the timeout to a lower value is because the server sends to the connection that the client had already terminated. Because the messages have not been acknowledged, rejected, or requeued, they won't get sent to another pull connection until the ackDeadline expires. After the deadline, the message will get resent, but it might get sent to a connection that has already been terminated by the client.

I don't know enough about the internals of Enqueue so I have no idea how to fix this, but in the current multi queue implementation that Enqueue uses, the GPS driver is not very useful, unless you are ok with waiting a long time to get your messages.
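The hypothesis in this comment can be turned into a small timing model. The sketch below is only that: a model of the suspected behaviour, not verified against the Pub/Sub service or the google/cloud-pubsub library. It assumes a single consumer that polls its queues in a fixed round-robin order, each pull lasting exactly the client timeout, and that every delivery to an abandoned pull hides the message for one acknowledgement deadline. All function names and the example values (two queues, a 10 s timeout, a 10 s deadline) are assumptions chosen for illustration.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of the behaviour hypothesised above; not verified
 * against the Pub/Sub service or the google/cloud-pubsub library.
 */

/** First moment at or after $t when the consumer is polling the given queue. */
function nextTimePolled(float $t, int $queueIndex, int $queueCount, float $timeout): float
{
    $cycle = $queueCount * $timeout; // one full round over all queues
    $position = fmod($t - $queueIndex * $timeout, $cycle);
    if ($position < 0) {
        $position += $cycle; // fmod keeps the sign of the dividend
    }

    // During the first $timeout seconds of its slot the queue is being polled right now.
    return $position < $timeout ? $t : $t + ($cycle - $position);
}

/**
 * Time at which the consumer finally receives a message that was handed to
 * abandoned pull requests $lostDeliveries times, starting at $firstLostAt.
 * Each lost delivery hides the message for one acknowledgement deadline.
 */
function receivedAt(
    float $firstLostAt,
    int $lostDeliveries,
    float $ackDeadline,
    int $queueIndex,
    int $queueCount,
    float $timeout
): float {
    $eligibleAt = $firstLostAt + $lostDeliveries * $ackDeadline;

    return nextTimePolled($eligibleAt, $queueIndex, $queueCount, $timeout);
}

// Assumed values: two queues, 10 s client timeout, 10 s acknowledgement deadline,
// message first handed to an abandoned pull 12 s after the consumer started.
$once = receivedAt(12.0, 1, 10.0, 0, 2, 10.0);
$twice = receivedAt(12.0, 2, 10.0, 0, 2, 10.0);

printf("lost once: %.0f s, lost twice: %.0f s\n", $once, $twice);
```

In this model every additional lost delivery adds one acknowledgement deadline plus whatever time remains until the queue's next polling slot, which would be consistent with delays that grow and vary from message to message.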

bwegrzyn commented 7 years ago

Doing some more reading... it seems like the PubSub pull API endpoint does not support terminating the connection early. The documentation says it should be possible, but my testing doesn't seem to support this.

ASKozienko commented 7 years ago

"if you set the timeout to a lower value is because the server sends to the connection that the client had already terminated."

Enqueue does not terminate any connection. When you set --receive-timeout=10000, Enqueue sets the requestTimeout option, and that's it. The GPS library handles the rest; if there is a bug, it is internal to the GPS library, not Enqueue itself.

"This sucks because if you have multiple queues, it will cycle through each queue with a 90s timeout until it gets back to the enqueue.app.router queue to pick up any new messages."

Heh, yes, it's true, but first of all, it's PHP: there are no threads. GPS is HTTP REST; there is no socket connection like in AMQP, and there is no option to subscribe to many queues through one connection like in AMQP either.

As I said before, you have to use one consumer per queue. Look at the client-queue-names option of enqueue:consume.

makasim commented 7 years ago

As @ASKozienko said it was by design. The default behavior is to consume from all known queues, and it is good for development (no need to run 10-100 processes for every queue) or queues with a low number of messages coming from time to time (such queues could be served by a single consuming process).

The solution is to have at least one consumer process per queue, with a large receive timeout or even a zero one.

bwegrzyn commented 7 years ago

Looks like this is indeed an issue with the PubSub library or the PubSub service. The PubSub server seems to send messages to pull requests that have already been terminated by the client.

I submitted a bug to the official PubSub library repo: https://github.com/GoogleCloudPlatform/google-cloud-php/issues/710

Might be a good idea to keep this issue open as well, to track the problem and update to a newer version of google/cloud-pubsub when it is fixed.

makasim commented 6 years ago

There is no reason to keep it open cuz there is nothing we can do. Though feel free to re-open if something new arrives.