amphp / websocket-client

Async WebSocket client for PHP based on Amp.
https://amphp.org/websocket-client
MIT License

Problem determining connection was closed #49

Open crimson273 opened 9 months ago

crimson273 commented 9 months ago

I'm having trouble detecting that the connection was closed. Below is the code I'm using for testing. It connects to the endpoint, sends a ping message to the server every 5 seconds, and replies with a pong message whenever it receives a ping from the server. It also closes the connection on the client side after 30 seconds, just for testing purposes. What I am expecting:

What I am getting:

In the end, I need a reliable way to know that the connection was closed, in particular when it is closed by the server. Any advice is much appreciated.

<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\weakClosure;
use function Amp\Websocket\Client\connect;

$serverUrl = 'ws://dev.sportlevel.com/highway';

$connection = connect($serverUrl);

$connection->onClose(function () {
    echo "connection is closed!" . PHP_EOL;
});

EventLoop::repeat(5, weakClosure(function () use ($connection) {
    $pingMessage = json_encode([
        'msg_type' => 'ping',
        'timestamp' => microtime(true),
    ]);

    echo "sending ping: {$pingMessage}" . PHP_EOL;
    try {
        $connection->sendText($pingMessage);
    } catch (\Exception $e) {
        echo "send ping failed! " . $e->getMessage() . PHP_EOL;
    }

}));

EventLoop::delay(30, function () use ($connection) {
    $connection->close();
});

while (1) {
    if ($message = $connection->receive()) {
        $payload = $message->buffer();

        echo "Received: {$payload}" . PHP_EOL;

        $message = json_decode($payload, true);

        if ($message['msg_type'] == 'ping') {
            $pongMessage = json_encode([
                'msg_type' => 'pong',
                'timestamp' => $message['timestamp'],
            ]);

            echo "send pong: {$pongMessage}" . PHP_EOL;
            try {
                $connection->sendText($pongMessage);
            } catch (\Exception $e) {
                echo "send pong failed! " . $e->getMessage() . PHP_EOL;
            }
        }
    }
}

crimson273 commented 9 months ago

ok, I've managed to partially figure this one out.

I've rewritten the code and now I do have everything I needed.

What concerns me is that I don't really understand why the previous example didn't work as I expected.

Another concern is whether the example below is the "right" way of doing things. I could use an EventLoop::run() call here, but its description says that I should use the Suspension API instead.

I understand that this isn't about the websocket client in particular and is really a question about the correct usage of Revolt\EventLoop, but it is frustrating that I can't find any detailed description of how to use it, and the official documentation seems lacking.

So, if someone could point me to any good sources of information about this, I would be very grateful.

Here's an updated code example:

<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\weakClosure;
use function Amp\Websocket\Client\connect;

$serverUrl = 'ws://dev.sportlevel.com/highway';

$connection = connect($serverUrl);

$connection->onClose(function () {
    echo "connection is closed!" . PHP_EOL;
});

// use suspension
$suspension = EventLoop::getSuspension();

EventLoop::repeat(5, weakClosure(function () use ($connection, $suspension) {
    $pingMessage = json_encode([
        'msg_type' => 'ping',
        'timestamp' => microtime(true),
    ]);

    echo "sending ping: {$pingMessage}" . PHP_EOL;
    try {
        $connection->sendText($pingMessage);
    } catch (\Exception $e) {
        echo "send ping failed! " . $e->getMessage() . PHP_EOL;
    }

    // use suspension
    $suspension->resume();
}));

EventLoop::repeat(1, weakClosure(function () use ($connection, $suspension) {
    if ($message = $connection->receive()) {
        $payload = $message->buffer();

        echo "Received: {$payload}" . PHP_EOL;

        $message = json_decode($payload, true);

        if ($message['msg_type'] == 'ping') {
            $pongMessage = json_encode([
                'msg_type' => 'pong',
                'timestamp' => $message['timestamp'],
            ]);

            echo "send pong: {$pongMessage}" . PHP_EOL;
            try {
                $connection->sendText($pongMessage);
            } catch (\Exception $e) {
                echo "send pong failed! " . $e->getMessage() . PHP_EOL;
            }
        }
    }

    // use suspension
    $suspension->resume();
}));

// use suspension in endless while loop
while (1) {
    $suspension->suspend();
}

// don't understand why it is not a preferred way of doing things
//EventLoop::run();

bwoebi commented 9 months ago

EventLoop::run() is perfectly fine when you have no specific top-level event to wait on, i.e. if your code is just repeats.
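
A minimal sketch of the opposite case, where there is one specific top-level event to wait on. It uses only Revolt's Suspension API; the one-second timer and the 'done' value are placeholders chosen for illustration, not anything taken from the code above.

```php
<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;

// Obtain a suspension for the current execution context ({main} here).
$suspension = EventLoop::getSuspension();

// Placeholder event: resume the suspended context once, after one second.
EventLoop::delay(1, function () use ($suspension): void {
    $suspension->resume('done');
});

// Runs the event loop until resume() is called, then returns its argument.
$value = $suspension->suspend();

echo "resumed with: {$value}" . PHP_EOL;
```

The API is built around pairing one suspend() with one resume(), so it fits a single awaited event better than a set of independent timers.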

A common alternative to just using EventLoop::run() here would be using Amp\trapSignal() on SIGINT and doing some cleanup afterwards.
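
A minimal sketch of that alternative, assuming the pcntl extension is available so that the SIGINT and SIGTERM constants and signal handling work. The five-second "tick" timer is a stand-in for the real repeat callbacks.

```php
<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\trapSignal;

// Stand-in for the periodic work registered with EventLoop::repeat().
$callbackId = EventLoop::repeat(5, function (): void {
    echo "tick" . PHP_EOL;
});

// Suspends the top level, while the event loop keeps running,
// until one of the listed signals arrives. Returns the signal number.
$signal = trapSignal([SIGINT, SIGTERM]);

echo "caught signal {$signal}, cleaning up" . PHP_EOL;

// Cleanup: cancel the timer so nothing keeps the event loop busy.
EventLoop::cancel($callbackId);
```

Pressing Ctrl+C while the script runs should end the wait and let the cleanup code execute instead of killing the process outright.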

crimson273 commented 9 months ago

thanks for the tip.

The other thing I can't understand is the proper way to wait for new messages. The approach I used in the example above works, but it limits me to getting new messages only once per second. What would be the right approach to get new messages as soon as they arrive on the client side?

bwoebi commented 9 months ago

The proper way would be to just do a top-level loop, i.e. outside of any EventLoop:: callback and without calling EventLoop::run(). The receive() call keeps the event loop alive:

    while ($message = $connection->receive()) {
        $payload = $message->buffer();

        echo "Received: {$payload}" . PHP_EOL;

        $message = json_decode($payload, true);

        if ($message['msg_type'] == 'ping') {
            $pongMessage = json_encode([
                'msg_type' => 'pong',
                'timestamp' => $message['timestamp'],
            ]);

            echo "send pong: {$pongMessage}" . PHP_EOL;
            try {
                $connection->sendText($pongMessage);
            } catch (\Exception $e) {
                echo "send pong failed! " . $e->getMessage() . PHP_EOL;
            }
        }
    }
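
A minimal sketch of how the same loop doubles as a close detector: receive() returns null once the connection is closed, so the code after the loop runs only after a close. The URL is the one from the examples above, the 30-second delay mirrors the earlier test, and the final isClosed() check is added here purely for illustration.

```php
<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\Websocket\Client\connect;

$serverUrl = 'ws://dev.sportlevel.com/highway';

$connection = connect($serverUrl);

// Close from the client side after 30 seconds, as in the earlier test.
EventLoop::delay(30, function () use ($connection): void {
    $connection->close();
});

// receive() suspends until a message arrives and returns null
// once the connection is closed, which ends the loop.
while ($message = $connection->receive()) {
    echo "Received: " . $message->buffer() . PHP_EOL;
}

// Reaching this point means the connection was closed by either side.
echo "receive loop ended, isClosed(): "
    . var_export($connection->isClosed(), true) . PHP_EOL;
```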

crimson273 commented 9 months ago

This approach doesn't trigger onClose for some reason, no matter what closes the connection: neither the server I am connecting to nor the script itself through the delayed $connection->close(). This still leaves me feeling like I am walking through a maze. Although I now understand why my initial while loop was incorrect, I still do not understand how this relates to EventLoop::repeat() and $connection->onClose() being triggered (or not).

<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;
use function Amp\Websocket\Client\connect;

$serverUrl = 'ws://dev.sportlevel.com/highway';

$connection = connect($serverUrl);

$connection->onClose(function () {
    echo "connection is closed!" . PHP_EOL;
});

EventLoop::repeat(5, function () use ($connection) {
    $pingMessage = json_encode([
        'msg_type' => 'ping',
        'timestamp' => microtime(true),
    ]);

    echo "sending ping: {$pingMessage}" . PHP_EOL;
    try {
        $connection->sendText($pingMessage);
    } catch (\Exception $e) {
        echo "send ping failed! " . $e->getMessage() . PHP_EOL;
    }
});

EventLoop::delay(30, function () use ($connection) {
    $connection->close();
});

while ($message = $connection->receive()) {
    $payload = $message->buffer();

    echo "Received: {$payload}" . PHP_EOL;

    $message = json_decode($payload, true);

    if ($message['msg_type'] == 'ping') {
        $pongMessage = json_encode([
            'msg_type' => 'pong',
            'timestamp' => $message['timestamp'],
        ]);

        echo "send pong: {$pongMessage}" . PHP_EOL;
        try {
            $connection->sendText($pongMessage);
        } catch (\Exception $e) {
            echo "send pong failed! " . $e->getMessage() . PHP_EOL;
        }
    }
}

kelunik commented 9 months ago

I'm on mobile right now, so can't test this, but try putting EventLoop::run() at the end. I think the onClose is queued, but not executed, because your script ends at that point.
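
The effect described here can be sketched with Revolt alone, without a websocket connection. This assumes the onClose callback is scheduled in much the same way as a callback passed to EventLoop::queue(); the $ran flag exists only for illustration.

```php
<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Revolt\EventLoop;

$ran = false;

// Queue a callback; it is stored, not executed, until the event loop runs.
EventLoop::queue(function () use (&$ran): void {
    $ran = true;
    echo "queued callback ran" . PHP_EOL;
});

// The script body continues first; the callback has not been invoked yet.
echo "end of script body, ran = " . var_export($ran, true) . PHP_EOL;

// Entering the event loop gives the queued callback a chance to execute.
// Without this call the script would end with the callback still pending.
EventLoop::run();

echo "after EventLoop::run(), ran = " . var_export($ran, true) . PHP_EOL;
```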

korobkovandrey commented 5 months ago

//...
$deferred = new Amp\DeferredFuture;
$connection->onClose(function () use ($deferred) {
    echo "connection is closed!" . PHP_EOL;
    $deferred->complete();
});
//...
while ($message = $connection->receive()) {
//...
}
$deferred->getFuture()->await();

kelunik commented 4 months ago

I can reproduce the onClose callback not being executed, exactly as I assumed: it is queued in the event loop, but the event loop isn't re-entered.

<?php

require dirname(__DIR__) . '/vendor/autoload.php';

use Amp\Websocket\Client\WebsocketHandshake;
use Revolt\EventLoop;
use function Amp\Websocket\Client\connect;

$serverUrl = 'wss://libwebsockets.org';

$handshake = (new WebsocketHandshake($serverUrl))
    ->withHeader('Sec-WebSocket-Protocol', 'dumb-increment-protocol');

$connection = connect($handshake);

EventLoop::delay(5, function () use ($connection) {
    $connection->close();
});

$connection->onClose(fn () => print 'closed');

while ($message = $connection->receive()) {
    $payload = $message->buffer();

    print $payload;
}

// Remove this call to reproduce the initial report
EventLoop::run();