cakephp / queue

A queue-interop compatible Queueing library
MIT License

Queue Worker filtered removing messages from all queues #117

Open andreinocenti opened 1 year ago

andreinocenti commented 1 year ago

Hello,

I'm using CakePHP 4 with MongoDB as the transport.

I have the following scenario, with 2 different messages in 2 different queues:

QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'test']);
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'new-test']);

When I run the worker filtered by queue, only the matching job runs, but the messages are removed from all queues in the database. For example, if I run the command below, both jobs are deleted from the database but only the first one is executed.

bin/cake queue worker --queue=test
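For reference, ExampleJob is just a minimal job class along these lines (a sketch of what I'm running, with illustrative logging):

<?php
declare(strict_types=1);

namespace App\Job;

use Cake\Log\Log;
use Cake\Queue\Job\JobInterface;
use Cake\Queue\Job\Message;
use Interop\Queue\Processor;

class ExampleJob implements JobInterface
{
    public function execute(Message $message): ?string
    {
        // Read the arguments that were pushed with the job and log them.
        $id = $message->getArgument('id');
        $code = $message->getArgument('code');
        Log::info("ExampleJob received id={$id} code={$code}");

        // Acknowledge the message so it is removed from the queue.
        return Processor::ACK;
    }
}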

Am I using it wrongly or is this a bug?

Thank you

markstory commented 1 year ago

How would someone go about reproducing the issue you're having? Are you able to reproduce this issue using a transport that isn't mongo db?

andreinocenti commented 1 year ago

I didn't try another transport, only MongoDB, but I can try a different one if that helps. These simple steps reproduce it (I tried in 2 different environments with the same infrastructure):

Push 2 jobs to the queue, each with a different 'queue' option, as below:

QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'test']);
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['queue' => 'new-test']);

Open any database client (I'm using MongoDB Compass) to check if the jobs were saved on the queue collection.

After that, run the worker as below:

bin/cake queue worker --queue=test

After you run the worker, only the "test" job will be executed, but both jobs will be erased from the queue. The "new-test" job will be erased without execution.

amayer5125 commented 1 year ago

I tried to reproduce this and think there is something fishy going on. I'm using beanstalkd as the backend. It looks like all jobs are being put in the same tube (beanstalk queue).

I created an ExampleCommand that accepts --queue and passes that to the options parameter of QueueManager::push(). When I run the command it looks like both jobs are put on the default queue.
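The command is roughly the following (a simplified sketch; the option handling and job data are illustrative):

<?php
declare(strict_types=1);

namespace App\Command;

use App\Job\ExampleJob;
use Cake\Command\Command;
use Cake\Console\Arguments;
use Cake\Console\ConsoleIo;
use Cake\Console\ConsoleOptionParser;
use Cake\Queue\QueueManager;

class ExampleCommand extends Command
{
    protected function buildOptionParser(ConsoleOptionParser $parser): ConsoleOptionParser
    {
        // Whichever of these options is given gets forwarded to QueueManager::push().
        return $parser
            ->addOption('queue', ['help' => 'Queue name to push onto'])
            ->addOption('config', ['help' => 'Queue configuration to use']);
    }

    public function execute(Arguments $args, ConsoleIo $io)
    {
        $options = [];
        if ($args->getOption('queue')) {
            $options['queue'] = $args->getOption('queue');
        }
        if ($args->getOption('config')) {
            $options['config'] = $args->getOption('config');
        }

        $io->out('Options: ' . json_encode($options));
        QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], $options);
        $io->out('Job Queued');
    }
}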

$ galley cake example --queue test                  
Options: {"queue":"test"}
Job Queued
$ galley exec -T beanstalk ash -s < stats.sh
==> enqueue.app.default <==
current-jobs-ready: 1

$ galley cake example --queue new-test      
Options: {"queue":"new-test"}
Job Queued
$ galley exec -T beanstalk ash -s < stats.sh
==> enqueue.app.default <==
current-jobs-ready: 2

When I start a worker it looks like both jobs get pulled off the default queue. One of them is "routed" to an event subscriber. The other is routed to "0" subscribers.

$ galley cake worker --verbose --queue test
2023-03-01 04:09:49 Debug: Change logger from "Cake\Log\Engine\FileLog" to "Cake\Log\Engine\ConsoleLog"
2023-03-01 04:09:49 Debug: Consumption has started
2023-03-01 04:09:49 Debug: [SetRouterPropertiesExtension] Set router processor "enqueue.client.router_processor"
2023-03-01 04:09:49 Debug: Received from enqueue.app.default    {"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}
2023-03-01 04:09:49 Info: [client] Processed test -> enqueue.client.router_processor    {"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}   ack Routed to "1" event subscribers
2023-03-01 04:09:49 Debug: [SetRouterPropertiesExtension] Set router processor "enqueue.client.router_processor"
2023-03-01 04:09:49 Debug: Received from enqueue.app.default    {"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"new-test"}}
2023-03-01 04:09:49 Info: [client] Processed new-test -> enqueue.client.router_processor    {"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"new-test"}}   ack Routed to "0" event subscribers
2023-03-01 04:09:49 Debug: Received from enqueue.app.default    {"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}
2023-03-01 04:09:49 Info: ExampleJob received arguments: {"id":1,"code":"oi1"}
2023-03-01 04:09:49 Debug: Message processed sucessfully
2023-03-01 04:09:49 Info: [client] Processed test -> Cake\Queue\Queue\Processor63fed00d9e9aa    {"class":["App\\Job\\ExampleJob","execute"],"args":[{"id":1,"code":"oi1"}],"data":{"id":1,"code":"oi1"},"requeueOptions":{"config":"default","priority":null,"queue":"test"}}   ack

The only way I can seem to get the jobs onto different queues is to add the queue option to the config.

'Queue' => [
    'test' => [
        'url' => 'beanstalk://beanstalk:11300',
        'queue' => 'test',
    ],
    'new-test' => [
        'url' => 'beanstalk://beanstalk:11300',
        'queue' => 'new-test',
    ],
],

Then I pass ['config' => 'test'] and ['config' => 'new-test'] as the options parameter.
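In code, those are the same push calls with 'config' instead of 'queue' (sketch):

// Each call now targets a named queue configuration instead of a per-message queue.
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['config' => 'test']);
QueueManager::push(ExampleJob::class, ['id' => 1, 'code' => 'oi1'], ['config' => 'new-test']);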

$ galley cake example --config test
Options: {"config":"test"}
Job Queued
$ galley cake example --config new-test
Options: {"config":"new-test"}
Job Queued
$ galley exec -T beanstalk ash -s < stats.sh
==> enqueue.app.test <==
current-jobs-ready: 1

==> enqueue.app.new-test <==
current-jobs-ready: 1

Then when I run the queue worker with the --config option it runs as expected. I have to run it twice, once for each queue.

markstory commented 1 year ago

The only way I can seem to get the jobs onto different queues is to add the queue option to the config.

I think we might have an issue in how messages are being enqueued then. I can take a look.

markstory commented 1 year ago

I've tracked this down to how QueueManager::push() generates Enqueue Client instances. When a client instance is created, Enqueue\SimpleClient\SimpleClient uses our 'queue' configuration value to set the client.router_topic and client.router_queue configuration values. These values decide the beanstalk tube or filesystem job log name, as seen in 8a0b4c7c35db05aba2109833bc60172f18f8fc46.
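As a rough illustration of the effect (assumed, not the exact SimpleClient internals):

// With a Queue config like
// ['default' => ['url' => 'beanstalk://beanstalk:11300', 'queue' => 'test']]
// the generated client configuration ends up roughly as:
$clientConfig = [
    'client' => [
        'router_topic' => 'test', // taken from the 'queue' key
        'router_queue' => 'test', // taken from the 'queue' key
        // plus the enqueue defaults that produce the "enqueue.app." prefix
        // visible in the tube names above.
    ],
];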

While the queue option does get persisted into the message, it does not get reflected in the producer configuration, which is why both of your messages ended up in the enqueue.app.default 'topic'. When a consumer binds to a topic/queue, it binds with a Route using the topic name. When a topic contains messages for mixed queues, the ones with no matching binding are silently acked (I've not tracked down where this happens yet).

I'm starting to think that the per-message queue options are a poor design that doesn't work well within enqueue. Instead, each 'queue' of work would need its own configuration key, much like how we handle caching and database connections. While less flexible, this lets us provide a functional interface; we missed this earlier on in the plugin's development.

markstory commented 1 year ago

When a topic contains messages for mixed queues, the ones with no matching binding are silently acked (I've not tracked down where this happens yet).

The output we're seeing when messages are consumed comes from the RouterProcessor, which matches messages with 'routes'. Unfortunately, our current worker command only binds a single route. I think if we were to call bindTopic() for all the possible queues, no messages would be lost. However, you wouldn't gain the separation of work that you want from multiple queues.
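Something along these lines in the worker setup could do that (a sketch only; $client is the Enqueue\SimpleClient\SimpleClient instance the worker builds, $processor is the plugin's message processor, and the list of queues would have to come from configuration):

// Bind a route for every queue we expect messages for, so the router
// processor has a matching subscriber instead of silently acking.
$queues = ['test', 'new-test']; // illustrative; would come from config
foreach ($queues as $queueName) {
    $client->bindTopic($queueName, $processor);
}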

I think if you're using a transport like beanstalk or the filesystem that doesn't support fanout workloads, you need to use separate queue configurations instead.
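Concretely: one Queue configuration key per logical queue (as in the config shown above), QueueManager::push() with the 'config' option, and then one worker per configuration:

bin/cake queue worker --config=test
bin/cake queue worker --config=new-test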