swoole / swoole-src

🚀 Coroutine-based concurrency library for PHP
https://www.swoole.com
Apache License 2.0
18.44k stars 3.16k forks source link

Swoole socket server doesn't work with RabbitMQ #1517

Open rvaliev opened 6 years ago

rvaliev commented 6 years ago

Please answer these questions before submitting your issue. Thanks!

  1. What did you do? If possible, provide a recipe for reproducing the error. I want to send data to open socket connections using Swoole and RabbitMQ. Therefore I've created following files: Swoole server (swoole_sever.php) which constantly listens to connections:
    
    $server = new \swoole_websocket_server("0.0.0.0", 2345, SWOOLE_BASE);

$server->on('open', function(\Swoole\Websocket\Server $server, $req) { echo "connection open: {$req->fd}\n"; });

$server->on('message', function($server, \Swoole\Websocket\Frame $frame) { echo "received message: {$frame->data}\n"; $server->push($frame->fd, json_encode(["hello", "world"])); });

$server->on('close', function($server, $fd) { echo "connection close: {$fd}\n"; });

$server->start();


Worker which receives message from RabbitMQ, then makes connection to Swoole and broadcasts the message via socket connection (worker.php)

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest'); $channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

$callback = function($msg){ echo " [x] Received ", $msg->body, "\n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "\n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);

// Here I'm trying to make connection to Swoole server and sernd data
$cli = new \swoole_http_client('0.0.0.0', 2345);

$cli->on('message', function ($_cli, $frame) {
    var_dump($frame);
});

$cli->upgrade('/', function($cli)
{
    $cli->push('This is the message to send to Swoole server');
    $cli->close();
});

};

$channel->basic_qos(null, 1, null); $channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while(count($channel->callbacks)) { $channel->wait(); }

$channel->close(); $connection->close();


New task where the message will be send to RabbitMQ (new_task.php):

$connection = new AMQPStreamConnection('0.0.0.0', 5672, 'guest', 'guest'); $channel = $connection->channel();

$channel->queue_declare('task_queue', false, true, false, false);

$data = implode(' ', array_slice($argv, 1)); if(empty($data)) $data = "Hello World!"; $msg = new AMQPMessage($data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );

$channel->basic_publish($msg, '', 'task_queue');

echo " [x] Sent ", $data, "\n";

$channel->close(); $connection->close();


Then I start Swoole socket server:
`php swoole_sever.php`

And RabbitMQ worker:
`php worker.php`

After that I run the new task to send the message to RabbitMQ:
`php new_task.php`

After that in command line prompt where a RabbitMQ Worker is running (worker.php) I can see that a message is delivered to the worker ("[x] Received Hello World!" message is appearing).

However in command line prompt where Swoole server is running  nothing happens.

But if I terminate worker.php (ctrl + x) a message in command prompt where swoole server is running appears:

> "connection close: 1"

It's like a connection is being established but callback function from swoole server
`$server->on('open', function(\Swoole\Websocket\Server $server, $req)`
is never being triggered.

2. What did you expect to see?
After running a new_task.php I expect to broadcast message to RabbitMQ worker, which then connects to Swoole socket server and passes the message to it. Then the Swoole server receives the message and 
`$cli->on('message', function ($_cli, $frame) ` 
 is triggered.

3. What did you see instead?
In command line prompt with swoole_server.php happens nothing.

4. What version of Swoole are you using (`php --ri swoole`)?

> swoole support => enabled
> Version => 2.1.1
> Author => tianfeng.han[email: mikan.tenny@gmail.com]
> coroutine => enabled
> epoll => enabled
> eventfd => enabled
> timerfd => enabled
> signalfd => enabled
> cpu affinity => enabled
> spinlock => enabled
> rwlock => enabled
> async http/websocket client => enabled
> Linux Native AIO => enabled
> pcre => enabled
> zlib => enabled
> mutex_timedlock => enabled
> pthread_barrier => enabled
> futex => enabled
> 
> Directive => Local Value => Master Value
> swoole.aio_thread_num => 2 => 2
> swoole.display_errors => On => On
> swoole.use_namespace => On => On
> swoole.use_shortname => On => On
> swoole.fast_serialize => Off => Off
> swoole.unixsock_buffer_size => 8388608 => 8388608

5. What is your machine environment used (including version of kernel & php & gcc) ? 

PHP 7.1.1-1+deb.sury.org~xenial+1 (cli) (built: Jan 20 2017 09:20:20) ( NTS ) gcc version 5.4.0 20160609 (Ubuntu 5.4.0-6ubuntu1~16.04.4)



6. If you are using ssl, what is your openssl version?

don't use ssl
matyhtf commented 6 years ago

https://github.com/swoole/php-amqplib This is a coroutine client , that can only run on swoole-2.1.1 or later

demo

$server = new \swoole_websocket_server("0.0.0.0", 2345);

include(__DIR__ . '/config.php');

Co::set(['log_level' => 0]);

use PhpAmqpLib\Connection\AMQPSwooleConnection;

/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    global $server;

    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);

    if ($message->body === 'quit')
    {
        $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
    }

    foreach ($server->connections as $fd)
    {
        if ($server->isEstablished($fd))
        {
            $server->push($fd, $message->body);
        }
    }
}

$server->on('workerStart', function () {
    go(function () {
        $exchange = 'exchange';
        $queue = 'queue2';
        $consumerTag = 'consumer';

        $connection = new AMQPSwooleConnection(HOST, PORT, USER, PASS, VHOST);
        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, 'direct', false, true, false);
        $channel->queue_bind($queue, $exchange);
        $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

        while (count($channel->callbacks))
        {
            $channel->wait();
        }
    });
});

$server->on('open', function (\Swoole\Websocket\Server $server, $req) {
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function ($server, \Swoole\Websocket\Frame $frame) {
    echo "received message: {$frame->data}\n";
});

$server->on('close', function ($server, $fd) {
    echo "connection close: {$fd}\n";
});

$server->on('request', function ($req, $resp) {
    $resp->end(<<<HTML
    <h1>Swoole WebSocket Server</h1>
    <script>
var wsServer = 'ws://127.0.0.1:2345';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) {
    console.log("Connected to WebSocket server.");
};

websocket.onclose = function (evt) {
    console.log("Disconnected");
};

websocket.onmessage = function (evt) {
    console.log('Retrieved data from server: ' + evt.data);
};

websocket.onerror = function (evt, e) {
    console.log('Error occured: ' + evt.data);
};
</script>
HTML
);
});

$server->start();
rvaliev commented 6 years ago

@matyhtf thank you for your help, but unfortunately the example you've provided isn't working. I'm not sure whether it lies on my machine setup or something else.

This is the exact code I'm using:

<?php

namespace AppBundle\amqp;

require_once __DIR__ . '../../../../vendor/autoload.php';

$server = new \swoole_websocket_server("0.0.0.0", 2345);

\Co::set(['log_level' => 0]);

use PhpAmqpLib\Connection\AMQPSwooleConnection;

/**
 * @param \PhpAmqpLib\Message\AMQPMessage $message
 */
function process_message($message)
{
    echo "\n--------\n";
    echo $message->body;
    echo "\n--------\n";

    global $server;

    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);

    if ($message->body === 'quit')
    {
        $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
    }

    foreach ($server->connections as $fd)
    {
        if ($server->isEstablished($fd))
        {
            $server->push($fd, $message->body);
        }
    }
}

$server->on('workerStart', function () {
    go(function ()
    {
        $exchange = 'router';
        $queue = 'msgs';
        $consumerTag = 'consumer';

        echo 'Before Connection';
        $connection = new AMQPSwooleConnection('localhost', 5672, 'guest', 'guest', '/');
        echo 'After Connection';

        $channel = $connection->channel();
        $channel->queue_declare($queue, false, true, false, false);
        $channel->exchange_declare($exchange, 'direct', false, true, false);
        $channel->queue_bind($queue, $exchange);
        $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'process_message');

        while (count($channel->callbacks))
        {
            $channel->wait();
        }
    });
});

$server->on('open', function (\Swoole\Websocket\Server $server, $req) {
    echo "connection open: {$req->fd}\n";
});

$server->on('message', function ($server, \Swoole\Websocket\Frame $frame) {
    echo "received message: {$frame->data}\n";
});

$server->on('close', function ($server, $fd) {
    echo "connection close: {$fd}\n";
});

$server->on('request', function ($req, $resp)
{
    $resp->end(<<<HTML
    <h1>Swoole WebSocket Server</h1>
    <script>
var wsServer = 'ws://127.0.0.1:2345';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) {
    console.log("Connected to WebSocket server.");
};

websocket.onclose = function (evt) {
    console.log("Disconnected");
};

websocket.onmessage = function (evt) {
    console.log('Retrieved data from server: ' + evt.data);
};

websocket.onerror = function (evt, e) {
    console.log('Error occured: ' + evt.data);
};
</script>
HTML
    );
});

$server->start();

As you can see in callback $server->on('workerStart', function ()

before and after making connection (AMQPSwooleConnection) I'm outputting messages 'Before Connection' and 'After Connection'.

So after I start the server I only can see the 'Before Connection' appearing in terminal. So it looks like the whole thing is freezing within

$connection = new AMQPSwooleConnection('localhost', 5672, 'guest', 'guest', '/');

and the further code can't be reached anymore.

Also if I try to send message to the RabbitMQ from publisher nothing happens in the terminal.

PS. Connecting directly to socket server from web browser goes well.

Any idea what it could be? Thank you!

rvaliev commented 6 years ago

Can anyone confirm that the code provided above by @matyhtf works?

indiwine commented 5 years ago

Hello.

Its almost a year passed since last message, so its kind of necropost. @rvaliev I can confirm, code provided by @matyhtf is actually working.

Here is my test server code:

use ema\Engine\Commands\AbstractCommand;
use ema\Engine\Config;
use ema\Models\Messages;
use Monolog\Logger;
use PhpAmqpLib\Connection\AMQPSwooleConnection;
use Swoole\Http\Request;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class WampCommand extends AbstractCommand
{

    public function __invoke(InputInterface $input, OutputInterface $output)
    {
        /**
         * @var $logger Logger
         */
        $logger = $this->container['logger'];
        $server = new Server('0.0.0.0', 49152);

        $callback = function ($msg) use ($logger, $server) {
            $logger->info('AMQP Message received ', [$msg->body]);
            foreach ($server->connections as $fd)
            {
                if ($server->isEstablished($fd))
                {
                    $server->push($fd, $msg->body);
                }
            }
        };

        $server->on('workerStart', function () use ($callback) {
            go(function () use ($callback){
                $config = Config::get('amqp');

                $connection = new AMQPSwooleConnection($config['host'], $config['port'], $config['username'], $config['password']);

                $channel = $connection->channel();
                $channel->exchange_declare(Messages::AMQP_EXCHANGE, 'fanout');
                list($queue_name, ,) = $channel->queue_declare();
                $channel->queue_bind($queue_name, Messages::AMQP_EXCHANGE);

                $channel->basic_consume($queue_name, '', false, true, false, false, $callback);
                $channel->wait(null, true);
                while (\count($channel->callbacks)) {
                    $channel->wait();
                }
            });
        });

        $server->on('open', function (Server $server, Request $request) use ($logger) {
            $logger->info('Websocket connection Opened', [$request]);
        });

        $server->on('message', function (Server $server, Frame $frame) use ($logger) {
            $logger->info('Message received from client', [$frame->data]);
            $server->push($frame->fd, 'Message Received: ' . $frame->data);
        });

        $server->on('close', function (Server $server, $fd) use ($logger) {
            $logger->info('Connection closed', [$fd]);
        });

        $server->start();
    }
}

Using this code I finally get a round trip: Browser App => AJAX Request => AMQP exchange => WebSocket => Browser App

I am not familiar with swoole, but for some reason this code opens two simultaneous connections with RabbitMQ server (I assume number of connections will depend on a number of cores available on your particular server) so fanout exchange is probably not what you wish to use. amqp