RichardKnop / machinery

Machinery is an asynchronous task queue/job queue based on distributed message passing.
Mozilla Public License 2.0

Adding new workers is not picking up the tasks from the existing queue #261

Open bseenu opened 6 years ago

bseenu commented 6 years ago

I have a queue with a few tasks and some workers processing them, but newly added workers are not picking up tasks. Bringing up a new worker just shows the log

INFO: 2018/03/07 17:19:28 amqp.go:74 [*] Waiting for messages. To exit press CTRL+C

even though there are many messages left to consume.

bseenu commented 6 years ago

Looks like this is only push based model, the worker is waiting for the event to pick up the task - https://github.com/RichardKnop/machinery/blob/master/v1/brokers/amqp.go#L172-L203

I think the model should be to connect to the queue directly and start consuming, which would also handle the case where a worker is added later on.

RichardKnop commented 6 years ago

@bseenu I will need to run tests but I am quite confident this is not the case. You can easily test that by adding some tasks to a queue and then launching a worker afterwards; it will consume the tasks.

Easiest way to test it:

go run example/machinery.go send

Later do:

go run example/machinery.go worker

You can even just publish JSON payloads directly to a queue and then launch worker and it will consume the tasks.
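To illustrate what such a payload might look like, here is a sketch that builds the JSON by hand. The struct below mirrors only a small subset of the fields machinery's tasks.Signature serializes (the real struct has more, e.g. RoutingKey and RetryCount), so treat the field set as an assumption to check against the source:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Arg mirrors machinery's tasks.Arg: a typed task argument.
type Arg struct {
	Type  string      `json:"Type"`
	Value interface{} `json:"Value"`
}

// Signature is a minimal subset of machinery's tasks.Signature fields.
type Signature struct {
	UUID string `json:"UUID"`
	Name string `json:"Name"`
	Args []Arg  `json:"Args"`
}

// BuildPayload marshals a task signature into the JSON body you would
// publish to the queue.
func BuildPayload(uuid, name string, args []Arg) (string, error) {
	b, err := json.Marshal(Signature{UUID: uuid, Name: name, Args: args})
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	payload, _ := BuildPayload("task_1", "add", []Arg{
		{Type: "int64", Value: 1},
		{Type: "int64", Value: 2},
	})
	fmt.Println(payload)
}
```

Publishing that string as a message body to the worker's queue should be equivalent to sending a task through the server, assuming the field names match your machinery version.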

Make sure your routing configuration is correct. Take a look at ExchangeType or BindingKey.

For direct exchanges, it will use the same binding and routing key: https://github.com/RichardKnop/machinery/blob/c532b2a685fe0c35d77fca82f861845ad8b8a09b/v1/brokers/broker.go#L100

So the worker consumes from a queue bound with your binding key, and tasks are published with an identical routing key.

Make sure you understand how RabbitMQ routing works: https://www.rabbitmq.com/tutorials/tutorial-four-python.html

And verify your AMQP routing settings are what you want them to be. In the simplest setup you will probably want a direct exchange type: all tasks are then routed to the same queue based on the binding key, and when you send tasks to workers they are published to that same queue.
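For reference, a minimal setup along these lines might look like the following (field names are per machinery v1's config package as I recall them; verify against the version you are running):

```go
package main

import (
	"github.com/RichardKnop/machinery/v1"
	"github.com/RichardKnop/machinery/v1/config"
)

// NewServer builds a machinery server with a simple direct-exchange setup.
func NewServer() (*machinery.Server, error) {
	cnf := &config.Config{
		Broker:        "amqp://guest:guest@localhost:5672/",
		DefaultQueue:  "machinery_tasks",
		ResultBackend: "amqp://guest:guest@localhost:5672/",
		AMQP: &config.AMQPConfig{
			Exchange:     "machinery_exchange",
			ExchangeType: "direct",
			// With a direct exchange the binding key doubles as the
			// routing key, so producers and every worker must agree on it.
			BindingKey:    "machinery_task",
			PrefetchCount: 3,
		},
	}
	return machinery.NewServer(cnf)
}
```

If workers and producers disagree on Exchange, ExchangeType, or BindingKey, messages can end up in a queue no worker is bound to.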

bseenu commented 6 years ago

My settings are identical and I am using the direct exchange type as well.

I started some long-running tasks yesterday with 4 workers, and the messages were not getting de-queued as fast as expected. To speed things up I brought up 4 additional workers with the same settings as the earlier ones, but they just keep waiting even though there are messages in the queue.

I am able to reproduce this: only the workers that were running while the tasks were being pushed are doing any work.

RichardKnop commented 6 years ago

If these tasks are long-running, they won't get acked by workers until they are processed. Could it be that another worker is already processing them?
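A toy simulation of this effect (plain channels, not machinery or AMQP code): when an early consumer prefetches messages into its local unacked buffer, a worker that joins later finds the shared queue empty even though nothing has actually finished.

```go
package main

import "fmt"

// simulate models a broker queue with `queued` messages. An early worker
// prefetches up to `prefetch` messages without acking them; a worker added
// later then polls the shared queue. Returns how many messages the late
// worker managed to get.
func simulate(queued, prefetch int) (lateWorkerGot int) {
	queue := make(chan int, queued)
	for i := 0; i < queued; i++ {
		queue <- i
	}

	// Early worker prefetches messages it has not processed or acked yet.
	local := make([]int, 0, prefetch)
	for len(local) < prefetch {
		select {
		case m := <-queue:
			local = append(local, m)
		default:
			return
		}
	}
	_ = local // still unacked: the broker will not redeliver these

	// A worker added later polls the shared queue.
	for {
		select {
		case <-queue:
			lateWorkerGot++
		default:
			return
		}
	}
}

func main() {
	fmt.Println(simulate(100, 100)) // prints 0: late worker sees nothing
	fmt.Println(simulate(100, 10))  // prints 90: late worker still has work
}
```

This is why a large (or unbounded) prefetch combined with long-running tasks can starve workers that are added later, even with hundreds of messages nominally "in the queue".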

bseenu commented 6 years ago

Possible, but I had many messages in the queue, in the hundreds.

georgekarrv commented 6 years ago

I am seeing similar behavior with Redis as the queue broker.

georgekarrv commented 6 years ago

So we are spinning up 10 Docker containers that each launch 1 worker, using Redis as the broker. It works fine for some time, then it stops pushing pending jobs through, and when I jump onto the Redis image and run 'DUMP machinery_tasks' the value is empty. There seems to be an issue where the machinery_tasks key gets clobbered.

georgekarrv commented 6 years ago

@RichardKnop I would like to address this issue with a PR that stops the worker from grabbing a task off the queue unless it has a pool slot available, but I do not have permission to create a branch to base the PR on.

georgekarrv commented 6 years ago

At least for Redis, I simply moved the pool creation into StartConsuming and check len(pool) - len(deliveries) > 0 before pulling off the queue. This works very well for me.

diff --git a/v1/brokers/redis.go b/v1/brokers/redis.go
index afa46e6..9ea6e40 100644
--- a/v1/brokers/redis.go
+++ b/v1/brokers/redis.go
@@ -70,6 +70,14 @@ func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskPr

    // Channel to which we will push tasks ready for processing by worker
    deliveries := make(chan []byte)
+   pool := make(chan struct{}, concurrency)
+
+   // initialize worker pool with maxWorkers workers
+   go func() {
+       for i := 0; i < concurrency; i++ {
+           pool <- struct{}{}
+       }
+   }()

    // A receivig goroutine keeps popping messages from the queue by BLPOP
    // If the message is valid and can be unmarshaled into a proper structure
@@ -85,12 +93,16 @@ func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskPr
            case <-b.stopReceivingChan:
                return
            default:
-               task, err := b.nextTask(b.cnf.DefaultQueue)
-               if err != nil {
-                   continue
+               // If concurrency is limited, limit the tasks being pulled off the queue
+               // until a pool is available
+               if concurrency == 0 || (len(pool) - len(deliveries) > 0) {
+                   task, err := b.nextTask(b.cnf.DefaultQueue)
+                   if err != nil {
+                       continue
+                   }
+
+                   deliveries <- task
                }
-
-               deliveries <- task
            }
        }
    }()
@@ -125,7 +137,7 @@ func (b *RedisBroker) StartConsuming(consumerTag string, concurrency int, taskPr
        }
    }()

-   if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
+   if err := b.consume(deliveries, pool, concurrency, taskProcessor); err != nil {
        return b.retry, err
    }

@@ -214,16 +226,7 @@ func (b *RedisBroker) GetPendingTasks(queue string) ([]*tasks.Signature, error)

 // consume takes delivered messages from the channel and manages a worker pool
 // to process tasks concurrently
-func (b *RedisBroker) consume(deliveries <-chan []byte, concurrency int, taskProcessor TaskProcessor) error {
-   pool := make(chan struct{}, concurrency)
-
-   // initialize worker pool with maxWorkers workers
-   go func() {
-       for i := 0; i < concurrency; i++ {
-           pool <- struct{}{}
-       }
-   }()
-
+func (b *RedisBroker) consume(deliveries <-chan []byte, pool chan struct{}, concurrency int, taskProcessor TaskProcessor) error {
    errorsChan := make(chan error, concurrency*2)

    for {
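The pattern in the diff, reduced to a self-contained sketch (the names below are placeholders, not machinery APIs): a buffered channel of empty structs acts as a token pool, and the receiving loop only dequeues a task once a worker slot is free, so unclaimed messages stay on the broker where any worker, including one added later, can pick them up.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// consumeWithPool drains `tasks` but never holds more than `concurrency`
// of them off the broker at once. Returns the peak number of tasks that
// were in flight simultaneously.
func consumeWithPool(tasks []string, concurrency int) int32 {
	queue := make(chan string, len(tasks))
	for _, t := range tasks {
		queue <- t
	}
	close(queue)

	// Token pool: one token per worker slot, as in the diff above.
	pool := make(chan struct{}, concurrency)
	for i := 0; i < concurrency; i++ {
		pool <- struct{}{}
	}

	var inFlight, peak int32
	var wg sync.WaitGroup
	for task := range queue {
		<-pool // block until a slot is free before dequeuing
		wg.Add(1)
		go func(t string) {
			defer wg.Done()
			n := atomic.AddInt32(&inFlight, 1)
			// Record the peak observed concurrency.
			for {
				p := atomic.LoadInt32(&peak)
				if n <= p || atomic.CompareAndSwapInt32(&peak, p, n) {
					break
				}
			}
			_ = t // process the task here
			atomic.AddInt32(&inFlight, -1)
			pool <- struct{}{} // return the token
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	peak := consumeWithPool(make([]string, 50), 4)
	fmt.Println(peak <= 4) // prints true: never exceeds the pool size
}
```

Because a token is acquired before each dequeue and released only after the task finishes, in-flight work is bounded by the pool size regardless of how many messages are queued.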
RichardKnop commented 6 years ago

@georgekarrv You can fork the repo and create a PR that way.

georgekarrv commented 6 years ago

#271, but this is a solution for the Redis broker only; something similar would have to be done for the other brokers.