ratchetphp / Ratchet

Asynchronous WebSocket server
http://socketo.me
MIT License

Integrating with RabbitMQ #659

Closed by ghost 6 years ago

ghost commented 6 years ago

Is there an example to integrate Ratchet with RabbitMQ?

WyriHaximus commented 6 years ago

Have you looked at https://github.com/jakubkulhan/bunny/ ?

mikealmond commented 6 years ago

Basic example:

<?php

use Bunny\Async\Client as AsyncClient;
use Bunny\Channel;
use Bunny\Message;
use React\EventLoop\Factory as LoopFactory;
use App\Pusher;

require dirname(__DIR__) . '/vendor/autoload.php';

$loop = LoopFactory::create();
$pusher = new Pusher;

$queueAsyncClient = new AsyncClient($loop, [
    "host"     => $_ENV['RABBITMQ_HOST'],
    "port"     => $_ENV['RABBITMQ_PORT'],
    "vhost"    => $_ENV['RABBITMQ_VHOST'],
    "user"     => $_ENV['RABBITMQ_USERNAME'],
    "password" => $_ENV['RABBITMQ_PASSWORD'],
]);

$connect = $queueAsyncClient->connect();

// When client has connected, retrieve channel (as promise)
$connect->then(function (AsyncClient $client) {
    return $client->channel();

// Then declare the queue and exchange
})->then(function (Channel $channel) {

    // These method calls all return promises, so we need to combine them
    return React\Promise\all([
        $channel,

        // Create the queue we'll be using
        $channel->queueDeclare($_ENV['RABBITMQ_QUEUE']),

        // Declare an exchange
        $channel->exchangeDeclare($_ENV['RABBITMQ_EXCHANGE']),

        // Bind the queue to the exchange
        $channel->queueBind($_ENV['RABBITMQ_QUEUE'], $_ENV['RABBITMQ_EXCHANGE']),
    ]);

// Then, when the exchange is all hooked up, hook up the pusher
})->then(function (array $results) use ($pusher) {

    /** @var Channel $channel First element of the array passed to all() above */
    $channel = $results[0];

    // On messages, consume them using the pusher
    return $channel->consume(
        function (Message $message) use ($pusher, $channel) {
            $pusher->onQueueReceived($message->content);
            echo "Got a message" . PHP_EOL;
        },
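        $_ENV['RABBITMQ_QUEUE'], // queue to consume from
        '',                      // consumer tag (empty: the broker generates one)
        false,                   // noLocal
        true                     // noAck: messages count as acknowledged on delivery, so no explicit ack is sent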
    );
});

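// Set up our WebSocket server for clients wanting real-time updates.
// Note: listen() is the react/socket 0.4 API. From react/socket 0.5 on, the address
// goes to the constructor instead: new React\Socket\Server('0.0.0.0:8080', $loop)
$webSocketServer = new React\Socket\Server($loop);
$webSocketServer->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remote clients can connect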

// Stack of servers from base IO, through HTTP, through WebSockets to WAMP
$ioServer = new Ratchet\Server\IoServer(
    new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer(
            // The WAMP server takes the pusher to publish messages through WAMP
            new Ratchet\Wamp\WampServer($pusher)
        )
    ),
    $webSocketServer
);

$loop->run();
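
The example relies on an App\Pusher class that is not shown in this thread. Because it is passed to Ratchet\Wamp\WampServer, it has to implement Ratchet\Wamp\WampServerInterface, and its onQueueReceived() method receives the raw message body. What that method does with the body is up to the application. The following is only a sketch of one possible routing step, under assumptions that are not from the original post: the class name QueueMessageRouter is hypothetical, and the queue payload is assumed to be JSON with a "topic" key and an optional "data" key. It has no Ratchet dependency, so a topic here is any object with a broadcast() method, such as Ratchet\Wamp\Topic.

```php
<?php

// Hypothetical helper, not part of Ratchet or Bunny.
// Maps topic ids to topic objects and forwards decoded queue payloads
// to the matching topic.
final class QueueMessageRouter
{
    /** @var array<string, object> topic id => object exposing broadcast($message) */
    private $topics = [];

    // Register a topic, e.g. from the pusher's onSubscribe() callback.
    public function addTopic(string $id, $topic): void
    {
        $this->topics[$id] = $topic;
    }

    // Forget a topic, e.g. once its last subscriber has left.
    public function removeTopic(string $id): void
    {
        unset($this->topics[$id]);
    }

    // Route one raw message body. Returns true if it was broadcast,
    // false if it was malformed or nobody is subscribed to its topic.
    public function route(string $body): bool
    {
        // Assumed payload shape: {"topic": "<id>", "data": <anything>}
        $payload = json_decode($body, true);

        if (!is_array($payload) || !isset($payload['topic']) || !is_string($payload['topic'])) {
            return false;
        }

        $id = $payload['topic'];
        if (!isset($this->topics[$id])) {
            return false;
        }

        $this->topics[$id]->broadcast($payload['data'] ?? null);

        return true;
    }
}
```

With a helper like this, a Pusher could call addTopic($topic->getId(), $topic) from onSubscribe() and route($body) from onQueueReceived($body). Returning false instead of throwing keeps a malformed queue message from taking down the event loop.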

ghost commented 6 years ago

I owe you