jakubkulhan / bunny

Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library

Async best practice question #84

Open LBanfalvi opened 5 years ago

LBanfalvi commented 5 years ago

Hi!

We have an async socket server written in ReactPHP. In some cases we would like to forward the processed packets to an AMQP server. Can you tell me what the best practice is for publishing a message in this case? Currently we set up an async connection when the server starts, and in the socket's data event handler we open a channel, publish a message, and then close the channel. I don't think this is the proper way...

  $loop = React\EventLoop\Factory::create();
  $socket = new React\Socket\Server('127.0.0.1:9999', $loop);

  $amqp_options = [
    'host' => 'localhost',
    'vhost' => 'x',
    'user' => 'x',
    'password' => 'xx',
  ];

  $amqp_async_client = new Bunny\Async\Client($loop, $amqp_options);
  $amqp_async_client->connect()->then(function (Bunny\Async\Client $client) use (&$socket) {

    $socket->on('connection', function (React\Socket\ConnectionInterface $connection) use ($client) {
      $connection->write("Hello " . $connection->getRemoteAddress() . "!\n");
      $connection->on('data', function ($data) use ($connection, $client) {
        $connection->write("You said: " . strtoupper($data) . "\n");
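        $client->channel()->then(function (Bunny\Channel $channel) {
          // exchangeDeclare() resolves with a protocol frame, not the channel, so pass the channel on explicitly
          return $channel->exchangeDeclare('test', 'direct')->then(function () use ($channel) {
            return $channel;
          });
        })->then(function (Bunny\Channel $channel) use ($data) {
          return $channel->publish($data, [], 'test')->then(function () use ($channel) {
            return $channel->close();
          });
        });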
      });
      $connection->on('end', function () use ($connection) {
        $connection->close();
      });
      $connection->on('error', function (Exception $e) {
        echo 'conn error: ' . $e->getMessage() . PHP_EOL;
      });
    })->on('error', function (Exception $e) {
      echo 'server error: ' . $e->getMessage() . PHP_EOL;
    })->on('end', function () {
      echo 'server quit';
    });

  });

  $loop->run();

jakubkulhan commented 5 years ago

Hi. I'd create a single channel at server startup, declare the necessary exchanges and queues, and only then start accepting socket connections. In multi-threaded environments such as the JVM, it's common to create one channel per thread to avoid locking. PHP is single-threaded, so no locks are needed and a single channel should be sufficient.
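
A minimal sketch of that setup, reusing the classes, placeholder credentials, address, and `test` exchange from the question. It assumes Bunny's async client and `react/socket` are installed via Composer and that a broker is reachable; the error handling is illustrative only.

```php
<?php
require __DIR__ . '/vendor/autoload.php';

use Bunny\Async\Client;
use Bunny\Channel;
use React\EventLoop\Factory;
use React\Socket\ConnectionInterface;
use React\Socket\Server;

$loop = Factory::create();

// Placeholder credentials copied from the question.
$client = new Client($loop, [
    'host' => 'localhost',
    'vhost' => 'x',
    'user' => 'x',
    'password' => 'xx',
]);

$client->connect()->then(function (Client $client) {
    // Open the one channel that every socket connection will share.
    return $client->channel();
})->then(function (Channel $channel) {
    // exchangeDeclare() resolves with a protocol frame, so hand the channel on explicitly.
    return $channel->exchangeDeclare('test', 'direct')->then(function () use ($channel) {
        return $channel;
    });
})->then(function (Channel $channel) use ($loop) {
    // Start listening only once the channel and exchange are ready.
    $socket = new Server('127.0.0.1:9999', $loop);

    $socket->on('connection', function (ConnectionInterface $connection) use ($channel) {
        $connection->on('data', function ($data) use ($channel) {
            // Reuse the shared channel; it is neither opened nor closed per message.
            $channel->publish($data, [], 'test');
        });
    });

    $socket->on('error', function (Exception $e) {
        echo 'server error: ' . $e->getMessage() . PHP_EOL;
    });
})->then(null, function (Throwable $e) {
    // Any failure during AMQP setup ends up here.
    echo 'AMQP setup failed: ' . $e->getMessage() . PHP_EOL;
});

$loop->run();
```

Because the socket server is created inside the last callback, no client data can arrive before the exchange exists.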

LBanfalvi commented 5 years ago

Thanks for your reply! I tried this approach before and it worked well, but then I read this article about concurrency handling (it's about rubybunny, but the situation is the same: a single-threaded environment "emulates" multithreading): http://rubybunny.info/articles/concurrency.html. It says: "...This means that without synchronization on, publishing from multiple threads on a shared channel may result in frames being sent to RabbitMQ out of order".

jakubkulhan commented 5 years ago

Ruby supports threads. As far as I know, MRI Ruby uses a Global Interpreter Lock that effectively makes it single-threaded, but there are also JRuby, IronRuby, etc. So I think the RubyBunny docs refer to real concurrent access from multiple threads. That kind of access does not happen in the PHP runtime.
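
The argument can be illustrated with a toy model in plain PHP. This is not Bunny's implementation: `ToyChannel`, its `wire` buffer, and the frame strings are hypothetical stand-ins. The point is only that a callback in a single-threaded event loop runs to completion before the next one starts, so the frames of two publishes cannot interleave.

```php
<?php
// Toy model only: a "channel" that appends the frames of one publish
// (method, header, body) to a shared write buffer.
final class ToyChannel
{
    /** @var string[] frames in the order they would reach the socket */
    public array $wire = [];

    public function publish(string $body): void
    {
        // This method finishes before any other callback gets to run.
        $this->wire[] = "method:$body";
        $this->wire[] = "header:$body";
        $this->wire[] = "body:$body";
    }
}

$channel = new ToyChannel();

// Stand-in for an event loop: queued callbacks are invoked one after another.
$callbacks = [
    function () use ($channel) { $channel->publish('a'); },
    function () use ($channel) { $channel->publish('b'); },
];

foreach ($callbacks as $callback) {
    $callback();
}

// All three frames of 'a' come before any frame of 'b'.
print_r($channel->wire);
```

With real threads, two `publish()` calls could run at the same time and mix their frames in the buffer, which is the case the RubyBunny article warns about.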

LBanfalvi commented 5 years ago

Thanks Jakub, we'll give it a try!