Deserializing model in event constructor fails for a queued event listener? #21

Open
localpath opened this issue Jun 23, 2022 · 7 comments

@localpath
Contributor

Hello. Thanks for the great package. While using it I've come across a serialization issue with queued event listeners. It seems to be trying to connect on a null connection, as if it's reverting to the default database driver. Using the model's created event and listening to that event with a queued event listener fails. I can call toArray() or pass the DynamoDB partition keys to do a lookup, but it seems like Laravel should be able to hydrate the model no matter the driver, right?

Laravel 9.x
laravel-dynamodb 1.x

[2022-06-23 01:20:15] local.ERROR: Call to a member function prepare() on null {"exception":"[object] (Error(code: 0): Call to a member function prepare() on null at /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php:396)
[stacktrace]
#0 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php(735): Illuminate\\Database\\Connection->Illuminate\\Database\\{closure}('select * from \"...', Array)
#1 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php(702): Illuminate\\Database\\Connection->runQueryCallback('select * from \"...', Array, Object(Closure))
#2 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Connection.php(404): Illuminate\\Database\\Connection->run('select * from \"...', Array, Object(Closure))
#3 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(2630): Illuminate\\Database\\Connection->select('select * from \"...', Array, false)
#4 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(2618): Illuminate\\Database\\Query\\Builder->runSelect()
#5 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(3154): Illuminate\\Database\\Query\\Builder->Illuminate\\Database\\Query\\{closure}()
#6 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Query/Builder.php(2619): Illuminate\\Database\\Query\\Builder->onceWithColumns(Array, Object(Closure))
#7 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Eloquent/Builder.php(698): Illuminate\\Database\\Query\\Builder->get(Array)
#8 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Database/Eloquent/Builder.php(682): Illuminate\\Database\\Eloquent\\Builder->getModels(Array)
#9 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/SerializesAndRestoresModelIdentifiers.php(74): Illuminate\\Database\\Eloquent\\Builder->get()
#10 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/SerializesAndRestoresModelIdentifiers.php(56): App\\Events\\SystemEvents\\SystemEventCreated->restoreCollection(Object(Illuminate\\Contracts\\Database\\ModelIdentifier))
#11 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/SerializesModels.php(126): App\\Events\\SystemEvents\\SystemEventCreated->getRestoredPropertyValue(Object(Illuminate\\Contracts\\Database\\ModelIdentifier))
#12 [internal function]: App\\Events\\SystemEvents\\SystemEventCreated->__unserialize(Array)
#13 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php(96): unserialize('O:36:\"Illuminat...')
#14 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php(257): Illuminate\\Queue\\CallQueuedHandler->getCommand(Array)
#15 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php(213): Illuminate\\Queue\\CallQueuedHandler->failed(Array, Object(Error), 'c02f0cf2-5f3f-4...')
#16 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php(192): Illuminate\\Queue\\Jobs\\Job->failed(Object(Error))
#17 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(581): Illuminate\\Queue\\Jobs\\Job->fail(Object(Error))
#18 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(527): Illuminate\\Queue\\Worker->failJob(Object(Illuminate\\Queue\\Jobs\\DatabaseJob), Object(Error))
#19 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(455): Illuminate\\Queue\\Worker->markJobAsFailedIfWillExceedMaxAttempts('database', Object(Illuminate\\Queue\\Jobs\\DatabaseJob), 1, Object(Error))
#20 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(432): Illuminate\\Queue\\Worker->handleJobException('database', Object(Illuminate\\Queue\\Jobs\\DatabaseJob), Object(Illuminate\\Queue\\WorkerOptions), Object(Error))
#21 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(378): Illuminate\\Queue\\Worker->process('database', Object(Illuminate\\Queue\\Jobs\\DatabaseJob), Object(Illuminate\\Queue\\WorkerOptions))
#22 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Worker.php(329): Illuminate\\Queue\\Worker->runJob(Object(Illuminate\\Queue\\Jobs\\DatabaseJob), 'database', Object(Illuminate\\Queue\\WorkerOptions))
#23 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(130): Illuminate\\Queue\\Worker->runNextJob('database', 'unified-default...', Object(Illuminate\\Queue\\WorkerOptions))
#24 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Queue/Console/WorkCommand.php(114): Illuminate\\Queue\\Console\\WorkCommand->runWorker('database', 'unified-default...')
#25 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): Illuminate\\Queue\\Console\\WorkCommand->handle()
#26 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/Util.php(41): Illuminate\\Container\\BoundMethod::Illuminate\\Container\\{closure}()
#27 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(93): Illuminate\\Container\\Util::unwrapIfClosure(Object(Closure))
#28 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(37): Illuminate\\Container\\BoundMethod::callBoundMethod(Object(Illuminate\\Foundation\\Application), Array, Object(Closure))
#29 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Container/Container.php(651): Illuminate\\Container\\BoundMethod::call(Object(Illuminate\\Foundation\\Application), Array, Array, NULL)
#30 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Console/Command.php(136): Illuminate\\Container\\Container->call(Array)
#31 /var/www/unified-api/vendor/symfony/console/Command/Command.php(291): Illuminate\\Console\\Command->execute(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#32 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Console/Command.php(121): Symfony\\Component\\Console\\Command\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Illuminate\\Console\\OutputStyle))
#33 /var/www/unified-api/vendor/symfony/console/Application.php(998): Illuminate\\Console\\Command->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#34 /var/www/unified-api/vendor/symfony/console/Application.php(299): Symfony\\Component\\Console\\Application->doRunCommand(Object(Illuminate\\Queue\\Console\\WorkCommand), Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#35 /var/www/unified-api/vendor/symfony/console/Application.php(171): Symfony\\Component\\Console\\Application->doRun(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#36 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Console/Application.php(102): Symfony\\Component\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#37 /var/www/unified-api/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(129): Illuminate\\Console\\Application->run(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#38 /var/www/unified-api/artisan(37): Illuminate\\Foundation\\Console\\Kernel->handle(Object(Symfony\\Component\\Console\\Input\\ArgvInput), Object(Symfony\\Component\\Console\\Output\\ConsoleOutput))
#39 {main}
"} 
@kitar
Owner

kitar commented Jun 25, 2022

@localpathcomp
Hi! Thank you for the report :)
I've tried the following steps, but I can't reproduce the problem.

  1. Define the model with a created event.
namespace App\Models;

use Illuminate\Support\Facades\Log;
use Kitar\Dynamodb\Model\Model as DynamodbModel;

use function Illuminate\Events\queueable;

class Model extends DynamodbModel
{
    protected $table = 'my-table';
    protected $primaryKey = 'PK';
    protected $sortKey = 'SK';
    protected $fillable = ['PK', 'SK'];

    protected static function booted()
    {
        static::created(queueable(function ($item) {
            Log::info("Model created (PK:{$item->PK}, SK:{$item->SK})");
        }));
    }
}
  2. Use Redis as the queue connection.
QUEUE_CONNECTION=redis
  3. Dispatch the event.
use App\Models\Model;

Model::create(['PK' => 'PK1', 'SK' => 'SK1']);
  4. Run the queue.
php artisan queue:work

The job then ran with no error and the log entry was written successfully.

[2022-06-25 05:28:02] local.INFO: Model created (PK:PK1, SK:SK1)

Could you share a few more details for reproducing the problem?

@localpath
Contributor Author

Hi, sure thing. Here are the two models I use. I can try again with more debugging in place if it would help. I was thinking maybe it's calling the default connection, since I have a multi-tenant setup?

Queue connection = database

<?php

namespace App\Models;

use Illuminate\Support\Str;
use Kitar\Dynamodb\Model\Model;

class SystemEvent extends Model
{
    /**
     * The database connection that should be used by the model.
     *
     * @var string
     */
    protected $connection = 'dynamodb';

    /**
     * The table associated with the model.
     *
     * @var string
     */
    protected $table = 'system_events';

    /**
     * The Partition Key.
     *
     * @var string
     */
    protected $primaryKey = 'PK';

    /**
     * The Sort Key.
     *
     * @var string|null
     */
    protected $sortKey = 'SK';

    /**
     * The attributes that are mass assignable.
     *
     * @var array
     */
    protected $fillable = [
        'id',
        'tenant_uuid',
        'event_data',
        'event_origin',
        'event_type',
    ];

    /**
     * The attributes that should be hidden for arrays.
     *
     * @var array
     */
    protected $hidden = [
        'PK',
        'SK',
        'GSI1PK',
        'GSI1SK',
        'GSI2PK',
        'GSI2SK',
        'TYPE',
    ];

    /**
     * The attributes that should be cast.
     *
     * @var array
     */
    protected $casts = [
        'event_data' => 'array',
    ];

    protected static function booted()
    {
        static::creating(function ($systemEvent) {
            if (empty($systemEvent->event_type)) {
                throw new \InvalidArgumentException();
            }

            $uuid = (string) Str::uuid();

            // index attributes
            $systemEvent->PK = "EVENT#{$uuid}";
            $systemEvent->SK = "EVENT#{$uuid}";
            $systemEvent->GSI1PK = "EVENT_TYPE#{$systemEvent->event_type}";
            $systemEvent->GSI1SK = "EVENT#{$uuid}";
            if (! empty($systemEvent->tenant_uuid)) {
                $systemEvent->GSI2PK = "TENANT#{$systemEvent->tenant_uuid}";
                $systemEvent->GSI2SK = "EVENT#{$uuid}";
            }
            $systemEvent->TYPE = self::class;

            // item attributes
            $systemEvent->id = $uuid;
        });
    }

    public static function find($id)
    {
        return parent::find(['PK' => "EVENT#{$id}", 'SK' => "EVENT#{$id}"]);
    }

    public static function extractLastEvaluatedKey($item)
    {
        if (empty($item)) {
            return null;
        }

        return $item->meta()['LastEvaluatedKey'] ?? null;
    }
}
<?php

namespace App\Models;

use Kitar\Dynamodb\Model\Model;

class WebhookLog extends Model
{
    /**
     * The database connection that should be used by the model.
     *
     * @var string
     */
    protected $connection = 'dynamodb';

    /**
     * The table associated with the model.
     *
     * @var string
     */
    protected $table = 'webhook_logs';

    /**
     * The Partition Key.
     *
     * @var string
     */
    protected $primaryKey = 'PK';

    /**
     * The Sort Key.
     *
     * @var string|null
     */
    protected $sortKey = 'SK';

    /**
     * The attributes that are mass assignable.
     *
     * @var array
     */
    protected $fillable = [
        'id',
        'event_id',
        'application_id',
        'attempt',
        'headers',
        'http_verb',
        'webhook_url',
        'payload',
        'response',
        'error_type',
        'error_message',
    ];

    /**
     * The attributes that should be hidden for arrays.
     *
     * @var array
     */
    protected $hidden = [
        'PK',
        'SK',
        'GSI1PK',
        'GSI1SK',
        'GSI2PK',
        'GSI2SK',
        'TYPE',
    ];

    /**
     * The attributes that should be cast.
     *
     * @var array
     */
    protected $casts = [
        'payload' => 'array',
        'response' => 'array',
        'headers' => 'array',
    ];

    protected static function booted()
    {
        static::creating(function ($webhookLog) {
            if (empty($webhookLog->id)) {
                throw new \InvalidArgumentException();
            }
            if (empty($webhookLog->application_id)) {
                throw new \InvalidArgumentException();
            }
            if (empty($webhookLog->event_id)) {
                throw new \InvalidArgumentException();
            }

            // index attributes
            $webhookLog->PK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->SK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->GSI1PK = "APPLICATION_ID#{$webhookLog->application_id}";
            $webhookLog->GSI1SK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->GSI2PK = "EVENT#{$webhookLog->event_id}";
            $webhookLog->GSI2SK = "WEBHOOK#{$webhookLog->id}";
            $webhookLog->TYPE = self::class;
        });
    }

    public static function find($id)
    {
        return parent::find(['PK' => "WEBHOOK#{$id}", 'SK' => "WEBHOOK#{$id}"]);
    }

    public static function extractLastEvaluatedKey($item)
    {
        if (empty($item)) {
            return null;
        }

        return $item->meta()['LastEvaluatedKey'] ?? null;
    }
}

@kitar
Owner

kitar commented Jul 22, 2022

Thanks for sharing! The code looks interesting. However, I can't reproduce the problem yet. Let me clarify a few things.

  • Are you using multiple DB connections? (e.g. your default DB_CONNECTION is not dynamodb, but the models given above use the dynamodb connection)
  • Do you have a problem interacting with DynamoDB even outside the queue?
  • What are you trying to do in the SystemEventCreated job? (Some operation in that job, such as restoreCollection(), may be triggering a laravel-dynamodb bug.)

If you dump the connection name inside the queued job (e.g. $model->getConnectionName()) and it is dynamodb, the problem is likely something other than the connection.

@localpath
Copy link
Contributor Author

Hi @kitar. Yes, we are using multiple DB connections.

> Are you using multiple DB connections? (eg. your default DB_CONNECTION is not dynamodb, but the models given above are using dynamodb connection)

I think that's what it is. On my DynamoDB model, do I need to explicitly define getConnectionName()? I bet that's the cause, because the connection is returning null.

> Do you have a problem interacting with DynamoDB even outside the queue?

The model works perfectly outside of the queue.

> What are you trying to do in SystemEventCreated job? (There may be some operations causing the laravel-dynamodb bug in that job somewhere like restoreCollection())

Here is the event class. I wanted to inject the model instance rather than a plain array.

<?php

namespace App\Events\SystemEvents;

use App\Models\SystemEvent;
use Illuminate\Broadcasting\Channel;
use Illuminate\Broadcasting\InteractsWithSockets;
use Illuminate\Broadcasting\PresenceChannel;
use Illuminate\Broadcasting\PrivateChannel;
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Foundation\Events\Dispatchable;
use Illuminate\Queue\SerializesModels;

class SystemEventCreated
{
    use Dispatchable, InteractsWithSockets, SerializesModels;

    /**
     * Create a new event instance.
     *
     * @param  array  $systemEvent
     * @return void
     */
    public function __construct(
        public array $systemEvent,
    ) {
    }
}

The listener, for example:

<?php

namespace App\Listeners\Webhooks;

use App\Events\SystemEvents\SystemEventCreated;
use App\Models\WebhookSubscriber;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Database\Eloquent\Builder;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Support\Facades\Cache;
use Spatie\WebhookServer\WebhookCall;

class SendSystemEventNotifications implements ShouldQueue
{
    /**
     * Create the event listener.
     *
     * @return void
     */
    public function __construct()
    {
        //
    }

    /**
     * Handle the event.
     *
     * @param  \App\Events\SystemEvents\SystemEventCreated  $event
     * @return void
     */
    public function handle(SystemEventCreated $event)
    {
        // use the model off the event class for example
    }
}

@kitar
Owner

kitar commented Aug 24, 2022

@localpath Sorry for the late reply.

Thanks for the information; you were indeed right. However, several issues are involved that make this difficult to resolve immediately. For now, please pass keys or an array to the listener instead of a model instance.

Notes on this issue:

  • Our Query Builder and Model should work correctly with Illuminate\Queue\SerializesAndRestoresModelIdentifiers.
    • Some methods are missing, such as newQueryForRestoration(), first(), and firstOrFail().
    • SerializesAndRestoresModelIdentifiers::getRestoredPropertyValue() expects the identifier to always be a single value, but with DynamoDB it can be multiple values (like PK and SK).
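
The composite-key note above can be illustrated in plain PHP, without Laravel. This is only a minimal sketch, assuming a key made of PK and SK, of how such a key could be packed into one string (so a single-value identifier slot can carry it) and unpacked again on restore. The helper names encodeCompositeKey() and decodeCompositeKey() are hypothetical and are not part of laravel-dynamodb or Laravel.

```php
<?php

declare(strict_types=1);

/**
 * Pack a composite key such as ['PK' => ..., 'SK' => ...] into one string.
 * Hypothetical helper: JSON is used so that '#' or '|' inside key values
 * cannot be confused with a separator.
 */
function encodeCompositeKey(array $key): string
{
    ksort($key); // stable ordering so equal keys always encode identically

    return json_encode($key, JSON_THROW_ON_ERROR);
}

/**
 * Unpack a string produced by encodeCompositeKey() back into a key array.
 */
function decodeCompositeKey(string $encoded): array
{
    return json_decode($encoded, true, 512, JSON_THROW_ON_ERROR);
}

// Key shape taken from the SystemEvent model in this thread.
$key = ['SK' => 'EVENT#123', 'PK' => 'EVENT#123'];

$encoded = encodeCompositeKey($key);
$decoded = decodeCompositeKey($encoded);
```

Whether the package would adopt an encoding like this is not stated in the thread; the sketch only shows that a multi-value key can round-trip through a single value.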

@localpath
Copy link
Contributor Author

@kitar Hi. Thanks for looking at it. Yes, that makes perfect sense. It's easy enough to work around by just rehydrating the models later. Thanks!!
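
A minimal sketch of the "pass keys, rehydrate later" workaround, written in plain PHP so it runs without Laravel. InMemoryEventStore, SystemEventCreatedPayload and handleSystemEventCreated() are hypothetical names standing in for the DynamoDB table, the event class and the queued listener. In the real application the lookup would presumably go through the model's own find() shown earlier in the thread.

```php
<?php

declare(strict_types=1);

// Hypothetical in-memory stand-in for the DynamoDB table; not part of any library.
final class InMemoryEventStore
{
    /** @var array<string, array<string, mixed>> */
    private array $items = [];

    public function put(array $item): void
    {
        $this->items[$item['PK'] . '|' . $item['SK']] = $item;
    }

    public function find(string $pk, string $sk): ?array
    {
        return $this->items[$pk . '|' . $sk] ?? null;
    }
}

// The event carries only scalar keys, so unserializing it never has to restore a model.
final class SystemEventCreatedPayload
{
    public function __construct(
        public string $pk,
        public string $sk,
    ) {
    }
}

// The listener looks the item up from the keys when the job actually runs.
function handleSystemEventCreated(SystemEventCreatedPayload $event, InMemoryEventStore $store): ?array
{
    return $store->find($event->pk, $event->sk);
}

$store = new InMemoryEventStore();
$store->put(['PK' => 'EVENT#1', 'SK' => 'EVENT#1', 'event_type' => 'demo']);

// Simulate the queue round trip: the payload is serialized, then restored in the worker.
$queued = serialize(new SystemEventCreatedPayload('EVENT#1', 'EVENT#1'));
$restored = unserialize($queued);

$item = handleSystemEventCreated($restored, $store);
```

The point of the design is that only plain strings cross the queue boundary, so the model-restoration path seen in the stack trace is never reached.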

@Mahan-Shoghy

Mahan-Shoghy commented Dec 2, 2024

@kitar Hi, I have the same issue with broadcast notifications. What should I do about it?
I've tested the notification with another model that does not use DynamoDB as its database, and it works well.

This is my example code:

Notification::send(User::query()->limit(1)->scan()->first(), new TestNotification());

This is the User model:

class User extends Model implements AuthenticationContract {
    use Authenticatable, HasFactory, Notifiable;

    public const CREATED_AT = 'CreatedAt';

    public const UPDATED_AT = 'UpdatedAt';

    protected $table = 'Users';

    protected $primaryKey = 'Id';

    protected $connection = 'dynamodb';

    protected $fillable = [
        'FirstName',
        'LastName',
        'Mobile',
        'Email',
        'Username',
        'Avatar',
        'Password',
        'RememberToken',
        'DeletedAt',
    ];

    /**
     * @var array<int, string>
     */
    protected $hidden = [
        'Password',
        'RememberToken',
    ];

    public function getRememberToken(): string
    {
        return 'RememberToken';
    }

    protected function casts(): array
    {
        return [
            'DeletedAt' => 'date',
        ];
    }

    protected static function boot(): void
    {
        parent::boot();

        static::creating(function ($model) {
            if ($model->{$model->getKeyName()} === null) {
                $model->setAttribute($model->getKeyName(), Str::orderedUuid()->toString());
            }
        });
    }

    public function getIncrementing(): bool
    {
        return false;
    }

    public function getKeyType(): string
    {
        return 'string';
    }
}

I've made some changes to your code, but it's still not working:

Model:

// adding $implode to get keys as string

/**
 * Get the key of the current item.
 *
 * @return string|array
 */
public function getKey(bool $implode = true)
{
    /**
     * Your code here
     */

    if ($implode) {
        return implode("|", $key);
    }

    return $key;
}

public function newQueryWithoutScopes()
{
    return $this->newQuery();
}

public function newQueryForRestoration($ids)
{
    return $this->newQueryWithoutScopes()->whereKey($ids);
}

Builder:

public function whereKey($id)
{
    if ($id instanceof Model) {
        $id = $id->getKey(false);
    }

    if (is_array($id) || $id instanceof Arrayable) {
        $this->filterIn('Id', $id);

        return $this;
    }

    if ($id !== null && is_string($id)) {
        $id = (string) $id;
    }

    return $this->filter("Id", '=', $id);
}

public function get($columns = [])
{
    return collect($this->scan()->all());
}

I know these changes don't help, but I thought it was better to send them to you.


3 participants