jakubkulhan / bunny

Performant pure-PHP AMQP (RabbitMQ) sync/async (ReactPHP) library
MIT License

Client::disconnect is asynchronous #93

Open schlndh opened 4 years ago

schlndh commented 4 years ago

Hi. As the title says, Client::disconnect is asynchronous (i.e. it returns before the connection is closed). While I don't think it is a problem for most use-cases, it can lead to hard-to-debug scenarios.

In our scenario, we opened a new connection for each published message (a naive implementation). This worked fine until we started publishing many messages in one process, where we occasionally ran out of sockets (the default per-process limit on open files on Linux is typically 1024). Figuring this out took me several hours, and I fixed it by changing our implementation to use a single connection for all published messages.
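The single-connection fix mentioned above could look roughly like the following sketch. It only uses the Bunny calls that appear in the reproduction script in this issue; `publishAll()` is a hypothetical helper name, not part of the library, and the sketch assumes Composer's autoloader has already been loaded.

```php
<?php

declare(strict_types=1);

use Bunny\Client;

/**
 * Publishes all messages over one connection and one channel.
 * publishAll() is a hypothetical helper, not part of Bunny.
 *
 * @param string[] $messages message bodies to publish
 */
function publishAll(Client $client, string $queue, array $messages): void
{
    // Connect once, so only one socket is opened regardless of message count.
    $client->connect();
    $channel = $client->channel();
    $channel->queueDeclare($queue, false, true, false, false, false, ['x-max-priority' => 10]);

    foreach ($messages as $body) {
        $channel->publish($body, ['delivery-mode' => 2, 'priority' => 0], '', $queue);
    }

    // disconnect() may return before the socket is closed (the subject of
    // this issue), but with a single connection that no longer exhausts
    // the open file limit.
    $channel->close();
    $client->disconnect();
}
```

A call might look like `publishAll(new Client($config), 'test_queue', ['ABC', 'DEF']);`, where `$config` is a connection settings array like the one in the reproduction script.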

Originally, I thought that this was a bug, but after looking into it more I discovered that it is not so much a bug as a misleading semantic of the disconnect method. To achieve a synchronous disconnect, one has to make sure the client's destructor is called. Unfortunately, because there is a circular reference between the client and the channel, this does not happen consistently by itself.

Here is an example which reproduces the issue:

<?php

declare(strict_types=1);

require_once __DIR__ . '/vendor/autoload.php';

use Bunny\Client;

$config = [
    'host' => '127.0.0.1',
    'vhost' => 'dev',
    'user' => 'guest',
    'password' => 'guest',
];

$files = [];
@mkdir('/tmp/bunny_test');

// open some files to artificially decrease the open files limit
for ($i = 0; $i < 1000; $i++) {
    $files[] = fopen("/tmp/bunny_test/{$i}.txt", 'w');
}

$queue = 'test_queue';

$messages = 50;

echo "---- start sending messages ----\n";

try {
    for ($i = 0; $i < $messages; $i++) {
        $bunny = new Client($config);
        $bunny->connect();
        $channel = $bunny->channel();
        $channel->queueDeclare($queue, false, true, false, false, false, ['x-max-priority' => 10]);
        $channel->qos(0, 1);
        $channel->publish('ABC', ['delivery-mode' => 2, 'priority' => 0], '', $queue);

        // This is what we did:
        $bunny->disconnect(); // DOES NOT WORK

        // This is what the tutorials do:
        //$channel->close(); $bunny->disconnect(); // DOES NOT WORK

        // A variant of these would have to be called to make it work:
        //$bunny->__destruct(); // WORKS
        //unset($bunny); unset($channel); gc_collect_cycles(); // WORKS

        echo "---- {$i} done. ----\n";
    }
} catch (Throwable $e) {
    echo "Error after {$i} messages.\n";

    throw $e;
}

echo "\nDone\n";

As it is, it ends with:

PHP Fatal error:  Uncaught Error: Class 'Bunny\Exception\ClientException' not found in /www/custom/bunny_test/vendor/bunny/bunny/src/Bunny/AbstractClient.php:261
Stack trace:
#0 /www/custom/bunny_test/vendor/bunny/bunny/src/Bunny/AbstractClient.php(327): Bunny\AbstractClient->getStream()
#1 /www/custom/bunny_test/vendor/bunny/bunny/src/Bunny/Client.php(87): Bunny\AbstractClient->write()
#2 /www/custom/bunny_test/vendor/bunny/bunny/src/Bunny/Client.php(108): Bunny\Client->flushWriteBuffer()
#3 /www/custom/bunny_test/test.php(33): Bunny\Client->connect()
#4 {main}
  thrown in /www/custom/bunny_test/vendor/bunny/bunny/src/Bunny/AbstractClient.php on line 261

WyriHaximus commented 4 years ago

Should be easily fixable with https://github.com/clue/reactphp-block

mermshaus commented 4 years ago

The synchronous client uses its own event loop implementation. I am not sure that reactphp-block can help here.

Client::disconnect tries to close open channels before disconnecting. Channels need a running event loop (the custom one) to close, as a channel waits for a Channel.Close-Ok response before resolving its deferred. I think the need for a running event loop is the important bit. It is not very intuitive.

So the basic code for client disconnection is:

$client->disconnect()->done(
    function () use ($client) {
        $client->stop();
    }
);

$client->run();

There’s one thing to add: If the client has no open channels, the done callback fires instantly, and the connection is closed before starting the event loop. The code needs to be able to handle this:

$client->disconnect()->done(
    function () use ($client) {
        $client->stop();
    }
);

if ($client->isConnected()) {
    $client->run();
}

As of now, this probably is the correct way to disconnect a synchronous Bunny client. This is also exactly what the destructor does.
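The pattern above can be wrapped in a small helper so that calling code gets a disconnect that blocks until the connection is closed. This is only a sketch: `disconnectAndWait()` is a hypothetical name, not part of Bunny, and it assumes the synchronous `Bunny\Client` with the `disconnect()`, `isConnected()`, `run()` and `stop()` methods used in this comment, and that Composer's autoloader has already been loaded.

```php
<?php

declare(strict_types=1);

use Bunny\Client;

/**
 * Disconnects a synchronous client and returns only once its event
 * loop has stopped. disconnectAndWait() is a hypothetical helper.
 */
function disconnectAndWait(Client $client): void
{
    // Stop the client's own event loop as soon as the disconnect resolves.
    $client->disconnect()->done(
        static function () use ($client): void {
            $client->stop();
        }
    );

    // Without open channels the callback has already fired and the
    // connection is closed, so the loop must not be started.
    if ($client->isConnected()) {
        $client->run();
    }
}
```

In the reproduction script from the original report, this would be called in place of the plain `$bunny->disconnect();` inside the loop. Since it mirrors what the destructor does, it should release the socket before the next iteration, although that has not been verified here.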

PS:

A related tip: If, for whatever reason, you try to disconnect a client that has already been disconnected without your code knowing (e.g. by running into the heartbeat timeout), the client will throw a ClientException such as Broken pipe or closed connection., but it won’t change its internal state to “disconnected” in all cases.

You can catch this exception in your disconnection code, but if the client thinks that it is still connected when it is garbage-collected (e.g. at the end of script execution), it tries to disconnect again within the destructor. This will lead to the same ClientException, but in a place where you can’t really catch it.

So it is probably a good idea to override the client’s state to “disconnected” when your code notices that there is no longer a connection.

use Bunny\Client;
use Bunny\ClientStateEnum;
use Bunny\Exception\ClientException;

try {
    $client->disconnect()->done(
        function () use ($client) {
            $client->stop();
        }
    );

    if ($client->isConnected()) {
        $client->run();
    }
} catch (ClientException $exception) {
    if ($exception->getMessage() === 'Broken pipe or closed connection.') {
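        // Mark the client as no longer connected so that the destructor
        // does not attempt another disconnect.
        $reflectionProperty = new \ReflectionProperty(Client::class, 'state');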
        $reflectionProperty->setAccessible(true);
        $reflectionProperty->setValue($client, ClientStateEnum::ERROR);
    } else {
        throw $exception;
    }
}