voryx / Thruway

PHP Client and Router Library for Autobahn and WAMP (Web Application Messaging Protocol) for Real-Time Application Messaging
MIT License

Synchronous client? #42

Closed adosaiguas closed 9 years ago

adosaiguas commented 10 years ago

I would like to publish to a topic in the php code that is run within an http request, and it would be great if I can just connect, publish and disconnect in a synchronous way. How would you do that?

davidwdan commented 10 years ago

@adosaiguas

At the moment, we don't have anything built in that will allow you to publish synchronously.

You can make your own function for a synchronous call/publish by doing something like this:

function blockingpublish($args)
{

    $result     = null;
    $connection = new \Thruway\Connection(
        [
            "realm" => 'com.example.test',
            "url"   => 'ws://demo.thruway.ws:9090', //You can use this demo server or replace it with your router's IP
        ]
    );

    $connection->on('open', function (\Thruway\ClientSession $session) use ($connection, $args, &$result) {

        //publish an event
        $session->publish('com.example.hello', [$args], [], ["acknowledge" => true])->then(
            function () use ($connection, &$result) {
                $result = "published";
                echo "Publish Acknowledged!\n";
                $connection->close(); //You must close the connection or this will hang
            },
            function ($error) use ($connection, &$result) {
                // publish failed
                echo "Publish Error {$error}\n";
                $result = $error;
                $connection->close();
            }
        );
    });

    $connection->open(); //Will block here until the connection is closed

    return $result;

}

// publish
$result = blockingpublish("test");

adosaiguas commented 10 years ago

Thank you @davidwdan, I've been trying a similar approach and it fails, but that might be due to some other issue. I'll let you know here or in another issue. Meanwhile, can we leave this one open as a feature request for a synchronous client? (I might take a look at it in the near future.)

davidwdan commented 10 years ago

@adosaiguas I'm just following up to see if you were able to get this working.

adosaiguas commented 10 years ago

Although I don't like it much, I followed your proposal and it works perfectly. Once I have time to contribute I might work on the synchronous client.

davidwdan commented 10 years ago

@adosaiguas Yeah, I'm not crazy about that solution either. We haven't had a big need for a synchronous client yet, so we haven't looked into making a more elegant solution.

adosaiguas commented 10 years ago

Maybe I am doing something wrong? Isn't this quite a common use case? And if so, what is the best practice? Maybe queue things up in a queue (Redis) and then push them from the queue to Thruway? (I initially thought of this solution but ended up discarding it.)

davidwdan commented 10 years ago

In our case, most of what we do is asynchronous and in a lot of cases we return promises.

What are you trying to do?

adosaiguas commented 10 years ago

We have a PHP REST API that updates a DB and we notify our web clients through web sockets about the changes. The "synchronous" client is used in some of the REST calls to queue the "update" messages into the topics so they are received by the web clients.

zo commented 10 years ago

I am probably in the same kind of situation as @adosaiguas! I'm using Laravel Events, and triggering the synchronous script above was the only thing that worked for me. But I'm new to the WAMP world and probably don't understand how to start asynchronous PHP tasks in the background from a framework like Laravel!

Any hints are welcome... (anyway thanks for the above solution)

mbonneau commented 10 years ago

Hi @adosaiguas and @no2pxl

We have put together a simple HTTP proxy that allows you to publish into a realm from an HTTP request.

Please take a look at https://github.com/voryx/WampPost

Please let me know how this works for you.

Thanks, Matt

adosaiguas commented 9 years ago

I've added this to my task queue; let's see when I can test it. Although I am thinking of using a real queue system for this (Redis, Beanstalkd, ...).

nhouse commented 9 years ago

Sorry to hijack this issue, but I have a question regarding the blockingpublish() function @davidwdan provided.

I'm integrating Thruway with my site and so far it's going great. I'm using your blockingpublish() function to push updates from PHP code, but in the case where the WsServer isn't running (which should never happen, but I want to plan for it anyway), blockingpublish() lives up to its name and blocks forever.

Is there any way to set a timeout or something so that if WsServer isn't running the script isn't blocked indefinitely?

I think I could potentially use shell_exec() to check if the WsServer process is running, but I thought there might be a better solution.

Thanks for your time.

mbonneau commented 9 years ago

Hi @nhouse ,

Just add

$connection->getClient()->setAttemptRetry(false);

after the new connection is created to tell it to try to connect only once. This will make it exit the loop if the connection is refused or there is some other issue.

You may want to take a look at https://github.com/voryx/WampPost too as it would work as a synchronous publisher through a regular http call.

nhouse commented 9 years ago

Thanks for your help once again.

I tried adding the code after my $connection = new \Thruway\Connection( line, but the function still blocks. I set a break point inside retryConnection(), but it's never triggered.

Stepping through with the debugger shows the code blocks inside streamSelect() at this line:

return stream_select($read, $write, $except, $timeout === null ? null : 0, $timeout);

It's because $timeout is null, as set in run():

 // The only possible event is stream activity, so wait forever ...
        } elseif ($this->readStreams || $this->writeStreams) {
            $timeout = null;

This causes stream_select() to block indefinitely.

I tried changing $timeout = null; to $timeout = 0;, but then the loop in run() just loops forever.

mbonneau commented 9 years ago

Timeout would only be non-null if you are using timers in the event loop, so that is normal. stream_select should return if a network error is encountered (connection refused, connection timeout, etc.) At that point the client connection will be removed from the event loop and run() will exit. The speed that it will do that will be different under different circumstances.

While you are testing this I assume that you take a server on localhost down and then hit the script with the blocking call? Is your function identical to the example @davidwdan gave except for the addition of that one line? If you let the blocking call sit for a while does the connection time out?

nhouse commented 9 years ago

While you are testing this I assume that you take a server on localhost down and then hit the script with the blocking call?

Yes, I stop the server script and test the blocking function call.

Is your function identical to the example @davidwdan gave except for the addition of that one line?

It's basically identical to the above function:

    function thruwayBlockingPublish($topic, $args)
    {
        require_once __DIR__ . '/../vendor/autoload.php';

        ob_start(); // @todo figure out how to disable thruway's Logger output instead of using output buffering to catch and discard it

        $result     = null;
        $connection = new \Thruway\Connection(
            [
                "realm" => 'restricted_realm',
                "url"   => 'ws://127.0.0.1:9091', //You can use this demo server or replace it with your router's IP
            ]
        );
        $connection->getClient()->setAttemptRetry(false); // @todo untested https://github.com/voryx/Thruway/issues/42#issuecomment-86805502 doesn't work!

        $connection->on('open', function (\Thruway\ClientSession $session) use ($connection, $args, &$result, $topic) {

            //publish an event
            $session->publish($topic, [$args], [], ["acknowledge" => true])->then(
                function () use ($connection, &$result) {
                    $result = "published";

                    $connection->close(); // You must close the connection or this will hang
                },
                function ($error) use ($connection, &$result) {
                    // publish failed
                    $result = $error;

                    $connection->close(); // You must close the connection or this will hang
                }
            );
        });

        $connection->open(); // Will block here until the connection is closed

        ob_end_clean();

        return $result;
    }

If you let the blocking call sit for a while does the connection time out?

The longest I've let a script that made the blocking publish call run is about a minute.

stream_select should return if a network error is encountered (connection refused, connection timeout, etc.) At that point the client connection will be removed from the event loop and run() will exit. The speed that it will do that will be different under different circumstances.

I'm not familiar with stream_select() at all, but the documentation says:

If tv_sec is NULL stream_select() can block indefinitely, returning only when an event on one of the watched streams occurs (or if a signal interrupts the system call).

All I know is that when I hard-code $timeout to 0, stream_select() doesn't block, but when $timeout is null it blocks (I'm running WampServer on Windows).

I appreciate your help, but as this issue should never arise on my live server it's probably not worth spending a lot of time trying to figure out.

mbonneau commented 9 years ago

First off, you can kill the logging with:

use Psr\Log\NullLogger;
use Thruway\Logging\Logger;

Logger::set(new NullLogger());

As for the real problem, I haven't run this stuff much on Windows, although I wouldn't think there would be much of a difference. If I get a chance I will give it a try and see how it behaves for me.

In the meantime, my first recommendation is to use WampPost.

Another option is to set a timeout that kills the loop using:

$loop = $connection->getClient()->getLoop();
$loop->addTimer(5, function () use ($loop) {
    $loop->stop();
});

Put that in the same spot as $connection->getClient()->setAttemptRetry(false); and that will give you a 5 second timeout which you can obviously adjust to fit your needs (it accepts fractional times also).
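Putting the pieces from this thread together, a minimal sketch might look like the following. It is untested against a live router. The function name, its parameters, and the default URL and realm are made up for illustration; the Thruway calls are the ones shown earlier in this thread. One addition not mentioned above: the sketch cancels the timer with the React event loop's cancelTimer() once the publish completes, on the assumption that a still-pending timer would otherwise keep the loop (and the call) alive until the timeout expires.

```php
<?php
// Sketch only: assumes Thruway is installed with Composer and that
// vendor/autoload.php has already been required by the calling script.

use Psr\Log\NullLogger;
use Thruway\ClientSession;
use Thruway\Connection;
use Thruway\Logging\Logger;

// Hypothetical helper; the name, parameters, and defaults are illustrative.
// Returns "published", the error from the router, or null on timeout.
function blockingPublishWithTimeout($topic, $args, $timeoutSeconds = 5, $url = 'ws://127.0.0.1:9090', $realm = 'com.example.test')
{
    Logger::set(new NullLogger()); // silence Thruway's log output

    $result     = null;
    $connection = new Connection(["realm" => $realm, "url" => $url]);
    $connection->getClient()->setAttemptRetry(false); // try to connect only once

    // Stop the event loop if nothing has completed within the timeout.
    $loop  = $connection->getClient()->getLoop();
    $timer = $loop->addTimer($timeoutSeconds, function () use ($loop) {
        $loop->stop();
    });

    // Shared cleanup: record the outcome, cancel the pending timer so the
    // loop is free to exit, and close the connection.
    $finish = function ($outcome) use ($connection, $loop, $timer, &$result) {
        $result = $outcome;
        $loop->cancelTimer($timer);
        $connection->close();
    };

    $connection->on('open', function (ClientSession $session) use ($topic, $args, $finish) {
        $session->publish($topic, [$args], [], ["acknowledge" => true])->then(
            function () use ($finish) {
                $finish("published");
            },
            function ($error) use ($finish) {
                $finish($error);
            }
        );
    });

    $connection->open(); // blocks until the connection closes or the timer fires

    return $result;
}
```

A call such as blockingPublishWithTimeout('com.example.hello', 'test', 2) would then return null when the router is unreachable or too slow, so the caller can decide whether to log the failure or ignore it.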

bkdotcom commented 8 years ago

Came here looking for a synchronous solution. blockingpublish seems very inefficient, as it has to create a new connection for each request. Likewise with WampPost.

My use case: implementing a logger.

dlorenso commented 7 years ago

We need a synchronous client that can publish multiple messages over a single connection, with options for connection failure timeouts, etc.:

$client = new SynchronousPublisherClient(...);
$client->publish('com.example.notify', ['start']);
sleep(2);
$client->publish('com.example.notify', ['middle']);
sleep(2);
$client->publish('com.example.notify', ['end']);

This will be used by REST API code that is called over HTTPS and wants to inject publish messages into Crossbar or another WAMP router. Since the REST API runs under Apache/Nginx, a new WebSocket connection to the WAMP router must be opened for each API call.

It's possible that synchronous isn't really necessary, because we don't care when messages are delivered as long as they are delivered in the proper order and eventually. So maybe all those publish commands can remain asynchronous, with the client only exiting once we know all the messages have been delivered to the WAMP router?

$client = new SynchronousPublisherClient(...);
$client->publish('com.example.notify', ['start']);
sleep(2);
$client->publish('com.example.notify', ['middle']);
sleep(2);
$client->publish('com.example.notify', ['end']);
$client->waitAll(); // ???

-- Dante

mbonneau commented 7 years ago

@dlorenso - There are many solutions as you can see above. I would suggest using WampPost for this to simplify things. See https://github.com/voryx/WampPost .

You can then make posts into the router using something like:

function publishToWampTopic($topic, $args, $url = 'http://localhost:8181/pub')
{
    // JSON body for the WampPost endpoint
    $body = [
        "topic" => $topic,
        "args"  => $args
    ];

    // Send the body as an HTTP POST
    $context = stream_context_create([
        'http' => [
            'method'  => "POST",
            'header'  => "Content-type: application/json\r\n",
            'content' => json_encode($body)
        ]
    ]);

    // Returns the response body, or false on failure
    return file_get_contents($url, false, $context);
}

Otherwise, (in the async Client) you can have publish return a promise by setting the acknowledge option to true and then wait on the promise using https://github.com/clue/php-block-react .

For timeouts I would suggest possibly using: https://github.com/reactphp/promise-timer to wrap up other promises.
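As a rough illustration of the promise-timer suggestion, the sketch below wraps an acknowledged publish in a timeout. It assumes react/promise-timer is installed, that $session and $loop come from a Thruway 'open' callback and $connection->getClient()->getLoop() as in the examples earlier in this thread, and that publish() returns a React promise when acknowledge is true. The helper name and its parameters are hypothetical.

```php
<?php
// Sketch only: assumes Thruway and react/promise-timer are installed with
// Composer and that vendor/autoload.php has already been required.

use React\EventLoop\LoopInterface;
use React\Promise\Timer;
use Thruway\ClientSession;

// Hypothetical helper: returns a promise that resolves when the router
// acknowledges the publish, or rejects with
// React\Promise\Timer\TimeoutException after $seconds.
function publishWithDeadline(ClientSession $session, LoopInterface $loop, $topic, array $args, $seconds = 5.0)
{
    // acknowledge => true makes publish() return a promise
    $publish = $session->publish($topic, $args, [], ["acknowledge" => true]);

    // Wrap the publish promise so it cannot stay pending forever
    return Timer\timeout($publish, $seconds, $loop);
}
```

Because this only bounds the publish itself, it does not cover the case where the connection never opens; a loop timer like the one discussed earlier in the thread would still be needed for that.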

If you are comfortable with reactive extensions, the best solution would be to move to Rx and use https://github.com/voryx/RxThruwayClient which is built on top of Observables and gives a very high degree of control over ordering, timing, and failures.