predis / predis

A flexible and feature-complete Redis client for PHP.
https://github.com/predis/predis/wiki
MIT License

cannot use pub/sub with redis cluster #562

Closed abbood closed 2 years ago

abbood commented 5 years ago

I have a Redis cluster server that I set up by following this tutorial.

This is what my config/database.php setup looks like (see details here):

'redis' => [
    'client' => 'predis',
    'cluster' => true,
    'options' => [
        'cluster' => 'redis',
        'parameters' => [
            'host' => env('REDIS_DEFAULT_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_DEFAULT_PORT', 6379),
            'database' => 0,
        ],
    ],
    'clusters' => [
        'default' => [
            'host' => env('REDIS_DEFAULT_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_DEFAULT_PORT', 6379),
            'database' => 0,
        ],
        'jobs' => [
            'host' => env('REDIS_JOBS_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_JOBS_PORT', 6379),
            'database' => 0,
        ],
        'content' => [
            'host' => env('REDIS_CONTENT_HOST', '127.0.0.1'),
            'password' => env('REDIS_PASSWORD', null),
            'port' => env('REDIS_CONTENT_PORT', 6379),
            'database' => 0,
        ],
        'options' => [
            'cluster' => 'redis',
        ],
    ],
]

And this is what my config/broadcasting.php looks like:

<?php

return [

    /*
    |--------------------------------------------------------------------------
    | Default Broadcaster
    |--------------------------------------------------------------------------
    |
    | This option controls the default broadcaster that will be used by the
    | framework when an event needs to be broadcast. You may set this to
    | any of the connections defined in the "connections" array below.
    |
    */

    'default' => env('BROADCAST_DRIVER', 'redis'),

    /*
    |--------------------------------------------------------------------------
    | Broadcast Connections
    |--------------------------------------------------------------------------
    |
    | Here you may define all of the broadcast connections that will be used
    | to broadcast events to other systems or over websockets. Samples of
    | each available type of connection are provided inside this array.
    |
    */

    'connections' => [

        'pusher' => [
            'driver' => 'pusher',
            'key' => env('PUSHER_KEY'),
            'secret' => env('PUSHER_SECRET'),
            'app_id' => env('PUSHER_APP_ID'),
        ],

        'redis' => [
            'driver' => 'redis',
            'connection' => 'default',
        ],

        'log' => [
            'driver' => 'log',
        ],

    ],

];

When my code reaches the point where it publishes an event to Redis:

// Dispatch an event indicating that the order status has changed.
event(new OrderStatusChanged($order->id, OrderActionsConstants::INCOMPLETE_CART));

I get this error:

Cannot use 'PUBLISH' with redis-cluster.

with this stack trace:

[2019-06-10 17:05:06] local.ERROR: 
0 /Users/Shared/dev/php/toters-api/vendor/predis/predis/src/Connection/Aggregate/RedisCluster.php(550): Predis\Connection\Aggregate\RedisCluster->getConnection(Object(Predis\Command\PubSubPublish))
1 /Users/Shared/dev/php/toters-api/vendor/predis/predis/src/Connection/Aggregate/RedisCluster.php(593): Predis\Connection\Aggregate\RedisCluster->retryCommandOnFailure(Object(Predis\Command\PubSubPublish), 'executeCommand')
2 /Users/Shared/dev/php/toters-api/vendor/predis/predis/src/Client.php(331): Predis\Connection\Aggregate\RedisCluster->executeCommand(Object(Predis\Command\PubSubPublish))
3 /Users/Shared/dev/php/toters-api/vendor/predis/predis/src/Client.php(314): Predis\Client->executeCommand(Object(Predis\Command\PubSubPublish))
4 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Redis/Connections/Connection.php(72): Predis\Client->__call('publish', Array)
5 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Redis/Connections/Connection.php(84): Illuminate\Redis\Connections\Connection->command('publish', Array)
6 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Broadcasting/Broadcasters/RedisBroadcaster.php(99): Illuminate\Redis\Connections\Connection->__call('publish', Array)
7 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastEvent.php(48): Illuminate\Broadcasting\Broadcasters\RedisBroadcaster->broadcast(Array, 'order.status', '{"event":"order...')
8 [internal function]: Illuminate\Broadcasting\BroadcastEvent->handle(Object(Illuminate\Broadcasting\Broadcasters\RedisBroadcaster))
9 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(29): call_user_func_array(Array, Array)
10 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(87): Illuminate\Container\BoundMethod::Illuminate\Container\{closure}()
11 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(31): Illuminate\Container\BoundMethod::callBoundMethod(Object(Illuminate\Foundation\Application), Array, Object(Closure))
12 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Container/Container.php(539): Illuminate\Container\BoundMethod::call(Object(Illuminate\Foundation\Application), Array, Array, NULL)
13 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php(94): Illuminate\Container\Container->call(Array)
14 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(114): Illuminate\Bus\Dispatcher->Illuminate\Bus\{closure}(Object(Illuminate\Broadcasting\BroadcastEvent))
15 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Pipeline/Pipeline.php(102): Illuminate\Pipeline\Pipeline->Illuminate\Pipeline\{closure}(Object(Illuminate\Broadcasting\BroadcastEvent))
16 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Bus/Dispatcher.php(98): Illuminate\Pipeline\Pipeline->then(Object(Closure))
17 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Queue/CallQueuedHandler.php(42): Illuminate\Bus\Dispatcher->dispatchNow(Object(Illuminate\Broadcasting\BroadcastEvent), false)
18 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Queue/Jobs/Job.php(69): Illuminate\Queue\CallQueuedHandler->call(Object(Illuminate\Queue\Jobs\SyncJob), Array)
19 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Queue/SyncQueue.php(42): Illuminate\Queue\Jobs\Job->fire()
20 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Queue/Queue.php(42): Illuminate\Queue\SyncQueue->push(Object(Illuminate\Broadcasting\BroadcastEvent), '', NULL)
21 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Broadcasting/BroadcastManager.php(128): Illuminate\Queue\Queue->pushOn(NULL, Object(Illuminate\Broadcasting\BroadcastEvent))
22 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php(270): Illuminate\Broadcasting\BroadcastManager->queue(Object(App\Events\OrderStatusChanged))
23 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Events/Dispatcher.php(193): Illuminate\Events\Dispatcher->broadcastEvent(Object(App\Events\OrderStatusChanged))
24 /Users/Shared/dev/php/toters-api/vendor/laravel/framework/src/Illuminate/Foundation/helpers.php(446): Illuminate\Events\Dispatcher->dispatch('App\\Events\\Orde...')
25 /Users/Shared/dev/php/toters-api/app/Repositories/Orders/AppOrdersRepo.php(107): event(Object(App\Events\OrderStatusChanged))

How do I fix this?

guyromb commented 5 years ago

Did you find any solution @abbood ?

guyromb commented 5 years ago

Related to https://github.com/nrk/predis/issues/293

abbood commented 5 years ago

@guyromb I'm afraid I haven't. Well, actually I did, but as a hack: I'm going to create two distinct Redis connections in my Laravel app, a clustered one for caching/queueing and a non-clustered one for pub/sub. See details here: https://stackoverflow.com/a/57752477/766570 (vote up if you like :)
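
The two-connection workaround could look roughly like the fragments below. This is only a sketch, and the linked answer may do it differently: the connection name `pubsub` and the `REDIS_PUBSUB_HOST` / `REDIS_PUBSUB_PORT` variables are made up for illustration, and it assumes a Laravel version that reads named connections from the top level of the `redis` array. The broadcaster points at a plain, non-clustered connection, while the cluster definitions stay in place for cache and queues.

```php
// config/database.php (fragment): add a plain connection next to 'clusters'.
'redis' => [
    'client' => 'predis',

    // Hypothetical single-node connection used only for Pub/Sub.
    'pubsub' => [
        'host' => env('REDIS_PUBSUB_HOST', '127.0.0.1'),
        'password' => env('REDIS_PASSWORD', null),
        'port' => env('REDIS_PUBSUB_PORT', 6379),
        'database' => 0,
    ],

    'clusters' => [
        // Existing clustered connections for cache and queues stay as they are.
    ],
],

// config/broadcasting.php (fragment): point the broadcaster at that connection.
'redis' => [
    'driver' => 'redis',
    'connection' => 'pubsub',
],
```

Whatever subscribes to the broadcast channels has to listen on the same endpoint as the one configured for `pubsub`.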

nrk commented 4 years ago

Hi @abbood,

I don't know how Laravel works so I will just give an example by using a plain Predis\Client instance connected to redis-cluster:

$pubsubClient = $client->getClientFor("$ip:$port");
$pubsubClient->publish($channel, $message);

The first line returns a new client instance targeting a single node, identified by its $ip:$port pair (which is the ID of a node in the cluster). That client can then be used for Pub/Sub, either by issuing Redis commands directly or through the higher-level abstraction returned by Predis\Client::pubSubLoop().

If you don't have an $ip:$port pair and just need a random node, the snippet is a bit more convoluted:

$randomSlot = mt_rand(0, 16383);   // Random slot in the range used by redis-cluster
$nodeConnection = $client->getConnection()->getConnectionBySlot($randomSlot);
$pubsubClient = new Predis\Client($nodeConnection, $client->getOptions());
$pubsubClient->publish($channel, $message);

Admittedly this is not an ideal approach, but we already have some improvements in this regard planned (and in part already committed) for Predis v2.0.

As for Laravel, like I said I don't know its internals, so I'm probably going to say something stupid. Since they use a facade anyway, maybe they could implement concrete methods for PUBLISH, SUBSCRIBE, UNSUBSCRIBE and the related Pub/Sub commands so that, when connected to redis-cluster, these commands are transparently relayed to a single node chosen via a random slot, as in the snippet above. The catch is that once a slot is picked, it's better to stick to the same slot for subsequent Pub/Sub commands (caching the slot ID internally should be fine). This is just a rough idea; @tillkruss is the Laravel expert here and should know whether such an approach is feasible.
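
The slot-caching idea could be sketched as a small wrapper like the one below. `StickyPubSubNode` and its factory callback are hypothetical names, not part of Predis or Laravel. The only Predis calls involved are the ones from the random-slot snippet above, and they appear in the commented usage at the end so that the class itself runs without a cluster.

```php
<?php

/**
 * Picks a random redis-cluster slot once and keeps reusing the client
 * built for it, so all Pub/Sub commands go to the same node.
 */
final class StickyPubSubNode
{
    /** redis-cluster hash slots are numbered 0..16383. */
    const SLOT_COUNT = 16384;

    /** @var callable Receives the slot (int) and returns a client for the node serving it. */
    private $factory;

    /** @var int|null Slot chosen on first use. */
    private $slot;

    /** @var object|null Client created on first use. */
    private $client;

    public function __construct(callable $factory)
    {
        $this->factory = $factory;
    }

    /** Returns the cached slot, choosing a random one on the first call. */
    public function slot(): int
    {
        if ($this->slot === null) {
            $this->slot = mt_rand(0, self::SLOT_COUNT - 1);
        }

        return $this->slot;
    }

    /** Returns the cached single-node client, building it on the first call. */
    public function client()
    {
        if ($this->client === null) {
            $this->client = call_user_func($this->factory, $this->slot());
        }

        return $this->client;
    }
}

// Possible usage, assuming $client is a Predis\Client connected to redis-cluster:
//
// $sticky = new StickyPubSubNode(function ($slot) use ($client) {
//     $node = $client->getConnection()->getConnectionBySlot($slot);
//     return new Predis\Client($node, $client->getOptions());
// });
// $sticky->client()->publish($channel, $message);
```

Keeping the node lookup inside a callback means the caching logic does not depend on Predis at all, so the same wrapper could sit behind a Laravel connection class if that turns out to be feasible.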