vyuldashev / laravel-queue-rabbitmq

RabbitMQ driver for Laravel Queue. Supports Laravel Horizon.
MIT License

Queue listener enters an endless loop error state when dispatching a delayed job #374

Closed judgej closed 3 years ago

judgej commented 3 years ago

Describe the bug

The connection or channel seems to get closed when dispatching a delayed job, leading to a cascade of issues that result in a listener running into an endless loop of error messages.

Steps To Reproduce

A running job dispatches another job with a delay.

Current behavior

First I get an ACCESS_REFUSED exception, then an endless loop of AMQPChannelClosedException exceptions. The job and listener do not exit, but enter this endless loop state.

Expected behavior

A job should be dispatched.

Additional context

The lost connection or channel, resulting in the ACCESS_REFUSED error, can be reproduced in artisan tinker. With some class names changed, it looks like this:

>>> $j = new App\Jobs\TestJob()
=> App\Jobs\TestJob {#3155
     +connection: null,
     +queue: null,
     +chainConnection: null,
     +chainQueue: null,
     +delay: null,
     +middleware: [],
     +chained: [],
   }
>>> dispatch($j->delay(Carbon\Carbon::now()->addSeconds(50)))
=> Laravel\Lumen\Bus\PendingDispatch {#3156}
>>> DB::commit()
=> null

Then the same again in the same tinker session:

>>> $j = new App\Jobs\TestJob()
=> App\Jobs\TestJob {#3198
     +connection: null,
     +queue: null,
     +chainConnection: null,
     +chainQueue: null,
     +delay: null,
     +middleware: [],
     +chained: [],
   }
>>> dispatch($j->delay(Carbon\Carbon::now()->addSeconds(50)))
=> Laravel\Lumen\Bus\PendingDispatch {#3199}
>>> DB::commit()
PhpAmqpLib/Exception/AMQPProtocolChannelException with message 'ACCESS_REFUSED - access to exchange 'amq.default' in vhost 'myhost' refused for user 'myuser''

Notes:

judgej commented 3 years ago

Now, the running queue listener that goes into the endless loop. It first hits this exception:

PhpAmqpLib\\Exception\\AMQPProtocolChannelException: ACCESS_REFUSED - access to exchange 'amq.default' in vhost 'sand' refused for user 'agg' in /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php:215
Stack trace:
#0 [internal function]: PhpAmqpLib\\Channel\\AMQPChannel->channel_close(Object(PhpAmqpLib\\Wire\\AMQPReader))
#1 /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(216): call_user_func(Array, Object(PhpAmqpLib\\Wire\\AMQPReader))
#2 /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php(373): PhpAmqpLib\\Channel\\AbstractChannel->dispatch('20,40', '\\x01\\x93XACCESS_REFUS...', NULL)
#3 /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php(1065): PhpAmqpLib\\Channel\\AbstractChannel->wait(Array, false, 0)
#4 /var/www/vendor/vladimir-yuldashev/laravel-queue-rabbitmq/src/Queue/RabbitMQQueue.php(232): PhpAmqpLib\\Channel\\AMQPChannel->basic_get('agg.jobs.fetch-...')
#5 /var/www/vendor/illuminate/queue/Worker.php(284): VladimirYuldashev\\LaravelQueueRabbitMQ\\Queue\\RabbitMQQueue->pop('agg.jobs.fetch-...')
#6 /var/www/vendor/illuminate/queue/Worker.php(123): Illuminate\\Queue\\Worker->getNextJob(Object(VladimirYuldashev\\LaravelQueueRabbitMQ\\Queue\\RabbitMQQueue), 'agg.jobs.fetch-...')
#7 /var/www/vendor/illuminate/queue/Console/WorkCommand.php(112): Illuminate\\Queue\\Worker->daemon('rabbitmq', 'agg.jobs.fetch-...', Object(Illuminate\\Queue\\WorkerOptions))
#8 /var/www/vendor/illuminate/queue/Console/WorkCommand.php(96): Illuminate\\Queue\\Console\\WorkCommand->runWorker('rabbitmq', 'agg.jobs.fetch-...')
#9 /var/www/vendor/illuminate/container/BoundMethod.php(31): Illuminate\\Queue\\Console\\WorkCommand->handle()
#10 /var/www/vendor/illuminate/container/Util.php(37): Illuminate\\Container\\BoundMethod::Illuminate\\Container\\{closure}()
#11 /var/www/vendor/illuminate/container/BoundMethod.php(88): Illuminate\\Container\\Util::unwrapIfClosure(Object(Closure))
#12 /var/www/vendor/illuminate/container/BoundMethod.php(32): Illuminate\\Container\\BoundMethod::callBoundMethod(Object(Laravel\\Lumen\\Application), Array, Object(Closure))
#13 /var/www/vendor/illuminate/container/Container.php(590): Illuminate\\Container\\BoundMethod::call(Object(Laravel\\Lumen\\Application), Array, Array, NULL)
#14 /var/www/vendor/illuminate/console/Command.php(134): Illuminate\\Container\\Container->call(Array)
#15 /var/www/vendor/symfony/console/Command/Command.php(255): Illuminate\\Console\\Command->execute(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#16 /var/www/vendor/illuminate/console/Command.php(121): Symfony\\Component\\Console\\Command\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#17 /var/www/vendor/symfony/console/Application.php(1009): Illuminate\\Console\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#18 /var/www/vendor/symfony/console/Application.php(273): Symfony\\Component\\Console\\Application->doRunCommand(Object(Illuminate\\Queue\\Console\\WorkCommand), Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#19 /var/www/vendor/symfony/console/Application.php(149): Symfony\\Component\\Console\\Application->doRun(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#20 /var/www/vendor/illuminate/console/Application.php(93): Symfony\\Component\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#21 /var/www/vendor/laravel/lumen-framework/src/Console/Kernel.php(115): Illuminate\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#22 /var/www/artisan(35): Laravel\\Lumen\\Console\\Kernel->handle(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#23 {main}

Again, it should not be touching amq.default - we are very explicit about the exchange everything goes to.

We are using php artisan queue:work --queue=jobs.default to listen.

After the initial access denied, it looks like the connection gets closed, and the listener goes into a loop issuing this exception:

PhpAmqpLib\\Exception\\AMQPChannelClosedException: Channel connection is closed. in /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:244
Stack trace:
#0 /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AMQPChannel.php(1060): PhpAmqpLib\\Channel\\AbstractChannel->send_method_frame(Array, Object(PhpAmqpLib\\Wire\\AMQPWriter))
#1 /var/www/vendor/vladimir-yuldashev/laravel-queue-rabbitmq/src/Queue/RabbitMQQueue.php(232): PhpAmqpLib\\Channel\\AMQPChannel->basic_get('agg.jobs.fetch-...')
#2 /var/www/vendor/illuminate/queue/Worker.php(284): VladimirYuldashev\\LaravelQueueRabbitMQ\\Queue\\RabbitMQQueue->pop('agg.jobs.fetch-...')
#3 /var/www/vendor/illuminate/queue/Worker.php(123): Illuminate\\Queue\\Worker->getNextJob(Object(VladimirYuldashev\\LaravelQueueRabbitMQ\\Queue\\RabbitMQQueue), 'agg.jobs.fetch-...')
#4 /var/www/vendor/illuminate/queue/Console/WorkCommand.php(112): Illuminate\\Queue\\Worker->daemon('rabbitmq', 'agg.jobs.fetch-...', Object(Illuminate\\Queue\\WorkerOptions))
#5 /var/www/vendor/illuminate/queue/Console/WorkCommand.php(96): Illuminate\\Queue\\Console\\WorkCommand->runWorker('rabbitmq', 'agg.jobs.fetch-...')
#6 /var/www/vendor/illuminate/container/BoundMethod.php(31): Illuminate\\Queue\\Console\\WorkCommand->handle()
#7 /var/www/vendor/illuminate/container/Util.php(37): Illuminate\\Container\\BoundMethod::Illuminate\\Container\\{closure}()
#8 /var/www/vendor/illuminate/container/BoundMethod.php(88): Illuminate\\Container\\Util::unwrapIfClosure(Object(Closure))
#9 /var/www/vendor/illuminate/container/BoundMethod.php(32): Illuminate\\Container\\BoundMethod::callBoundMethod(Object(Laravel\\Lumen\\Application), Array, Object(Closure))
#10 /var/www/vendor/illuminate/container/Container.php(590): Illuminate\\Container\\BoundMethod::call(Object(Laravel\\Lumen\\Application), Array, Array, NULL)
#11 /var/www/vendor/illuminate/console/Command.php(134): Illuminate\\Container\\Container->call(Array)
#12 /var/www/vendor/symfony/console/Command/Command.php(255): Illuminate\\Console\\Command->execute(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#13 /var/www/vendor/illuminate/console/Command.php(121): Symfony\\Component\\Console\\Command\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#14 /var/www/vendor/symfony/console/Application.php(1009): Illuminate\\Console\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#15 /var/www/vendor/symfony/console/Application.php(273): Symfony\\Component\\Console\\Application->doRunCommand(Object(Illuminate\\Queue\\Console\\WorkCommand), Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#16 /var/www/vendor/symfony/console/Application.php(149): Symfony\\Component\\Console\\Application->doRun(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#17 /var/www/vendor/illuminate/console/Application.php(93): Symfony\\Component\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#18 /var/www/vendor/laravel/lumen-framework/src/Console/Kernel.php(115): Illuminate\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#19 /var/www/artisan(35): Laravel\\Lumen\\Console\\Kernel->handle(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#20 {main}

Note: nothing in this package seems to be able to recognise that as a lost connection. The AMQPProtocolChannelException exception is not explicitly handled anywhere (I believe it should be). The text of the exception is not recognised by Laravel or Lumen as a lost connection. The text is "Channel connection is closed", and the list of [English] phrases that Laravel recognises as a lost connection does not cover this: https://github.com/laravel/framework/blob/8.x/src/Illuminate/Database/DetectsLostConnections.php#L16

Those Laravel lost connection phrases are an awful solution IMO. No package can expand on them, they were designed primarily for lost database connections but seem to be used to detect lost queue connections too (this is all in Illuminate), and they are all in English, so they make some massive assumptions about the locale that servers are set up in.
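
To illustrate the point about phrase matching, here is a minimal sketch of substring-based lost-connection detection. It is not Laravel's implementation: the function name `looksLikeLostConnection` is hypothetical, and the two phrases stand in for the framework's much longer list.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: report whether an exception message contains any of
 * the given "lost connection" phrases. Matching is a plain, case-sensitive
 * substring search.
 */
function looksLikeLostConnection(Throwable $e, array $phrases): bool
{
    foreach ($phrases as $phrase) {
        if (strpos($e->getMessage(), $phrase) !== false) {
            return true;
        }
    }

    return false;
}

// Illustrative phrases only; the real list lives in the framework.
$phrases = ['server has gone away', 'Lost connection'];

// Same message text as the php-amqplib exception in this issue.
$e = new RuntimeException('Channel connection is closed.');

// The message matches neither illustrative phrase...
$before = looksLikeLostConnection($e, $phrases);

// ...but would match if a phrase such as "connection is closed" were added.
$after = looksLikeLostConnection($e, array_merge($phrases, ['connection is closed']));
```

With these inputs `$before` is false and `$after` is true, which is the gap described above: the exception is only treated as a lost connection if its English text happens to be on the list.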

judgej commented 3 years ago

So the perfect storm:

I think this needs a fix (a) in Laravel (to add "connection is closed" to its list of lost connection hints), and a fix in this library to (b) allow two delayed jobs to be dispatched without the second one failing; and (c) to recognise the lost connection when this does happen and recover from it nicely rather than eating CPU, IO and blocking the queue.

I'm working on finding the cause of (b) as that is what will allow us to move forward. (a) and (c) will help to make the framework more robust.
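
As a rough illustration of what (c) could look like, the sketch below bounds the number of reconnect attempts so that a worker gives up instead of spinning. It is an assumption-laden sketch, not code from this package: `popWithReconnect`, the two callbacks and the attempt limit are all hypothetical, and real code would catch only connection-related exceptions (for example AMQPChannelClosedException) rather than every Throwable.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sketch: call $pop; when it throws, call $reconnect and try
 * again, giving up after $maxAttempts so the worker fails instead of looping.
 */
function popWithReconnect(callable $pop, callable $reconnect, int $maxAttempts = 3)
{
    for ($attempt = 1; ; $attempt++) {
        try {
            return $pop();
        } catch (Throwable $e) {
            if ($attempt >= $maxAttempts) {
                throw $e; // let the process exit; a supervisor can restart it
            }
            $reconnect();
        }
    }
}

// Simulated queue that fails twice with the message from this issue.
$calls = 0;
$reconnects = 0;

$result = popWithReconnect(
    function () use (&$calls) {
        $calls++;
        if ($calls < 3) {
            throw new RuntimeException('Channel connection is closed.');
        }

        return 'job-payload';
    },
    function () use (&$reconnects) {
        $reconnects++; // a real callback would reopen the connection and channel
    }
);
```

In this simulation the third attempt succeeds after two reconnects. If every attempt failed, the last exception would propagate and the process could exit rather than eat CPU and IO.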

judgej commented 3 years ago

One theory I checked was that the message was being added to the queue before the queue was actually created, in case it was being created asynchronously. A ten second sleep showed me this was not the case.

judgej commented 3 years ago

This quickly throws my queue listener into a spin when handled as a job:

public function handle()
{
   $job = new static();
   dispatch($job->delay(Carbon::now()->addSeconds($this->delayedDispatch)));
}

Again, the delay queue is created, but nothing gets put into it. Here are some [truncated] log entries showing the sequence:

 {"message":"Dispatching a new instance of this job with a delay of 30 seconds","context":{},"level":200,"level_name":"INFO","channel":"development","datetime":"2020-10-28T16:52:18

 {"message":"SlowProcessTestJob iteration 4 of 6 sleeping for 1 seconds","context":

 [2020-10-28 16:52:22][cbf2b235-f565-4991-b1f0-de6142f868f7] Processed:  App\Jobs\TestJob

 {"message":"PhpAmqpLib\\Exception\\AMQPProtocolChannelException: ACCESS_REFUSED - access to exchange 'amq.default' in vhost 'dev' refused for user 'agg' in /var/www/vendor/php-amq

 {"message":"PhpAmqpLib\\Exception\\AMQPChannelClosedException: Channel connection is closed. in /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:244\

 {"message":"PhpAmqpLib\\Exception\\AMQPChannelClosedException: Channel connection is closed. in /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:244\

 {"message":"PhpAmqpLib\\Exception\\AMQPChannelClosedException: Channel connection is closed. in /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:244\

 {"message":"PhpAmqpLib\\Exception\\AMQPChannelClosedException: Channel connection is closed. in /var/www/vendor/php-amqplib/php-amqplib/PhpAmqpLib/Channel/AbstractChannel.php:244\

etc

adm-bome commented 3 years ago

Most, if not all, of this is related to your access in RabbitMQ for the vhost and user.

judgej commented 3 years ago

I believe the user has access to everything that the application should be touching. Nothing should be going to the default exchange.

Running the worker for the test job, it is creating the delay queue correctly, but not putting the delayed job into it. The code says it puts the message directly into the queue, without touching an exchange. Once it creates the queue, the connection is lost - the job does not get ACKed. So every time I run the job (php artisan queue:work --once --queue=jobs.default) it is still there to run again, OR the job that is not getting to the delay queue is ending up in the non-delayed queue through some kind of routing.

What is supposed to happen? Why is the amq.default exchange involved in any of the steps to dispatch a delayed job, when I have an explicit exchange set up to send jobs to?

adm-bome commented 3 years ago

amq.default is the default exchange RabbitMQ uses internally when routing messages directly to a queue.

RabbitMQ is always using an exchange ;)

But

Try with the guest account and vhost /
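
The ACCESS_REFUSED error is consistent with how RabbitMQ checks permissions: each user has configure, write and read regular expressions per vhost, and for permission checks the default exchange's blank name is mapped to amq.default. The sketch below imitates that check in plain PHP. The helper `isAllowed` and both patterns are hypothetical; the `agg.` prefix is only borrowed from the queue names in the traces above, and the real permissions in that setup are not known.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper imitating a RabbitMQ permission check: the resource
 * name is tested against the user's regular expression for the operation.
 */
function isAllowed(string $permissionPattern, string $resourceName): bool
{
    return preg_match('~' . $permissionPattern . '~', $resourceName) === 1;
}

// Assumed write permission for an application that namespaces with "agg.".
$namespacedOnly = '^agg\..*$';

// The default exchange is checked under the name "amq.default".
$defaultExchange = 'amq.default';

// Publishing through the default exchange is refused under this pattern.
$refused = !isAllowed($namespacedOnly, $defaultExchange);

// One possible pattern that also admits the default exchange.
$withDefault = '^(agg\..*|amq\.default)$';
$allowed = isAllowed($withDefault, $defaultExchange);

// Namespaced resources still match the widened pattern.
$stillNamespaced = isAllowed($withDefault, 'agg.jobs.default');
```

Widening the write pattern this way is only one option, and it loosens the isolation between applications that share a vhost.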

adm-bome commented 3 years ago

Oh, and when delaying messages, the messages are put into a queue "directly" via the default (direct) exchange, amq.default. This queue's name is based on the queue you're publishing to, and it dead-letters to the queue in the connection config or the queue set in the Job class.

So the queue must exist for the message to be correctly routed by the DLX.

Example:

When your Job is dispatching to queue: orders on connection rabbitmq with a delay of 30 seconds.

A delay queue is created with the name: orders.x.delay.30000 and with a DLX set to: orders.

The message must arrive in the delay queue (under the hood, via the default direct exchange).

After 30000 milliseconds the message dead-letters into the DLX queue, but only when it exists. (This is done by RabbitMQ.) In order to pick up the message when it dead-letters, the orders queue must exist. And finally your message is picked up by the worker.

Important! When your process starts off by delaying a message, the orders queue may not exist yet. So you have to pre-declare this queue if it's not present.
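
The example above can be sketched as two small functions that compute the delay queue's name and the queue arguments RabbitMQ needs for TTL-based dead-lettering. This is an illustration under assumptions, not the library's code: the function names are hypothetical, the name format simply follows the orders.x.delay.30000 example, and the dead-letter exchange defaults to the empty string (the default exchange).

```php
<?php

declare(strict_types=1);

/** Hypothetical: name of the delay queue for a destination queue and delay. */
function delayQueueName(string $destination, int $delaySeconds): string
{
    return $destination . '.x.delay.' . ($delaySeconds * 1000);
}

/**
 * Hypothetical: queue arguments that make RabbitMQ dead-letter expired
 * messages towards $destination. An empty exchange name means the default
 * exchange, which routes by queue name.
 */
function delayQueueArguments(
    string $destination,
    int $delaySeconds,
    string $deadLetterExchange = ''
): array {
    $ttl = $delaySeconds * 1000; // RabbitMQ TTLs are expressed in milliseconds

    return [
        'x-dead-letter-exchange' => $deadLetterExchange,
        'x-dead-letter-routing-key' => $destination,
        'x-message-ttl' => $ttl,
    ];
}

$name = delayQueueName('orders', 30);
$arguments = delayQueueArguments('orders', 30);
```

The arguments would be passed when declaring the delay queue. The orders queue itself still has to exist before the TTL runs out, otherwise the dead-lettered message has nowhere to go.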

adm-bome commented 3 years ago

And if you wonder why this library does not create queues in these situations...

  1. There are a dozen use cases to handle, and this library is not about handling all of them.

and more importantly:

  2. This keeps RabbitMQ separated from Laravel and vice versa (separation of concerns). When the software keeps running but you don't want the integration anymore, an admin can delete the orders queue and the process behind it will stop. This is very handy in cases where you lose track of your own software implementations, or want to be flexible without breaking things in the code.

judgej commented 3 years ago

Thank you, lots of great details there.

We do create all the main queues in advance, so they exist and are bound to the exchange we have set up. I totally agree this is the way to handle it. It lets us handle the config for unrouted messages and dead-lettering in a known and consistent way outside of this package. We have a named exchange as we are running multiple applications in a group (using Docker and K8s) that share the RabbitMQ servers. We wanted to make sure the jobs are kept separate between the applications, and queues and exchanges have permissions set so that they cannot access each other's resources. This is as much for safety in the event of misconfiguration as for any security reason.

Now, using the amq.default exchange caught us off-guard. The application has the default exchange set for that application, and we expected all job messages to go through that exchange. The delayed jobs don't go through that exchange - they go through a default exchange that is shared by all applications. That makes it hard to protect queues with permissions (based on name-spaced patterns), it opens the risk of similar named queues on different applications picking up messages from other applications (we mitigate that to some extent with prefixes to the queue names to namespace them by application and function), and it makes it harder to catch unrouted messages within the applications that generated them since all applications send their delayed job messages through a single point. We really wanted to avoid that.

I can imagine a delayed job could get unrouted by a race condition: the queue exists to put it on, so a new queue is not created, but is just about to expire. By the time the message gets to the agg.default exchange, the queue may have gone. It is a tiny time during which this can happen, but high volumes of jobs will find those conditions eventually.

So, would it make sense to use the same exchange as defined in the package config as the "default exchange" for routing the delayed jobs? I personally think it should for consistency. We have already set up our unroutable policies in that exchange, so it is best ready to serve that application. I realise it may involve an additional request to the RabbitMQ server to create the binding, if the delay queue also needs creating.

judgej commented 3 years ago

Just on the expiry of the temporary delay queues. They are set to expire at double the delay they are implementing, i.e. double the expiry of the messages put into them. RabbitMQ will expire those queues at the allotted time if they are not being used. And by "used", that means they have consumers - and these temporary queues will not have consumers. If the queue expires and it has messages in it, those messages could be dropped (I'm not entirely clear from the docs, but they imply the messages will not be dead-lettered).

However, it is possible to redeclare the queue, or "reclaim" the queue, and that will reset the expiry timer back to the start again. If this is not being done, and I am right about messages being dropped on an expired queue with no consumers, then I would recommend reclaiming it each time it is used to push a delayed job onto it. Maybe this is done already - I don't know.

adm-bome commented 3 years ago

OK, now it becomes a little bit clearer what you want to accomplish.

As I pointed out, RabbitMQ has its own way of handling messages, and publishing directly to a queue is not possible. Under the hood RabbitMQ makes use of its own exchanges.

I think there is a way within RabbitMQ to accomplish what you want. This is called virtual separation and can be done with VHosts. Each vhost has its own default (direct) exchange and is not shared.

Note: do not mistake the default queue in the config for the default exchange. The default queue in the connection config is a fallback for when a Job or the Dispatcher has no queue configured of its own.

You can, and almost always will, configure multiple connections for different use cases. Depending on the use case, your Jobs specify the connection and queue for consistency.

Also keep in mind that workers must work and do as little as possible. They are not responsible for publishing Jobs or routing or whatever... Try not to mix the concepts, and keep the responsibility where it belongs. It saves you from a lot of headaches. JOBS FAIL OR THEY DO NOT. When you want to do something with failed jobs, you have to work out a different workflow to handle these situations. Don't put everything into one 'smart big piece of a' Job which handles every edge case.

Most of the time, a Job has only one way it can be handled; in all other ways it has to fail. And if not, then I suggest you rethink your current workflow and create more, smaller Jobs.

judgej commented 3 years ago

Our vhosts are being used to separate environments at the moment (e.g. sandpit, staging), so we would need to rethink that if vhosts are to separate applications.

Our jobs tend to do small chunks of functionality, which is why jobs are dispatching new jobs. For example, a job may be dispatched to handle a webhook. That job then inspects the webhook, then dispatches another job to handle the action for the webhook, depending on just what the webhook says. Some of those jobs need a delay because some webhooks tell our application that X has happened, but X needs to be left for a minute to settle down before we can actually process it. That's where the delayed jobs come in.

Some jobs use APIs with strict rate limits - we may fetch 60 records over an API, and then need to wait for 60 seconds before we can make any more requests. In that case, the job saves what it has got so far, then dispatches itself to carry on where it left off, with a delay of 60 seconds.

Our jobs interact with Laravel, and do not touch any of the AMQP messaging directly. They just do things "the laravel way" and we are not trying to break out of that.

It is the exchange in the config that we have set to our own named exchange. We were expecting the package to always use that exchange, which it does most of the time, but not when publishing a delayed job. I'm not clear why that would need to be different.

adm-bome commented 3 years ago

There are multiple types of expiration on queues, which determine whether and when a queue gets removed. You're partly right about consumers, queues and auto-removal. This is also a setup you can use or implement.

But in the case of delay queues created by this library with a DLX configured, consumers have nothing to do with the queues. Laravel does not even know the queues exist. It is all handled by RabbitMQ.

A delay queue must only contain messages with one TTL. If you put multiple messages with different TTLs into one queue, messages get lost. Believe me when I say so, or google it.

If no message is put into a delay queue for delay * 2, the queue can be safely deleted. This is done by RabbitMQ, so instantly. When Laravel wants to delay a message, that runs through code which can create the queue again. Your concerns about losing messages will most likely not occur. This concept is proven and well tested. Every time a message gets added to the delay queue, the timer resets to double the amount of time a message can be present in this queue. So no worries: when there are no messages, there are no messages. Not even between 0.0001 and 0.0002 seconds.

adm-bome commented 3 years ago

Prefix or suffix your vhosts using dot notation if you also want to split up your environments.

application.1 application.1.dev application.1.staging

adm-bome commented 3 years ago

Have you looked at Laravel's job chaining and how Laravel handles these chains? Really, what you describe above is exactly this concept. But I understand why one would dispatch jobs from within jobs. There is nothing wrong with it as long as you know what you're doing and there is no other way/alternative.

Really, have a look at job chaining... and the queuing of these chains.

adm-bome commented 3 years ago

We were expecting the package to always use that exchange, which it does most of the time, but not when publishing a delayed job. I'm not clear why that would need to be different.

Again, when pushing a message directly to a queue name, this is done with the default direct exchange. I didn't write RabbitMQ. If you use exchanges, you publish to a route, not a queue name.

There is a real difference between those methods.

Delay queues are not bound to an exchange by this library. Messages are published directly because there is no need to create an exchange, a queue and a binding for a queue you know the name of.

Exchanges come into play when you want to route a message to one or multiple queues and you don't know the queue names but only the context.
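
The difference between the two publishing styles can be shown with a toy in-memory model of direct routing. This is not a RabbitMQ client and it ignores topic and fanout exchanges; the class `ToyBroker`, the exchange name `app.exchange` and the routing key `order.created` are all hypothetical. The one broker rule it encodes is that every queue is implicitly bound to the default exchange with a routing key equal to its own name.

```php
<?php

declare(strict_types=1);

/**
 * Toy in-memory model of direct routing, for illustration only.
 */
final class ToyBroker
{
    /** @var array<string, array<string, string[]>> exchange => routing key => queues */
    private array $bindings = ['' => []];

    public function declareQueue(string $queue): void
    {
        // Every queue is implicitly bound to the default exchange ('')
        // with a routing key equal to the queue name.
        $this->bindings[''][$queue] = [$queue];
    }

    public function bind(string $exchange, string $routingKey, string $queue): void
    {
        $this->bindings[$exchange][$routingKey][] = $queue;
    }

    /** @return string[] queues that would receive the message */
    public function route(string $exchange, string $routingKey): array
    {
        return $this->bindings[$exchange][$routingKey] ?? [];
    }
}

$broker = new ToyBroker();
$broker->declareQueue('orders');
$broker->declareQueue('orders.x.delay.30000');
$broker->bind('app.exchange', 'order.created', 'orders');

// Publishing "directly to a queue": default exchange, routing key = queue name.
$direct = $broker->route('', 'orders.x.delay.30000');

// Publishing to a named exchange: the binding decides the destination.
$routed = $broker->route('app.exchange', 'order.created');

// The delay queue has no binding on the named exchange, so nothing matches.
$unbound = $broker->route('app.exchange', 'orders.x.delay.30000');
```

The last call shows why sending delayed jobs through a named exchange would need an extra binding for each delay queue, whereas the default exchange needs none.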

adm-bome commented 3 years ago

I'm closing this issue. Because there is no issue. ;)

Feel free to continue the conversation

judgej commented 3 years ago

So to be clear, so it helps me and others following to understand how this package expects RabbitMQ to be set up:

adm-bome commented 3 years ago

Overall, Yep.

  • Going into an endless loop if permissions in RabbitMQ are not correctly set up is acceptable. It is something to keep an eye on, but should only be a problem when initially setting up a project.

For every setup/framework with any broker, be it RabbitMQ, Redis, Beanstalkd or...... If you don't use it like you should, you get errors and unexpected results. Should it be the library's responsibility to correct these errors? I don't think so....

Endless loops can also be seen as a good thing, because otherwise you might lose messages in the process. So when a worker is failing to set up a normal connection, this is the behavior you want. But this is how Laravel's workers work. Also, when a headless process is failing to run normally, it's almost impossible to catch this scenario from code. Fail = bail. But a process manager restarts processes. Loops, whatever....

Again, handling these loop cases is not the responsibility of this library. Why would I or some other developer tell you how to handle things... This is a layer/translator between two entities.

  • Using the exchange specified in the config for non-delayed jobs, but using the amq.default exchange for delayed jobs is a design feature that is going to stay.

Probably yes, because when using an exchange you probably don't know the queue name or the actual destination your message/Job ends up in, because you're publishing against routes, not actual queues...

Keep in mind that workers are separated from publishers and are not the same thing. Separation of concerns. I can publish multiple Jobs against a connection on an exchange with different routes, and the publisher does not know where the message ends up or what the queue is called. Only a worker knows the queue and which types of jobs it wants to receive... in the case of topic exchanges. Fan-out is different again, and ......

So maybe you can see why this is complicated material. This is also a very flexible setup, because DevOps can manipulate the flow of a message without your app knowing.

adm-bome commented 3 years ago

As for your second statement...

RabbitMQ always has a default direct exchange. It's a necessary good thing, under the hood. Every queue created is by default bound to this exchange. This is by design in RabbitMQ, not by design in this library.

Again, EVERY queue created is bound to this default direct exchange.

So there is NO issue when it comes to this exchange. Delayed messages go to a temporarily created queue for a temporary amount of time, to end up in the original queue via the exchange configured in the settings ;)

Denying this default direct exchange its rights, or pretending it's not there for some weird reason, is the same as... Here you have a nice $600000 car, but without a motor and with only one wheel.

adm-bome commented 3 years ago

As for your second statement...

Delaying is a feature that is only known to Laravel. RabbitMQ does not know this concept. To accomplish the delaying strategy in RabbitMQ, the DLX method and temporary queue(s) are a solution/workaround. The end result will be the feeling of delaying messages. This solution is something other than marking a message in a queue to wait.

Say you, as a DevOps admin, create a user who has the right to publish to exchanges but not the ability to create queues.

Because you use a topic exchange strategy, only workers know the actual queue name to work on. When using a topic exchange and routes, you can't predict the internal setup of RabbitMQ.

The Laravel solution, and the need to temporarily store a message, will fail if you want to store the message on the same exchange you initially published your message to. There are a lot of setups possible. The only constant in these kinds of situations is the amq.default exchange.

So when you use the delaying feature in Laravel, you are also bound to an open amq.default.

Too much explanation on this topic.

adm-bome commented 3 years ago

Don't want to be rude..... But......

I'm going to stop commenting and giving these explanations of the HOW and WHY. These discussions have nothing to do with the functionality of this library, nor with all the possible implementations or outcomes.

When you want to consult on:

Strategies, concepts, design patterns, (micro)services, sync or async and so on...

There are other platforms to ask people to solve your puzzle ;)