laravel / framework

The Laravel Framework.
https://laravel.com
MIT License

The catch callback attached to a chain of jobs is called more than once #42883

Closed gerardroche closed 2 years ago

gerardroche commented 2 years ago

Description:

The catch callback attached to a chain of jobs is called more than once when using the sync connection. I expected the catch callback to be called only once, which is what happens with the redis and sqs queue connections. This is best illustrated with an example.

        Bus::chain([
            new TestJob(1),
            new TestJob(2),
            new TestJob(3),
            new TestJob(4, exception: true),
            new TestJob(5),
            new TestJob(6),
            new TestJob(7),
        ])->catch(function (Throwable $e): void {
            dump('A job within the chain has failed...');
        })->onConnection('sync')->dispatch();

The above chain will call catch 4 times.

"A job within the chain has failed..."
"A job within the chain has failed..."
"A job within the chain has failed..."
"A job within the chain has failed..."

   Exception

  foobar 4

The further down the chain the exception occurs, the more times catch is called. For example, if the exception is at index 4, catch is called 5 times; if it's at index 5, it is called 6 times; and so on.
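
One way to picture this pattern is a toy model in plain PHP. This is not Laravel code, and the nesting is only an assumption about the mechanism: it supposes that on a synchronous connection each next job runs inside the previous job's call frame, so a single exception passes through one failure handler per enclosing frame. `runNested` and `countCatchCalls` are hypothetical names used only for this sketch.

```php
<?php

// Toy model only, not Laravel code. ASSUMPTION: each next job is run
// inside the previous job's call frame, as nested synchronous calls.

/**
 * Run $jobs[$index], then the rest of the chain nested inside the same try block.
 *
 * @param list<callable(): void> $jobs
 */
function runNested(array $jobs, int $index, callable $onFailure): void
{
    if ($index >= count($jobs)) {
        return;
    }

    try {
        $jobs[$index]();
        runNested($jobs, $index + 1, $onFailure);
    } catch (Throwable $e) {
        $onFailure($e); // every enclosing frame reports the same failure
        throw $e;       // and lets it keep bubbling up
    }
}

/** Count handler invocations when the job at $failingIndex (0-based) throws. */
function countCatchCalls(int $jobCount, int $failingIndex): int
{
    $calls = 0;
    $jobs = [];

    for ($i = 0; $i < $jobCount; $i++) {
        $jobs[] = function () use ($i, $failingIndex): void {
            if ($i === $failingIndex) {
                throw new Exception('foobar ' . ($i + 1));
            }
        };
    }

    try {
        runNested($jobs, 0, function (Throwable $e) use (&$calls): void {
            $calls++;
        });
    } catch (Throwable $e) {
        // Swallowed here so the count can be returned.
    }

    return $calls;
}

echo countCatchCalls(7, 3), PHP_EOL; // prints 4
```

With seven jobs and the failure at index 3 (the fourth job), the model reports 4 calls, and moving the failure to index 4 or 5 gives 5 or 6, which matches the counts reported above.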

Steps To Reproduce:

Here is an isolated console command to test it:

<?php

declare(strict_types=1);

namespace App\Console\Commands;

use Illuminate\Bus\Queueable;
use Illuminate\Console\Command;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Bus;

final class TestJob implements ShouldQueue
{
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;

    public function __construct(protected int $id, protected bool $exception = false)
    {
    }

    public function handle(): void
    {
        dump(__METHOD__." {$this->id}");

        if ($this->exception) {
            throw new \Exception("foobar {$this->id}");
        }
    }
}

final class TestChainCommand extends Command
{
    /**
     * The name and signature of the console command.
     *
     * @var string
     */
    protected $signature = 'sandbox:chain';

    /**
     * The console command description.
     *
     * @var string
     */
    protected $description = 'Test chained job exceptions';

    /**
     * Execute the console command.
     */
    public function handle(): int
    {
        Bus::chain([
            new TestJob(1),
            new TestJob(2),
            new TestJob(3),
            new TestJob(4, exception: true),
            new TestJob(5),
            new TestJob(6),
            new TestJob(7),
        ])->catch(function (\Throwable $e): void {
            dump('A job within the chain has failed...');
        })->onConnection('sync')->dispatch();

        return 0;
    }
}

driesvints commented 2 years ago

This indeed sounds like a bug. Would appreciate any help with figuring this one out.

gerardroche commented 2 years ago

My understanding is that the sync queue handles exceptions directly rather than inserting them into the failed_jobs database table:

After an asynchronous job has exceeded this number of attempts, it will be inserted into the failed_jobs database table. Synchronously dispatched jobs that fail are not stored in this table and their exceptions are immediately handled by the application. -- https://laravel.com/docs/9.x/queues#dealing-with-failed-jobs

So one issue is that I expect the catch callback to be executed only once. But I also don't really expect the exception to bubble up if a catch callback is defined: I expect the exception to be caught by the callback and to bubble up only if the catch callback rethrows it.
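
The expectation described here can be written down as a minimal plain-PHP sketch. It only illustrates the semantics being asked for; it is not how Laravel implements chains, and `runChainOnce` is a hypothetical helper invented for this example.

```php
<?php

/**
 * Hypothetical helper: run jobs in order and stop at the first failure.
 * The handler is invoked at most once; the exception only propagates
 * when there is no handler or when the handler itself rethrows.
 *
 * @param list<callable(): void> $jobs
 * @return int number of jobs that completed
 */
function runChainOnce(array $jobs, ?callable $catch = null): int
{
    $completed = 0;

    foreach ($jobs as $job) {
        try {
            $job();
            $completed++;
        } catch (Throwable $e) {
            if ($catch === null) {
                throw $e; // no handler: the exception bubbles up
            }

            $catch($e); // called exactly once; may rethrow
            break;      // remaining jobs are not run
        }
    }

    return $completed;
}

// Seven jobs, the fourth one throws.
$calls = 0;
$jobs = [];
foreach ([false, false, false, true, false, false, false] as $shouldFail) {
    $jobs[] = function () use ($shouldFail): void {
        if ($shouldFail) {
            throw new Exception('foobar');
        }
    };
}

$completed = runChainOnce($jobs, function (Throwable $e) use (&$calls): void {
    $calls++;
});

echo "completed={$completed} catchCalls={$calls}", PHP_EOL; // completed=3 catchCalls=1
```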

The same issue applies to batched jobs and I've noticed that the finally callback is not called if an exception is thrown either:

        Bus::batch($jobs)
            ->catch(static fn (...$args) => self::onCatch(...$args))
            ->finally(static fn (...$args) => self::onFinally(...$args))
            ->dispatch();

In the above, the catch callback is called for each job and the finally callback is never called. Again, I expect catch to be called only once, finally to be called, and, as with chained jobs, the catch callback to prevent the exception from bubbling up.

Toggling allowFailures makes no difference:

        Bus::batch($jobs)
            ->allowFailures(true)
            ->catch(static fn (...$args) => self::onCatch(...$args))
            ->finally(static fn (...$args) => self::onFinally(...$args))
            ->dispatch();
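
As a rough illustration of the batch behaviour expected here, the following plain-PHP sketch calls the catch handler on the first failure only and always calls the finally handler once at the end. It models the expectation, not Laravel's batch implementation; `runBatchSketch` is a hypothetical name, and stopping at the first failure when failures are not allowed is a simplification made for the sketch.

```php
<?php

/**
 * Hypothetical model of the expected batch semantics.
 *
 * @param list<callable(): void> $jobs
 */
function runBatchSketch(
    array $jobs,
    callable $catch,
    callable $finally,
    bool $allowFailures = false
): void {
    $failures = 0;

    foreach ($jobs as $job) {
        try {
            $job();
        } catch (Throwable $e) {
            $failures++;

            if ($failures === 1) {
                $catch($e); // first failure only
            }

            if (! $allowFailures) {
                break; // simplification: stop running the remaining jobs
            }
        }
    }

    $finally(); // runs once, whether or not a job failed
}

$seen = ['ran' => 0, 'catch' => 0, 'finally' => 0];
$jobs = [];

for ($id = 1; $id <= 6; $id++) {
    $jobs[] = function () use ($id, &$seen): void {
        $seen['ran']++;

        if ($id === 4) {
            throw new Exception("foobar {$id}");
        }
    };
}

runBatchSketch(
    $jobs,
    function (Throwable $e) use (&$seen): void {
        $seen['catch']++;
    },
    function () use (&$seen): void {
        $seen['finally']++;
    },
    allowFailures: true
);

echo json_encode($seen), PHP_EOL; // {"ran":6,"catch":1,"finally":1}
```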

rodrigopedra commented 2 years ago

Sent PR #42950 to address this issue

gerardroche commented 2 years ago

One of my tests failed with the recent release.

I've narrowed the issue down: the finally method on a batch of jobs is not called if one of the jobs throws an exception and that job is not the last one in the batch.

For example, in the following the finally method will not be invoked. However, if you comment out jobs 5 and 6, the finally method is called. Should we open a separate issue for this?


        $jobs = [
            new TestBatchJob(1),
            new TestBatchJob(2),
            new TestBatchJob(3),
            new TestBatchJob(4, exception: true),
            new TestBatchJob(5),
            new TestBatchJob(6),
        ];

        Bus::batch($jobs)
            ->allowFailures(true)
            ->catch(function (): void {
                dump('A job within the chain has failed...');
            })
            ->finally(function (): void {
                dump('Batch finally called...');
            })
            ->onConnection('sync')->dispatch();

Here is the TestBatchJob:

use Illuminate\Bus\Batchable;

final class TestBatchJob implements ShouldQueue
{
    use Batchable;
    use Dispatchable;
    use InteractsWithQueue;
    use Queueable;

    public function __construct(protected int $id, protected bool $exception = false)
    {
    }

    public function handle(): void
    {
        dump(__METHOD__." {$this->id}");

        if ($this->exception) {
            dump(__METHOD__." {$this->id} throwing exception");
            throw new \Exception("foobar {$this->id}");
        }
    }
}

I've tested the new fixes on more complicated dispatches. The new fix seems to resolve all the other issues. There are no longer duplicate calls to the catch callbacks and the exceptions no longer bubble up. :sparkles: :rocket:


        Bus::chain([
            new TestJob(1),
            new TestJob(2),
            new TestJob(3),
            // new TestJob(4, exception: true),
            function (): void {
                Bus::batch([
                        new TestBatchJob(1),
                        new TestBatchJob(2),
                        new TestBatchJob(3),
                        new TestBatchJob(4, exception: true),
                        // new TestBatchJob(5),
                        // new TestBatchJob(6),
                    ])
                    ->allowFailures(true)
                    ->catch(function (): void {
                        dump('A job within the BATCH has failed...');
                    })
                    ->finally(function (): void {
                        dump('The batch FINALLY called...');
                    })
                    ->onConnection('sync')->dispatch();
            },
        ])->catch(function (\Throwable $e): void {
            dump('A job within the CHAIN has failed...');
        })->onConnection('sync')->dispatch();

gerardroche commented 2 years ago

I just noticed something else: if there is an exception in a batch job, is it rolling back the DB after the finally callback?

rodrigopedra commented 2 years ago

I've tested the first snippet you sent here ( https://github.com/laravel/framework/issues/42883#issuecomment-1183473224 ) without the changes from #42950 and the finally callback is not called when using the sync connection AND allowing failures.

When not allowing failures, it works as expected.

After a first inspection it seems to be a different issue, specific to the command bus. My feeling is that the fix might be similar, but I won't have time to look further until the weekend.

gerardroche commented 2 years ago

Cool. Note that changing allowFailures doesn't make any difference in my testing. The finally method is called fine if no exceptions are thrown. It's also called if the exception occurs in the last job in the batch. It's not called if the exception occurs before the last job.

There may be one other separate issue too: it looks like the finally call is wrapped in a db transaction so when an exception occurs any changes to db in the finally callback are rolled back. This doesn't happen in other connections like redis.

driesvints commented 2 years ago

Let's create separate issues for the other bugs. Thanks all!

driesvints commented 2 years ago

Does anyone have any ideas on how to resend https://github.com/laravel/framework/pull/42950 without the breaking change?

driesvints commented 2 years ago

Hi all, it looks like this cannot be solved without introducing a breaking change. Feel free to PR master with the above PR.

@bramdevries feel free to provide a comment on that PR if you want to advocate against a breaking change.

amcsi commented 3 months ago

Any good workaround until then?

rodrigopedra commented 3 months ago

@amcsi I didn't test it, but maybe using the once() helper introduced in Laravel 11 might help?

https://laravel.com/docs/11.x/helpers#method-once

amcsi commented 3 months ago

@rodrigopedra I don't think that would be good. Wouldn't that cache the function call for the whole duration of the queue worker? So if the failure callback is called, and then a different job chain runs and fails, it would not call the callback to handle the failure.

rodrigopedra commented 3 months ago

@amcsi I was thinking about something like this:

Bus::chain([
    new ProcessPodcast,
    new OptimizePodcast,
    new ReleasePodcast,
])->catch(function (Throwable $e) {
    return once(function () use ($e) {
        // logic here
    });
})->dispatch();

Then it would only cache upon first usage.

If an exception is thrown, the chain should stop processing, right?

From my understanding of this issue, the callback was called immediately after the first failure and then again when bailing out of the batch. Using once() like this might at least prevent the catch callback's logic from being executed twice.

amcsi commented 3 months ago

@rodrigopedra except that I dispatch chains both within a daemon queue worker and in Octane, where multiple "requests" are handled in the same process/CLI run. Or are you saying that both daemon queue workers and Octane clear the once() cache between requests and queue items?

rodrigopedra commented 3 months ago

The once() should be instance-aware... in the sense that if you use it in different instances of the same class, it would be triggered once per instance.

As Bus::chain() creates a new instance, in theory, it should work.

Of course, it is best to test it before using it in production.
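
A different way to get "run the failure logic once per dispatch" without relying on how once() caches is to key a guard on a token created for each dispatch. The sketch below is plain PHP and untested against Laravel: `InMemoryFlags` and `guardOncePerToken` are hypothetical names, and the in-memory store only stands in for a shared store with add-if-absent semantics. Since queued closures are serialized, a real application would presumably need an external store (for example an atomic add operation on a shared cache) rather than an object held in memory.

```php
<?php

// Hypothetical stand-in for a shared store with "add if absent" semantics.
final class InMemoryFlags
{
    /** @var array<string, true> */
    private array $flags = [];

    /** Returns true only the first time a given key is added. */
    public function add(string $key): bool
    {
        if (isset($this->flags[$key])) {
            return false;
        }

        $this->flags[$key] = true;

        return true;
    }
}

/**
 * Wrap $logic so it runs at most once for the given dispatch token,
 * however many times the returned callback is invoked.
 */
function guardOncePerToken(InMemoryFlags $flags, string $token, callable $logic): callable
{
    return function (Throwable $e) use ($flags, $token, $logic): void {
        if (! $flags->add("chain-failed:{$token}")) {
            return; // already handled for this dispatch
        }

        $logic($e);
    };
}

$flags = new InMemoryFlags();
$runs = 0;
$logic = function (Throwable $e) use (&$runs): void {
    $runs++;
};

// One token per dispatch; fixed strings keep the demo deterministic.
$chainA = guardOncePerToken($flags, 'dispatch-1', $logic);
$chainB = guardOncePerToken($flags, 'dispatch-2', $logic);

$error = new Exception('foobar 4');

for ($i = 0; $i < 4; $i++) {
    $chainA($error); // invoked four times, logic runs once
}

$chainB($error); // a different dispatch still gets its own single run
$chainB($error);

echo $runs, PHP_EOL; // prints 2
```

Because the guard is keyed on the dispatch token rather than on the process, a second chain failing later in the same worker would still have its logic run once.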