depoplabs / celery-message-consumer

Tool for using the battle-tested `bin/celery` worker to consume vanilla AMQP messages (i.e. not Celery tasks)
Apache License 2.0

Can celery-message-consumer run concurrency #18

Open nhapq opened 5 years ago

nhapq commented 5 years ago

Hello,

I would like to know whether celery-message-consumer can run concurrently with multiple workers and, if so, how to do it.

Best,

anentropic commented 5 years ago

It's just a Celery worker, so the answer is exactly the same as for Celery (yes, it can).

nucklehead commented 4 years ago

Actually, I noticed your consumer does not make use of the Celery pool. This means that even if you set --concurrency, the extra processes get created but are never used. You could update the __call__ method of AMQPRetryHandler to do something like this:

                    self.pool.apply_async(
                        func,
                        args=(msg,),
                        # accept_callback=self.on_accepted,
                        # timeout_callback=self.on_timeout,
                        # callback=message.ack,
                        # error_callback=self.on_failure,
                        # soft_timeout=soft_time_limit or task.soft_time_limit,
                        # timeout=time_limit or task.time_limit,
                        # correlation_id=task_id,
                    )

You can get the pool in the start method of AMQPRetryConsumerStep:

        def start(self, c):
            self.pool = c.pool
            ...

This would also be valid for any other type of concurrency.
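To illustrate the idea outside of Celery, here is a minimal, stdlib-only sketch. It is not the library's code: `ThreadPool` stands in for `c.pool` (Celery's own pool takes different keyword arguments, such as the commented-out ones above), and `FakeMessage`, `PooledHandler`, `handle` and `run_demo` are hypothetical names made up for the example. It shows a handler handing each message to the pool and acking only once the work has finished.

```python
import threading
from multiprocessing.pool import ThreadPool


class FakeMessage:
    """Hypothetical stand-in for a kombu message; it only tracks ack state."""

    def __init__(self, body):
        self.body = body
        self.acked = False

    def ack(self):
        self.acked = True


def handle(body):
    # Report which thread did the work, to show it is not the main thread.
    return threading.current_thread().name


class PooledHandler:
    """Hypothetical wrapper: dispatches to a pool instead of running inline."""

    def __init__(self, func, pool):
        self.func = func
        self.pool = pool

    def __call__(self, message):
        # The callback fires only on success, so the message is acked
        # after the handler has returned without raising.
        return self.pool.apply_async(
            self.func,
            args=(message.body,),
            callback=lambda _result: message.ack(),
        )


def run_demo():
    messages = [FakeMessage(i) for i in range(4)]
    with ThreadPool(processes=2) as pool:
        handler = PooledHandler(handle, pool)
        results = [handler(m) for m in messages]
        # Wait for every job before the pool is torn down.
        worker_names = [r.get(timeout=5) for r in results]
    return messages, worker_names


if __name__ == "__main__":
    msgs, names = run_demo()
    print(names, [m.acked for m in msgs])
```

A real integration would also need an error path (for example rejecting or retrying the message in an `error_callback`), which this sketch leaves out.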

victorct-pronto commented 3 years ago

@nucklehead I just ran into the same issue. It seems the library doesn't use the worker pool; instead, all messages are handled in the main process. That makes the worker_signals useless: if you connect to and disconnect from your database in those signals, the connections are not present in the main process, which leads to errors and bugs.
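The per-worker setup problem can be reproduced with the standard library alone. The sketch below is an illustration under assumptions, not the library's behaviour verbatim: a pool initializer plays the role of a worker init signal (in Celery that would presumably be something like `worker_process_init`), a thread-local string stands in for a database connection, and `init_worker`, `handle` and `run_demo` are hypothetical names.

```python
import threading
from multiprocessing.pool import ThreadPool

# Per-worker state; each pool thread gets its own "connection".
_local = threading.local()


def init_worker():
    # Runs once in every pool worker, like a per-worker init hook.
    _local.connection = "connection-for-" + threading.current_thread().name


def handle(body):
    # The handler assumes the per-worker setup has already happened.
    if getattr(_local, "connection", None) is None:
        raise RuntimeError("no connection in this thread")
    return body


def run_demo():
    with ThreadPool(processes=2, initializer=init_worker) as pool:
        # Dispatched to the pool: the initializer ran there, so this works.
        pooled = pool.apply_async(handle, args=("msg",)).get(timeout=5)
    try:
        # Called inline by the caller: no initializer ran here, so it fails.
        handle("msg")
        inline = "ok"
    except RuntimeError as exc:
        inline = str(exc)
    return pooled, inline


if __name__ == "__main__":
    print(run_demo())
```

The same handler succeeds when dispatched to the pool and fails when called inline, which matches the symptom described here.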

erdnaxeli commented 3 years ago

So if this lib does not actually use the Celery worker pool, what is its use? It looks like all it does is set up a kombu consumer and then let Celery drain the connection.

DennyWeinberg commented 1 year ago

Well, without concurrency it is quite worthless. Strange that Celery doesn't support topic exchanges and wildcard bindings out of the box :(