voryx / Thruway

PHP Client and Router Library for Autobahn and WAMP (Web Application Messaging Protocol) for Real-Time Application Messaging
MIT License

Issues with Implementation #249

Closed mkcheung closed 7 years ago

mkcheung commented 7 years ago

Hello everyone,

I'm hoping someone can help me out with this. I'm just learning about WebSockets and have only a cursory understanding of them and of WAMP v2.

I'm trying to write a multi-channel chat room using Symfony2 as a backend and I'm writing the client in React, utilizing Autobahn to tie into the sockets. I had to switch over to Thruway when I discovered that Ratchet wasn't compatible with the latest iteration of Autobahn because Ratchet was geared for WAMP v1.

I'm trying to build a clone of Slack. Essentially, what I want to accomplish is to pick a user on the left sidebar of the screen (which I believe is a Topic) and have the messages I type into the chat window broadcast to all users associated with that topic. Unfortunately, I'm having a hard time setting up the implementation.

Sorry if the questions I'm about to ask sound mundane or if they've been asked before but my knowledge is very, very limited at this point. I'm experimenting to try and figure things out.

This is what I'm facing. The messages I'm sending up do seem to reach the socket implementation via Thruway, but I can't seem to publish them. I'm not certain what I'm doing incorrectly.

This is where I set up my Thruway Router and RatchetTransportProvider....

**************************************************************************
<?php
// myapplication/src/AppBundle/Command/SocketCommand.php
// Change the namespace according to your bundle

namespace AppBundle\Command;

use Symfony\Component\Console\Command\Command;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

//include React and ZMQ modules
use React\EventLoop\Factory;
use React\ZMQ\Context;
use React\Socket\Server as SocketServer;
use ZMQ;

use Thruway\Peer\Router;
use Thruway\Transport\RatchetTransportProvider;

// Change the namespace according to your bundle
use AppBundle\Sockets\Chat;

class SocketCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this->setName('sockets:start-chat')
            // the full command description shown when running the command with
            // the "--help" option
            ->setHelp("Starts the chat socket demo")
            // the short description shown while running "php bin/console list"
            ->setDescription('Starts the chat socket demo')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {

        $router = new Router();
        $realm = "realm1";

        $router->addInternalClient(new Chat($realm, $router->getLoop()));
        $router->addTransportProvider(new RatchetTransportProvider("127.0.0.1", 8090));
        $router->start();
    }
}
**************************************************************************

This is where I set up the InternalClient for the router.....

**************************************************************************
<?php

// Change the namespace according to your bundle, and that's all !
namespace AppBundle\Sockets;

use Lexik\Bundle\JWTAuthenticationBundle\Security\Authentication\Token\JWTUserToken;
use Ratchet\ConnectionInterface;
use Ratchet\Wamp\WampServerInterface;
use Symfony\Component\DependencyInjection\ContainerInterface;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\HttpFoundation\JsonResponse;
use React\EventLoop;
use React\EventLoop\StreamSelectLoop;

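use Thruway\Transport\TransportInterface;
use Thruway\Message\Message;
// Needed for the instanceof check in onMessage(); without this import the
// class name resolves to AppBundle\Sockets\PublishMessage and never matches.
use Thruway\Message\PublishMessage;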
use React\EventLoop\Factory;
use React\ZMQ\Context;
use React\Socket\Server as SocketServer;
use ZMQ;

use Thruway\Peer\Client as Client;
class Chat extends Client
{
    protected $sessions = [];

    public function onSessionStart($session, $transport)
    {

        $session->subscribe('chat.test', function ($args, $kwArgs, $options) use ($session) {
            // Get message contents, guarding against an empty or non-object payload
            $value = isset($args[0]) ? $args[0] : null;
            $token = isset($value->token) ? $value->token : null;
            $message = isset($value->message) ? $value->message : '';

            echo 'Received ' . json_encode($value) . ' on topic ' . $options->topic . PHP_EOL;
            // publish to other people
            $session->publish('chat.test', [[ 'message' => $message ]]);
        });

    }

    public function onMessage(TransportInterface $transport, Message $msg) {
        var_dump($msg);
        if ($msg instanceof PublishMessage) {
            if ($msg->getTopicName() == "mytopic") {
                // mangle the message in here
            }
        }

        parent::onMessage($transport, $msg);
    }
}

The thing is that the message I send up doesn't seem to be hitting the onMessage method. My client is written like so. I've split them up along different Components as per React so these snippets are in different files....


class Channel extends React.Component{

    socket = {};
    constructor(props){
        super(props);
        this.state = {
            messages:[]
        };    

        this.getChannelMessages = this.getChannelMessages.bind(this);
        this.addMessage = this.addMessage.bind(this);
        this.updateMessages = this.updateMessages.bind(this);
        this.scrollToBottom = this.scrollToBottom.bind(this);
        this.establishSession = this.establishSession.bind(this);
        this.sessionHandler = this.sessionHandler.bind(this);
        this.session = null;

        this.conn = new Autobahn.Connection({
           url: "ws://127.0.0.1:8090",
           realm: "realm1"
        });
    }

    sessionHandler(args) {
        console.log("Event:", args[0]);
    }

    establishSession(session){
        console.log('inside establishSession: ' + this.props.details.channelTopic);
        session.subscribe(this.props.details.channelTopic, this.sessionHandler);
        this.session = session;
    }

    componentWillMount(){

        const channelName = this.props.details.channelName;
        this.conn.onopen = this.establishSession;
        this.conn.open();
        this.getChannelMessages();
    }

The following component is where I try to publish the message. I established the topic 'com.singular.channel'.


class ChatInput extends React.Component{

    constructor(props) {
        super(props);
        // Set initial state of the chatInput so that it is not undefined
        this.state = { 
            chatInput: '' 
        };

        // React ES6 does not bind 'this' to event handlers by default
        this.submitHandler = this.submitHandler.bind(this);
        this.textChangeHandler = this.textChangeHandler.bind(this);
        this.sHandler = this.sHandler.bind(this);
    }

    textChangeHandler(event)  {
        this.setState({ chatInput: event.target.value });
    }

    sHandler(args) {
        console.log("Event:", args[0]);
    }

    submitHandler(event) {
        event.preventDefault();
        this.props.session.publish('com.singular.channel', [this.chatText.value]);

        this.setState({ chatInput: '' });
    }
**************************************************************************

The following is what I'm seeing when I trigger the submitHandler event and send a message up through the autobahn session.publish. In this case, I sent up 'Testing, Testing, Testing'.....

2017-06-14T19:57:49.7512700 debug      [Thruway\Transport\RatchetTransportProvider 27496] RatchetTransportProvider::onOpen
2017-06-14T19:57:49.7553380 debug      [Thruway\Transport\RatchetTransportProvider 27496] onMessage: ([1,"realm1",{"roles":{"caller":{"features":{"caller_identification":true,"progressive_call_results":true}},"callee":{"features":{"caller_identification":true,"pattern_based_registration":true,"shared_registration":true,"progressive_call_results":true,"registration_revocation":true}},"publisher":{"features":{"publisher_identification":true,"subscriber_blackwhite_listing":true,"publisher_exclusion":true}},"subscriber":{"features":{"publisher_identification":true,"pattern_based_subscription":true,"subscription_revocation":true}}}}])
2017-06-14T19:57:49.7556700 info       [Thruway\RealmManager 27496] Got prehello...
2017-06-14T19:57:49.7588620 debug      [Thruway\Transport\RatchetTransportProvider 27496] onMessage: ([32,217579874244858,{},"com.singular.channel"])
2017-06-14T19:57:49.7591440 debug      [Thruway\Subscription\SubscriptionGroup 27496] Added subscription to "exact":"com.singular.channel"
2017-06-14T19:58:09.1274460 debug      [Thruway\Transport\RatchetTransportProvider 27496] onMessage: ([16,1605618907268018,{},"com.singular.channel",["Testing Testing Testing"]]) 

The thing is, that last debug message is printed within the onMessage method of the RatchetTransportProvider.php file. I need to somehow save that message in my Postgres database and broadcast it out to the topic com.singular.channel. I can't seem to route the onMessage call to the 'onMessage' in my Chat class.

I'm not sure if I'm thinking along the correct lines. Can anyone point me in the right direction or direct me to some examples? It would be much appreciated! :-)
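
For anyone reading the router log above: the first element of each logged array is the WAMP v2 message type code. 1 is HELLO, 32 is SUBSCRIBE and 16 is PUBLISH, so the log shows the router receiving the browser's subscription and its publish. Subscribers do not get the PUBLISH itself; the broker delivers it to them as an EVENT (code 36). The sketch below decodes such lines. `describeWampMessage` is a hypothetical helper written for illustration, not part of Thruway or Autobahn, and it only extracts fields for the three message types seen in the log.

```javascript
// WAMP v2 message type codes, as defined by the WAMP specification.
const WAMP_MESSAGE_TYPES = {
  1: 'HELLO',
  2: 'WELCOME',
  3: 'ABORT',
  6: 'GOODBYE',
  8: 'ERROR',
  16: 'PUBLISH',
  17: 'PUBLISHED',
  32: 'SUBSCRIBE',
  33: 'SUBSCRIBED',
  34: 'UNSUBSCRIBE',
  35: 'UNSUBSCRIBED',
  36: 'EVENT',
  48: 'CALL',
  50: 'RESULT',
  64: 'REGISTER',
  65: 'REGISTERED',
  68: 'INVOCATION',
  70: 'YIELD',
};

// Hypothetical helper: takes the JSON array printed by the router's debug log
// and returns the message type plus the fields relevant to this thread.
function describeWampMessage(json) {
  const msg = JSON.parse(json);
  const type = WAMP_MESSAGE_TYPES[msg[0]] || 'UNKNOWN(' + msg[0] + ')';
  switch (type) {
    case 'HELLO':
      // [HELLO, Realm, Details]
      return { type: type, realm: msg[1] };
    case 'SUBSCRIBE':
      // [SUBSCRIBE, Request, Options, Topic]
      return { type: type, topic: msg[3] };
    case 'PUBLISH':
      // [PUBLISH, Request, Options, Topic, Arguments?]
      return { type: type, topic: msg[3], args: msg[4] || [] };
    default:
      return { type: type };
  }
}

console.log(describeWampMessage('[32,217579874244858,{},"com.singular.channel"]'));
```

Feeding it the last log line would report a PUBLISH to com.singular.channel, which is consistent with the message having reached the router.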

mkcheung commented 7 years ago

I was able to figure out a little more. I'm now receiving this on the command line when I send up a message in my chat....

2017-06-17T09:46:11.0345080 info       [Thruway\RealmManager 37512] Got prehello...
2017-06-17T09:46:11.0415140 debug      [Thruway\Transport\RatchetTransportProvider 37512] onMessage: ([32,8948575040964624,{},"com.singular.channel"])
2017-06-17T09:46:11.0417740 debug      [Thruway\Subscription\SubscriptionGroup 37512] Added subscription to "exact":"com.singular.channel"
2017-06-17T09:46:25.8556880 debug      [Thruway\Transport\RatchetTransportProvider 37512] onMessage: ([16,431742673216504,{},"com.singular.channel",["This seems to go through"]])
/Users/marscheung/SideProjects/slacklite/src/AppBundle/Sockets/Chat.php:60:
class Thruway\Message\EventMessage#730 (6) {
  private $subscriptionId =>
  int(7657383312685531)
  private $publicationId =>
  int(423985547458193)
  private $topic =>
  NULL
  private $details =>
  class stdClass#729 (0) {
  }
  private $arguments =>
  array(1) {
    [0] =>
    string(24) "This seems to go through"
  }
  private $argumentsKw =>
  class stdClass#731 (0) {
  }
}
2017-06-17T09:46:25.8621480 debug      [AppBundle\Sockets\Chat 37512] Client onMessage: [Thruway\Message\EventMessage]
Event This seems to go through

The change I experimented with was in my Chat.php file. I manually changed the $session->subscribe('chat.test'...... to 'com.singular.channel' as shown below, and I set up a var_dump in my onMessage method to display the command line results....

use Thruway\Peer\Client as Client;
class Chat extends Client
{
    protected $sessions = [];

    public function onSessionStart($session, $transport)
    {

        // 1) subscribe to a topic
        $onevent = function ($args) {
            echo "Event {$args[0]}\n";
        };
        $session->subscribe('com.singular.channel', $onevent);
        // 2) publish an event
        $session->publish('com.singular.channel', ['Hello, world from PHP!!!'], [], ["acknowledge" => true])->then(
            function () {
                echo "Publish Acknowledged!\n";
            },
            function ($error) {
                // publish failed
                echo "Publish Error {$error}\n";
            }
        );
    }

    public function onMessage(TransportInterface $transport, Message $msg) {
        var_dump($msg);
        if ($msg instanceof PublishMessage) {
            if ($msg->getTopicName() == "mytopic") {
                // mangle the message in here
            }
        }

        parent::onMessage($transport, $msg);
    }
}

I think that was part of my issue: the topic being subscribed to was hardcoded by me in the code. What I'm having a difficult time figuring out is how to send up the topic that I want to subscribe to. I thought the flow of execution was that Autobahn lets the user select a topic to subscribe to and Thruway handles the subscription on its end, but with this setup it looks like the subscriptions are already established and set in stone before any Autobahn call is executed. The onSessionStart in my Chat class gets activated when I run my socket command.....

class SocketCommand extends ContainerAwareCommand
{
    protected function configure()
    {
        $this->setName('sockets:start-chat')
            // the full command description shown when running the command with
            // the "--help" option
            ->setHelp("Starts the chat socket demo")
            // the short description shown while running "php bin/console list"
            ->setDescription('Starts the chat socket demo')
        ;
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {

        // $router = new Router();
        // $transportProvider = new RatchetTransportProvider("127.0.0.1", 8090);
        // $router->addTransportProvider($transportProvider);
        // $router->start();
        $router = new Router();
        $realm = "realm1";

        $router->addInternalClient(new Chat($realm, $router->getLoop()));
        $router->addTransportProvider(new RatchetTransportProvider("127.0.0.1", 8090));
        $router->start();
    }
}

Perhaps I'm misunderstanding the setup? This script is already running, with 'com.singular.channel' already subscribed to, when my client comes up and tries to select a session in my React client with Autobahn.....

.
.
.
        this.session = null;

        this.conn = new Autobahn.Connection({
           url: "ws://127.0.0.1:8090",
           realm: "realm1"
        });
    }

    sessionHandler(args) {
        console.log("Event:", args[0]);
    }

    establishSession(session){
        console.log('inside establishSession: ' + this.props.details.channelTopic);
        session.subscribe(this.props.details.channelTopic, this.sessionHandler);
        this.session = session;
    }

Can someone please provide some insight? Again, please excuse my ignorance. I've been experimenting and trying to figure out Thruway on my own with limited success. Any help would be much appreciated. :-)

andrewminerd commented 7 years ago

Subscribing to a topic is similar to being in a chat room -- all subscribed clients receive the messages published to that topic. If all chat clients were simply subscribed to the channels they were a part of, there wouldn't necessarily be a need for another middle-man receiving the message and pushing it back out (that's what the WAMP broker does).

Here's an example along these lines: https://github.com/cgmartin/clj-wamp-example/blob/master/resources/public/js/chat.js

However, since you mentioned writing the messages to the database, another option might be using an RPC call. Rather than publishing a message, the chat clients would call a "sendMessage" procedure with the destination channel and the chat message. The procedure would write the message to the database, and then publish the message to a topic named after the channel. All clients would be subscribed to the topic for the channel they're in to receive the messages.
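
A minimal sketch of what such a procedure could look like on the back-end client, written here with an Autobahn|JS-style session rather than Thruway's PHP client. The procedure URI `com.singular.send_message`, the function name `registerSendMessage` and the `saveMessage` callback are all hypothetical; `saveMessage` stands in for whatever writes to the database and is assumed to return a promise.

```javascript
// Registers a "send message" procedure on an Autobahn-style session.
// Callers pass [channelTopic, text]; the procedure persists the message
// and only then publishes it to the topic named after the channel.
function registerSendMessage(session, saveMessage) {
  return session.register('com.singular.send_message', function (args) {
    const channelTopic = args[0];
    const text = args[1];

    return saveMessage(channelTopic, text).then(function () {
      // The back-end session is the publisher here, so every chat client
      // subscribed to the topic, including the original sender, gets the event.
      session.publish(channelTopic, [text]);
      return true;
    });
  });
}
```

With this shape the back end needs no list of channels in advance: the topic arrives as an argument of each call.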

This page provides some information on the two 'modes' (publish/subscribe, or pubsub, vs. RPC): http://wamp-proto.org/why/

mkcheung commented 7 years ago

andrewminerd,

Thank you. :-)

Right, but the issue is that I'm having a problem getting the subscription to the topic (chat room) in the first place. The code example I posted is a command line command which seems to initiate the session subscription to 'com.singular.channel' (in the Chat class) when I start up the command.

'com.singular.channel' is the name of one of the channels I intended to send up through Autobahn in my React client. So, to use an example, imagine I have a Slack-like implementation for my client (which is ultimately what I'm trying to accomplish). This is how I'd like my chat to function....

1) Activate the command to start up the websockets.
2) Start up my React client. I have a view that has a list of the channels on the left-hand side of the screen.
3) Clicking on any one of these channels should kick off a subscription to the topic associated with that specific channel. In this case, let's say that I click a channel named Singular Channel and I have Autobahn execute session.subscribe('com.singular.channel'......) to get the user into that specific topic/chat room.
4) The user goes ahead and participates in the chat. His/her messages are processed in the database and broadcast to all users who are subscribed to that specific topic.

The way my code works now, it seems like the subscription for the session takes place as soon as step 1 is executed. For lack of a better term, I'm not sure how to 'interrupt' the process and produce the flexibility needed for me to send the topic name up through autobahn to have the subscription take place upon user request rather than a hardcoded implementation utilizing Thruway. I'm sure it's possible, but I haven't been able to implement this so far.
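
For the flow described above, the browser can make its own subscription at the moment a channel is clicked, since each WAMP session holds its own subscriptions. A minimal sketch, assuming an already opened Autobahn session; `createChannelSwitcher` and `switchTo` are hypothetical names, and the sketch keeps only one channel subscribed at a time, which may or may not match the intended UI.

```javascript
// Returns a function that moves the session from the currently subscribed
// channel topic to a new one. Switches are queued so that rapid clicks
// cannot leave two subscriptions active at once.
function createChannelSwitcher(session, onEvent) {
  let current = null;            // Subscription returned by session.subscribe
  let queue = Promise.resolve(); // serializes consecutive switches

  return function switchTo(topic) {
    const run = queue
      .then(function () {
        // Leave the previous channel, if there is one.
        return current ? session.unsubscribe(current) : null;
      })
      .then(function () {
        current = null;
        return session.subscribe(topic, onEvent);
      })
      .then(function (subscription) {
        current = subscription;
        return subscription;
      });

    // Keep the queue usable even if this particular switch failed.
    queue = run.catch(function () {});
    return run;
  };
}
```

In the Channel component this could be created once in the `onopen` handler, with `switchTo(this.props.details.channelTopic)` called from the click handler of the sidebar entry.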

andrewminerd commented 7 years ago

The server-side internal client and the autobahn client maintain separate subscriptions. If you wish to use the current model with the internal client subscribing to the channels, your Chat client would have to create individual subscriptions to every possible channel, or you could potentially use a pattern based subscription (https://github.com/wamp-proto/wamp-proto/blob/master/rfc/text/advanced/ap_pubsub_pattern_based_subscription.md). This is one reason I suggested using an RPC call when sending messages to the database.
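
A rough sketch of the pattern-based option, using an Autobahn|JS-style session and prefix matching. It assumes the router has pattern-based subscriptions available and that all channel topics share a common prefix such as `com.singular`; that naming scheme and the `listenToAllChannels` name are assumptions for illustration.

```javascript
// Subscribes one listener to every topic that starts with `prefix`.
// `onMessage(topic, payload)` is called with the concrete topic of each event.
function listenToAllChannels(session, prefix, onMessage) {
  return session.subscribe(
    prefix,
    function (args, kwargs, details) {
      // With a pattern-based subscription, the event details carry the
      // actual topic the message was published to.
      onMessage(details.topic, args[0]);
    },
    { match: 'prefix' }
  );
}
```

A back-end listener set up this way could, for example, write every chat message to the database without knowing the channel names in advance.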

Also, I'd just add that I'm not sure the internal client is a good fit for this application, although I'll admit that I have little experience with it. See the warning at http://voryx.net/creating-internal-client-thruway/:

The internal client should really only be used for router extensions or possibly for prototyping or debugging. It is not a good idea to use the internal client feature for production application clients. (Blocking, Compatibility with other WAMP elements, etc.)

That said, it's essentially the same process when using an 'external' client, it just would run in a separate process (and thus protect the router from any potential blocking in your client).

So a sample flow with RPC:

As you can see, this does not require the back-end client to have pre-existing knowledge of the channels/topics. If using a wildcard subscription, the flow would be somewhat similar, but the back-end client really just listens in and isn't part of the message flow:
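
As a rough client-side sketch of the RPC variant, the chat input would call a procedure instead of publishing directly. The procedure URI `com.singular.send_message` and the helper name `sendChatMessage` are hypothetical; the back-end client is assumed to have registered that procedure and to publish the message to the channel topic after saving it.

```javascript
// Sends one chat message through an RPC call on an Autobahn-style session.
// Resolves to true when the back end accepted the message, false when
// there was nothing to send.
function sendChatMessage(session, channelTopic, text) {
  const trimmed = text.trim();
  if (trimmed === '') {
    return Promise.resolve(false); // ignore empty input
  }
  // The channel topic travels as an argument, so the back end does not
  // need prior knowledge of which channels exist.
  return session
    .call('com.singular.send_message', [channelTopic, trimmed])
    .then(function () {
      return true;
    });
}
```

In the ChatInput component, `submitHandler` could call `sendChatMessage(this.props.session, topic, this.state.chatInput)` in place of `session.publish`, where `topic` is the currently selected channel's topic.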

mkcheung commented 7 years ago

@andrewminerd

Thanks again for your reply. I've been tearing my hair out trying to figure out how to make this work, looking for examples and whatnot.

Let me give this a shot. I'll check back after I've played with it a little.

mbonneau commented 7 years ago

@mkcheung - I am closing this issue. Please feel free to comment on this or add a new one if your issue is not resolved.