kitar / laravel-dynamodb

A DynamoDB based Eloquent model and Query builder for Laravel.
MIT License

Deserializing model in event constructor fails for a queued event listener? #21

Open localpath opened 2 years ago

localpath commented 2 years ago

Hello. Thank you for the great package. While using it I've come across a serialization issue with queued event listeners. It seems to be trying to connect on a null connection, as if it were reverting to the default database driver? Firing the model's created event and handling it with a queued event listener fails. I can call toArray() or pass the DynamoDB partition keys and do a lookup, but it seems like Laravel should be able to hydrate the model no matter the driver, right?

Laravel 9.x, laravel-dynamodb 1.x

[2022-06-23 01:20:15] local.ERROR: Call to a member function prepare() on null {"exception":"[object] (Error(code: 0): Call to a member function prepare() on null at /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php:396)
[stacktrace]
#0 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php(735): Illuminate\\Database\\Connection->Illuminate\\Database\\{closure}('select * from \"...', Array)
#1 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php(702): Illuminate\\Database\\Connection->runQueryCallback('select * from \"...', Array, Object(Closure))
#2 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php(404): Illuminate\\Database\\Connection->run('select * from \"...', Array, Object(Closure))
#3 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(2630): Illuminate\\Database\\Connection->select('select * from \"...', Array, false)
#4 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(2618): Illuminate\\Database\\Query\\Builder->runSelect()
#5 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(3154): Illuminate\\Database\\Query\\Builder->Illuminate\\Database\\Query\\{closure}()
#6 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(2619): Illuminate\\Database\\Query\\Builder->onceWithColumns(Array, Object(Closure))
#7 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Eloquent/Builder.php(698): Illuminate\\Database\\Query\\Builder->get(Array)
#8 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Eloquent/Builder.php(682): Illuminate\\Database\\Eloquent\\Builder->getModels(Array)
#9 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/SerializesAndRestoresModelIdentifiers.php(74): Illuminate\\Database\\Eloquent\\Builder->get()
#10 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/SerializesAndRestoresModelIdentifiers.php(56): App\\Events\\SystemEvents\\SystemEventCreated->restoreCollection(Object(Illuminate\\Contracts\\Database\\ModelIdentifier))
#11 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/SerializesModels.php(126): App\\Events\\SystemEvents\\SystemEventCreated->getRestoredPropertyValue(Object(Illuminate\\Contracts\\Database\\ModelIdentifier))
#12 [internal function]: App\\Events\\SystemEvents\\SystemEventCreated->__unserialize(Array)
#13 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php(96): unserialize('O:36:\"Illuminat...')
#14 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php(257): Illuminate\\Queue\\CallQueuedHandler->getCommand(Array)
#15 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php(213): Illuminate\\Queue\\CallQueuedHandler->failed(Array, Object(Error), 'c02f0cf2-5f3f-4...')
#16 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php(192): Illuminate\\Queue\\Jobs\\Job->failed(Object(Error))
#17 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(581): Illuminate\\Queue\\Jobs\\Job->fail(Object(Error))
#18 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(527): Illuminate\\Queue\\Worker->failJob(Object(Illuminate\\Queue\\Jobs\\DatabaseJob), Object(Error))
#19 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(455): Illuminate\\Queue\\Worker->markJobAsFailedIfWillExceedMaxAttempts('database', Object(Illuminate\\Queue\\Jobs\\DatabaseJob), 1, Object(Error))
#20 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(432): Illuminate\\Queue\\Worker->handleJobException('database', Object(Illuminate\\Queue\\Jobs\\DatabaseJob), Object(Illuminate\\Queue\\WorkerOptions), Object(Error))
#21 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(378): Illuminate\\Queue\\Worker->process('database', Object(Illuminate\\Queue\\Jobs\\DatabaseJob), Object(Illuminate\\Queue\\WorkerOptions))
#22 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(329): Illuminate\\Queue\\Worker->runJob(Object(Illuminate\\Queue\\Jobs\\DatabaseJob), 'database', Object(Illuminate\\Queue\\WorkerOptions))
#23 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(130): Illuminate\\Queue\\Worker->runNextJob('database', 'unified-default...', Object(Illuminate\\Queue\\WorkerOptions))
#24 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(114): Illuminate\\Queue\\Console\\WorkCommand->runWorker('database', 'unified-default...')
#25 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): Illuminate\\Queue\\Console\\WorkCommand->handle()
#26 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/Util.php(41): Illuminate\\Container\\BoundMethod::Illuminate\\Container\\{closure}()
#27 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(93): Illuminate\\Container\\Util::unwrapIfClosure(Object(Closure))
#28 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(37): Illuminate\\Container\\BoundMethod::callBoundMethod(Object(Illuminate\\Foundation\\Application), Array, Object(Closure))
#29 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/Container.php(651): Illuminate\\Container\\BoundMethod::call(Object(Illuminate\\Foundation\\Application), Array, Array, NULL)
#30 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Console/Command.php(136): Illuminate\\Container\\Container->call(Array)
#31 /var/www/unified-api/vendor/symfony/console/Command/Command.php(291): Illuminate\\Console\\Command->execute(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#32 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Console/Command.php(121): Symfony\\Component\\Console\\Command\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#33 /var/www/unified-api/vendor/symfony/console/Application.php(998): Illuminate\\Console\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#34 /var/www/unified-api/vendor/symfony/console/Application.php(299): Symfony\\Component\\Console\\Application->doRunCommand(Object(Illuminate\\Queue\\Console\\WorkCommand), Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#35 /var/www/unified-api/vendor/symfony/console/Application.php(171): Symfony\\Component\\Console\\Application->doRun(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#36 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Console/Application.php(102): Symfony\\Component\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#37 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(129): Illuminate\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#38 /var/www/unified-api/artisan(37): Illuminate\\Foundation\\Console\\Kernel->handle(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#39 {main}
"} 
kitar commented 2 years ago

@localpathcomp Hi! Thank you for the report :) I tried the following steps, but I can't reproduce the problem.

  1. Define the model with a created event.
namespace App\Models;

use Illuminate\Support\Facades\Log;
use Kitar\Dynamodb\Model\Model as DynamodbModel;

use function Illuminate\Events\queueable;

class Model extends DynamodbModel
{
    protected $table = 'my-table';
    protected $primaryKey = 'PK';
    protected $sortKey = 'SK';
    protected $fillable = ['PK', 'SK'];

    protected static function booted()
    {
        static::created(queueable(function ($item) {
            Log::info("Model created (PK:{$item->PK}, SK:{$item->SK})");
        }));
    }
}
  2. Use Redis as a queue connection.
QUEUE_CONNECTION=redis
  3. Dispatch the event.
use App\Models\Model;

Model::create(['PK' => 'PK1', 'SK' => 'SK1']);
  4. Run the queue.
php artisan queue:work

The job then ran without errors and the log entry was written successfully.

[2022-06-25 05:28:02] local.INFO: Model created (PK:PK1, SK:SK1)

Could you share a few more details for reproducing the problem?

localpath commented 2 years ago

Hi, sure thing. Here are the two models I use. I can try again with more debugging in place if that would help. I was thinking maybe it's calling the default connection, since I have a multi-tenant setup?

Queue connection = database

<?php

namespace App\Models;

use Illuminate\Support\Str;
use Kitar\Dynamodb\Model\Model;

class SystemEvent extends Model
{
    /**
     * The database connection that should be used by the model.
     *
     * @var string
     */
    protected $connection = 'dynamodb';

    /**
     * The table associated with the model.
     *
     * @var string
     */
    protected $table = 'system_events';

    /**
     * The Partition Key.
     *
     * @var string
     */
    protected $primaryKey = 'PK';

    /**
     * The Sort Key.
     *
     * @var string|null
     */
    protected $sortKey = 'SK';

    /**
     * The attributes that are mass assignable.
     *
     * @var array
     */
    protected $fillable = [
        'id',
        'tenant_uuid',
        'event_data',
        'event_origin',
        'event_type',
    ];

    /**
     * The attributes that should be hidden for arrays.
     *
     * @var array
     */
    protected $hidden = [
        'PK',
        'SK',
        'GSI1PK',
        'GSI1SK',
        'GSI2PK',
        'GSI2SK',
        'TYPE',
    ];

    /**
     * The attributes that should be cast.
     *
     * @var array
     */
    protected $casts = [
        'event_data' => 'array',
    ];

    protected static function booted()
    {
        static::creating(function ($systemEvent) {
            if (empty($systemEvent->event_type)) {
                throw new \InvalidArgumentException();
            }

            $uuid = (string) Str::uuid();

            // index attributes
            $systemEvent->PK = "EVENT#{$uuid}";
            $systemEvent->SK = "EVENT#{$uuid}";
            $systemEvent->GSI1PK = "EVENT_TYPE#{$systemEvent->event_type}";
            $systemEvent->GSI1SK = "EVENT#{$uuid}";
            if (! empty($systemEvent->tenant_uuid)) {
                $systemEvent->GSI2PK = "TENANT#{$systemEvent->tenant_uuid}";
                $systemEvent->GSI2SK = "EVENT#{$uuid}";
            }
            $systemEvent->TYPE = self::class;

            // item attributes
            $systemEvent->id = $uuid;
        });
    }

    public static function find($id)
    {
        return parent::find(['PK' => "EVENT#{$id}", 'SK' => "EVENT#{$id}"]);
    }

    public static function extractLastEvaluatedKey($item)
    {
        if (empty($item)) {
            return null;
        }

        return $item->meta()['LastEvaluatedKey'] ?? null;
    }
}
<?php

namespace App\Models;

use Kitar\Dynamodb\Model\Model;

class WebhookLog extends Model
{
    /**
     * The database connection that should be used by the model.
     *
     * @var string
     */
    protected $connection = 'dynamodb';

    /**
     * The table associated with the model.
     *
     * @var string
     */
    protected $table = 'webhook_logs';

    /**
     * The Partition Key.
     *
     * @var string
     */
    protected $primaryKey = 'PK';

    /**
     * The Sort Key.
     *
     * @var string|null
     */
    protected $sortKey = 'SK';

    /**
     * The attributes that are mass assignable.
     *
     * @var array
     */
    protected $fillable = [
        'id',
        'event_id',
        'application_id',
        'attempt',
        'headers',
        'http_verb',
        'webhook_url',
        'payload',
        'response',
        'error_type',
        'error_message',
    ];

    /**
     * The attributes that should be hidden for arrays.
     *
     * @var array
     */
    protected $hidden = [
        'PK',
        'SK',
        'GSI1PK',
        'GSI1SK',
        'GSI2PK',
        'GSI2SK',
        'TYPE',
    ];

    /**
     * The attributes that should be cast.
     *
     * @var array
     */
    protected $casts = [
        'payload' => 'array',
        'response' => 'array',
        'headers' => 'array',
    ];

    protected static function booted()
    {
        static::creating(function ($webhookLog) {
            if (empty($webhookLog->id)) {
                throw new \InvalidArgumentException();
            }
            if (empty($webhookLog->application_id)) {
                throw new \InvalidArgumentException();
            }
            if (empty($webhookLog->event_id)) {
                throw new \InvalidArgumentException();
            }

            // index attributes
            $webhookLog->PK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->SK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->GSI1PK = "APPLICATION_ID#{$webhookLog->application_id}";
            $webhookLog->GSI1SK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->GSI2PK = "EVENT#{$webhookLog->event_id}";
            $webhookLog->GSI2SK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->TYPE = self::class;
        });
    }

    public static function find($id)
    {
        return parent::find(['PK' => "WEBHOOK#{$id}", 'SK' => "WEBHOOK#{$id}"]);
    }

    public static function extractLastEvaluatedKey($item)
    {
        if (empty($item)) {
            return null;
        }

        return $item->meta()['LastEvaluatedKey'] ?? null;
    }
}
kitar commented 2 years ago

Thanks for sharing! The code looks interesting. However, I can't reproduce the problem yet. Let me clarify a few things.

If you dump the model's connection name inside the queue (e.g. $model->getConnectionName()) and it is dynamodb, the problem is likely somewhere other than the connection.
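One way to run that check is sketched below. The listener name `LogModelConnection` is hypothetical and not part of this thread; `getConnectionName()` and the `database.default` config key are standard Laravel. The sketch assumes the event it listens to can itself be unserialized (for example, one that carries a plain array), so that `handle()` is actually reached.

```php
<?php

namespace App\Listeners;

use App\Models\SystemEvent;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Support\Facades\Log;

// Hypothetical diagnostic listener (an illustration, not code from the thread).
class LogModelConnection implements ShouldQueue
{
    public function handle(object $event): void
    {
        // A fresh instance is enough: the connection name comes from the
        // model's $connection property, not from a stored record.
        $model = new SystemEvent();

        Log::debug('Model connection inside the queue worker', [
            'model_connection' => $model->getConnectionName(),
            'default_connection' => config('database.default'),
        ]);
    }
}
```

If `model_connection` logs as `dynamodb` while `default_connection` is something else, the model itself is configured as expected inside the worker.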

localpath commented 2 years ago

Hi @kitar. Yes, we are using multiple DB connections.

> Are you using multiple DB connections? (e.g. your default DB_CONNECTION is not dynamodb, but the models given above are using the dynamodb connection)

I think that's what it is. On my DynamoDB model, do I need to explicitly define getConnectionName()? I bet that's it, because the connection is returning null.

> Do you have a problem interacting with DynamoDB even outside the queue?

The model works perfectly outside of the queue.

> What are you trying to do in the SystemEventCreated job? (Some operation in that job, such as restoreCollection(), may be triggering a laravel-dynamodb bug.)

Here is the event class. I wanted to inject the model instance rather than a plain array.

<?php

namespace App\Events\SystemEvents;

use App\Models\SystemEvent;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class SystemEventCreated
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /**
     * Create a new event instance.
     *
     * @param  array  $systemEvent
     * @return void
     */
    public function __construct(
        public array $systemEvent,
    ) {
    }
}

The listener, for example:

<?php

namespace App\Listeners\Webhooks;

use App\Events\SystemEvents\SystemEventCreated;
use App\Models\WebhookSubscriber;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Cache;
use Spatie\WebhookServer\WebhookCall;

class SendSystemEventNotifications implements ShouldQueue
{
    /**
     * Create the event listener.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Handle the event.
     *
     * @param  \App\Events\SystemEvents\SystemEventCreated  $event
     * @return void
     */
    public function handle(SystemEventCreated $event)
    {
        // use the model off the event class for example
    }
}
kitar commented 1 year ago

@localpath Sorry for the late reply.

Thanks for the information; you were indeed right. However, several issues are involved that make this difficult to resolve immediately. For now, please pass the keys or an array to the listener instead of a model instance.
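A minimal sketch of that workaround, assuming the event is changed to carry only the id that `SystemEvent::find()` (defined earlier in this thread) accepts. The `$systemEventId` property name is hypothetical, and the null check assumes `find()` returns a falsy value when the item is missing.

```php
<?php

namespace App\Events\SystemEvents {

    use Illuminate\Foundation\Events\Dispatchable;

    class SystemEventCreated
    {
        use Dispatchable;

        // Only a plain string is stored, so queue serialization never
        // has to restore a model from the database.
        public function __construct(
            public string $systemEventId,
        ) {
        }
    }
}

namespace App\Listeners\Webhooks {

    use App\Events\SystemEvents\SystemEventCreated;
    use App\Models\SystemEvent;
    use Illuminate\Contracts\Queue\ShouldQueue;

    class SendSystemEventNotifications implements ShouldQueue
    {
        public function handle(SystemEventCreated $event): void
        {
            // Rehydrate inside the worker through the model's own find().
            $systemEvent = SystemEvent::find($event->systemEventId);

            if (! $systemEvent) {
                return; // assumed: nothing to do when the item is missing
            }

            // ... use $systemEvent here
        }
    }
}
```

With this shape, the event would be dispatched as `SystemEventCreated::dispatch($systemEvent->id);` once the model has been created, since the `creating` hook shown earlier sets `id` to the generated UUID.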

Notes on this issue:

localpath commented 1 year ago

@kitar Hi. Thanks for looking at it. Yes, that makes perfect sense. It's easy enough to work around by rehydrating the models later. Thanks!!