GeniusesOfSymfony / WebSocketBundle

Websocket server for Symfony applications (powered by Ratchet), includes an Autobahn.JS based JavaScript client
MIT License

Subscriber Pusher events #405

Closed Gus19 closed 3 years ago

Gus19 commented 4 years ago

Hello,

I have a problem and I don't understand how to fix it.

I am working with Symfony 4.4 and following the "Pusher & Push Handler" section (https://github.com/GeniusesOfSymfony/WebSocketBundle/blob/2.x/Resources/docs/Pusher.md).

The method $pusher->push works perfectly and I want to add a Subscriber but no PushHandlerEvent exists.

When I tried to add the code

public static function getSubscribedServices() {
  return array_merge(parent::getSubscribedServices(), [
    'gos_web_socket.pusher.wamp' => WampPusher::class,
  ]);
}

There is this Exception : The service "App\Controller\HomepageController" has a dependency on a non-existent service "Gos\Bundle\WebSocketBundle\Pusher\Wamp\WampPusher"

I think I need to add some config for the service, but which one?

The service :

App\Controller\HomepageController:
    arguments:
      '$pusher': '@gos_web_socket.pusher.wamp'

Thanks in advance !

mbabker commented 4 years ago

The pusher service IDs aren't aliased to their class names, try this for your getSubscribedServices() method:

public static function getSubscribedServices() {
  return array_merge(parent::getSubscribedServices(), [
    'gos_web_socket.pusher.wamp' => 'gos_web_socket.pusher.wamp',
  ]);
}
Gus19 commented 4 years ago

Key and Value are the same ?

I have this error : image

mbabker commented 4 years ago

Yes, key and value are the same. Looking at https://symfony.com/doc/current/service_container/service_subscribers_locators.html, it looks like this should work too:

public static function getSubscribedServices() {
  return array_merge(parent::getSubscribedServices(), [
    'gos_web_socket.pusher.wamp',
  ]);
}
Gus19 commented 4 years ago

I resolved the error by putting "?" before the class, but there are still no events :(

public static function getSubscribedServices() {
    return array_merge(parent::getSubscribedServices(), [
      'gos_web_socket.pusher.wamp' => '?'.WampPusher::class
    ]);
  }

Here is my subscriber and the logs. Am I doing it wrong?

namespace App\EventSubscriber;

use Psr\Log\LoggerInterface;
use Symfony\Component\EventDispatcher\EventSubscriberInterface;
use Gos\Bundle\WebSocketBundle\Event\PushHandlerEvent;

class PushHandlerEventSubscriber implements EventSubscriberInterface
{

  private $logger;
  public function __construct(LoggerInterface $logger) {
    $this->logger = $logger;
  }

  public static function getSubscribedEvents(): array {
    return [
      PushHandlerEvent::class => 'onEvent'
    ];
  }

  public function onEvent(PushHandlerEvent $event) {
    $this->logger->info("into PushHandlerEventSubscriber");
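    // The event object is not a string, so log its class name as context instead.
    $this->logger->info("Received event", ['event' => get_class($event)]);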
  }
}
[2020-02-19 19:08:32] websocket.INFO: Starting web socket [] []
[2020-02-19 19:08:32] websocket.INFO: Launching Ratchet on 127.0.0.1:1337 PID: 23898 [] []
[2020-02-19 19:08:38] request.INFO: Matched route "notification". {"route":"notification","route_parameters":{"_route":"notification","_controller":"App\\Controller\\HomepageController::notification"},"request_uri":"http://localhost:8000/notification","method":"GET"} []
[2020-02-19 19:08:38] websocket.WARNING: User firewall is not configured, we have set ws_firewall by default [] []
[2020-02-19 19:08:38] websocket.DEBUG: INSERT CLIENT 426 {"token":"[object] (Symfony\\Component\\Security\\Core\\Authentication\\Token\\AnonymousToken: AnonymousToken(user=\"anon-1261928115e4d79a6a9743036560542\", authenticated=true, roles=\"\"))","username":"anon-1261928115e4d79a6a9743036560542"} []
[2020-02-19 19:08:38] websocket.INFO: anon-1261928115e4d79a6a9743036560542 connected {"connection_id":426,"session_id":"1261928115e4d79a6a9743036560542","storage_id":"426"} []
[2020-02-19 19:08:38] websocket.INFO: Publish in acme/channel [] []
[2020-02-19 19:08:38] websocket.DEBUG: GET CLIENT 426 [] []
[2020-02-19 19:08:38] websocket.DEBUG: User anon-1261928115e4d79a6a9743036560542 published to acme/channel [] []
[2020-02-19 19:08:38] websocket.DEBUG: Matched route "acme_topic" [] []
[2020-02-19 19:08:38] security.INFO: Populated the TokenStorage with an anonymous Token. [] []
[2020-02-19 19:08:38] websocket.DEBUG: GET CLIENT 426 [] []
[2020-02-19 19:08:38] websocket.DEBUG: REMOVE CLIENT 426 [] []
[2020-02-19 19:08:38] websocket.INFO: anon-1261928115e4d79a6a9743036560542 disconnected {"connection_id":426,"session_id":"1261928115e4d79a6a9743036560542","storage_id":"426","username":"anon-1261928115e4d79a6a9743036560542"} []
mbabker commented 4 years ago

The question mark just marks the service as optional; that wouldn't fix the initial problem of the pusher not being a known service, since it isn't registered by class name.

Do you have the pushers enabled? They are unwired from the container when disabled, which would cause the service to not be found.
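
For reference, a sketch of what enabling the WAMP pusher looks like in the 2.x configuration, based on the Pusher documentation linked earlier in this thread. The host and port values here are assumptions (taken from the "Launching Ratchet on 127.0.0.1:1337" log line above) and should match your own gos_web_socket.server settings.

```yaml
# config/packages/gos_web_socket.yaml (file path is an assumption)
gos_web_socket:
    pushers:
        wamp:
            enabled: true      # when false, the pusher service is not registered
            host: 127.0.0.1    # assumed: same as gos_web_socket.server.host
            port: 1337         # assumed: same as gos_web_socket.server.port
```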

jimiero commented 4 years ago

@mbabker sorry if this is the wrong place to ask, but the current documentation seems to be a total mess :( Do you have any documentation for Symfony 4.4?

For example, what should be used with version 2.0 for a periodic ping to avoid the error "General error: 2006 MySQL server has gone away"?

mbabker commented 4 years ago

Admittedly I don't think the ping stuff ever got documented beyond it being included in the configuration reference dump.

gos_web_socket:
    ping:
        services:
            -
                name: 'database_connection'
                type: 'doctrine'

That'll set up a ping service using the default database_connection service that DoctrineBundle makes. If you have multiple connections then add separate services for each connection you want it to ping.

It's not a surefire way of keeping you from hitting the "MySQL server has gone away" error but it does help.
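
As a sketch of the multiple-connection case mentioned above: each connection gets its own entry under services. The connection names "default" and "reporting" are hypothetical. DoctrineBundle exposes each DBAL connection as doctrine.dbal.<name>_connection, so adjust the names to whatever your doctrine.dbal.connections config defines.

```yaml
gos_web_socket:
    ping:
        services:
            -
                name: 'doctrine.dbal.default_connection'
                type: 'doctrine'
            -
                # hypothetical second connection named "reporting"
                name: 'doctrine.dbal.reporting_connection'
                type: 'doctrine'
```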

jimiero commented 4 years ago

@mbabker any idea what can cause this issue?

request.CRITICAL: Uncaught PHP Exception Gos\Component\WebSocketClient\Exception\BadResponseException: "Could not extract response body from stream." at /vendor/gos/websocket-client/src/Wamp/Client.php line 191

mbabker commented 4 years ago

It comes from here; it looks like something is creating a malformed response.

Gus19 commented 4 years ago

Since it's deprecated, I think we can close this issue; this question is no longer relevant.

But I have another one now! I saw the question in issue #417 but I don't have an answer... How do I push data to my websocket with symfony/messenger?

mbabker commented 4 years ago

Make a MessageHandlerInterface that sends a message to the websocket HTTP endpoint. All the websocket stuff at this part of your stack should be services anyway and usable no matter the context; the Messenger recommendation is really more geared toward the AMQP implementation than the WAMP implementation of the pushers.

<?php declare(strict_types=1);

namespace App\Messenger\Handler;

use App\Messenger\Message\WebsocketTopicPublish;
use App\Websocket\Client\WebsocketClient;
use Symfony\Component\Messenger\Handler\MessageHandlerInterface;

final class BroadcastMessageToWebsocketServer implements MessageHandlerInterface
{
    private WebsocketClient $websocketClient;

    public function __construct(WebsocketClient $websocketClient)
    {
        $this->websocketClient = $websocketClient;
    }

    public function __invoke(WebsocketTopicPublish $command): void
    {
        $this->websocketClient->publish($command->payload, $command->route);
    }
}

So now you're asking what my WebsocketClient is, aren't you?

<?php declare(strict_types=1);

namespace App\Websocket\Client;

use Gos\Bundle\WebSocketBundle\Router\WampRouter;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Ratchet\Client\WebSocket;

final class WebsocketClient implements LoggerAwareInterface
{
    use LoggerAwareTrait;

    private const MSG_WELCOME = 0;
    private const MSG_PREFIX = 1;
    private const MSG_CALL = 2;
    private const MSG_CALL_RESULT = 3;
    private const MSG_CALL_ERROR = 4;
    private const MSG_SUBSCRIBE = 5;
    private const MSG_UNSUBSCRIBE = 6;
    private const MSG_PUBLISH = 7;
    private const MSG_EVENT = 8;

    private WampRouter $router;
    private PawlClient $client;

    public function __construct(WampRouter $router, PawlClient $client)
    {
        $this->router = $router;
        $this->client = $client;
    }

    /**
     * Calls a RPC handler on the websocket server.
     *
     * @throws \InvalidArgumentException if an invalid $data type was given
     */
    public function call(array $payload, string $routeName, array $routeParameters = []): void
    {
        $this->client->connect()
            ->then(
                function (WebSocket $connection) use ($payload, $routeName, $routeParameters) {
                    $this->logger->debug('Client connection resolved');

                    $route = $this->router->generate($routeName, $routeParameters);

                    $this->logger->debug('Calling RPC function on websocket server', ['route' => $route, 'payload' => $payload]);

                    try {
                        $message = json_encode(array_merge([self::MSG_CALL, uniqid('', true), $route], $payload), JSON_THROW_ON_ERROR);

                        $connection->send($message);
                    } catch (\JsonException $exception) {
                        $this->logger->error('Could not encode message to call RPC function on websocket server.', ['exception' => $exception]);

                        throw $exception;
                    } finally {
                        $connection->close();
                    }
                },
                function (\Throwable $throwable): void {
                    $this->logger->error('Client connection rejected', ['exception' => $throwable]);
                }
            );
    }

    /**
     * Publishes to a Topic on the websocket server.
     *
     * @throws \InvalidArgumentException if an invalid $data type was given
     */
    public function publish(array $payload, string $routeName, array $routeParameters = []): void
    {
        $this->client->connect()
            ->then(
                function (WebSocket $connection) use ($payload, $routeName, $routeParameters) {
                    $this->logger->debug('Client connection resolved');

                    $route = $this->router->generate($routeName, $routeParameters);

                    $this->logger->debug('Publishing message to websocket server', ['route' => $route, 'payload' => $payload]);

                    try {
                        $message = json_encode([self::MSG_PUBLISH, $route, $payload, [], []], JSON_THROW_ON_ERROR);

                        $connection->send($message);
                    } catch (\JsonException $exception) {
                        $this->logger->error('Could not encode message to publish to websocket server.', ['exception' => $exception]);

                        throw $exception;
                    } finally {
                        $connection->close();
                    }
                },
                function (\Throwable $throwable): void {
                    $this->logger->error('Client connection rejected', ['exception' => $throwable]);
                }
            );
    }
}
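
To make the wire format used by publish() concrete, here is a minimal standalone sketch of the WAMP v1 PUBLISH frame that the client encodes: a JSON array of the type ID 7, the topic URI, the event payload, and the exclude and eligible lists. The helper name buildWampPublishFrame is hypothetical and not part of the bundle; the topic "acme/channel" is borrowed from the logs earlier in the thread.

```php
<?php declare(strict_types=1);

/**
 * Builds a WAMP v1 PUBLISH frame: [7, topicURI, event, exclude, eligible].
 * Hypothetical helper for illustration only; not part of the bundle.
 */
function buildWampPublishFrame(string $topicUri, array $payload): string
{
    // 7 is the WAMP v1 PUBLISH type ID (MSG_PUBLISH in the client above).
    // The exclude and eligible lists are left empty, as in the client above.
    return json_encode([7, $topicUri, $payload, [], []], JSON_THROW_ON_ERROR);
}

echo buildWampPublishFrame('acme/channel', ['msg' => 'hello']), PHP_EOL;
// prints: [7,"acme\/channel",{"msg":"hello"},[],[]]
```

Note that json_encode() escapes the forward slash in the topic by default; the decoded value is still "acme/channel".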

And what about the PawlClient? It's a very thin wrapper around https://github.com/ratchetphp/Pawl to help with being able to unit test things since Pawl's API is function driven.

<?php declare(strict_types=1);

namespace App\Websocket\Client;

use Ratchet\Client;
use React\Promise\PromiseInterface;

final class PawlClient
{
    private string $websocketServerUri;

    public function __construct(string $websocketServerUri)
    {
        $this->websocketServerUri = $websocketServerUri;
    }

    public function connect(): PromiseInterface
    {
        return Client\connect($this->websocketServerUri, ['wamp']);
    }
}
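
Since PawlClient takes a plain string, autowiring cannot guess the constructor argument, so it has to be bound explicitly. A minimal sketch of the service definition, assuming the server from the logs above (127.0.0.1:1337, no SSL); the URI is an assumption and should point at your own websocket server.

```yaml
# config/services.yaml
services:
    App\Websocket\Client\PawlClient:
        arguments:
            # assumed URI; use wss:// if the server is behind TLS
            $websocketServerUri: 'ws://127.0.0.1:1337'
```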
Gus19 commented 4 years ago

Thank you very much, I will test that !

Gus19 commented 4 years ago

I have a new question. I think my "websocketServerUri" parameter is wrong: there is no error, but the then() callback is never called. Did I miss something?

I set "wss://127.0.0.1:8888"; here is my config (I use SSL):

gos_web_socket:
  server:
    port: 8888        
    host: 127.0.0.1

In the log I have this: no errors, but no messages from the then() callback.

17:55:24 INFO      [messenger] Received message Gus\GameBundle\Services\Websocket\WebsocketMessage ["message" => Gus\GameBundle\Services\Websocket\WebsocketMessage^ { …},"class" => "Gus\GameBundle\Services\Websocket\WebsocketMessage"]
17:55:24 INFO      [app] into publish ...
17:55:24 INFO      [app] connect PawlClient wss://127.0.0.1
17:55:24 INFO      [messenger] Message Gus\GameBundle\Services\Websocket\WebsocketMessage handled by Gus\GameBundle\Services\Websocket\WebsocketMessageHandler::__invoke ["message" => Gus\GameBundle\Services\Websocket\WebsocketMessage^ { …},"class" => "Gus\GameBundle\Services\Websocket\WebsocketMessage","handler" => "Gus\GameBundle\Services\Websocket\WebsocketMessageHandler::__invoke"]
17:55:24 INFO      [messenger] Gus\GameBundle\Services\Websocket\WebsocketMessage was handled successfully (acknowledging to transport). ["message" => Gus\GameBundle\Services\Websocket\WebsocketMessage^ { …},"class" => "Gus\GameBundle\Services\Websocket\WebsocketMessage"]

If I set another value for websocketServerUri, I get the expected error message: 17:56:15 ERROR [app] Client connection rejected ["exception" => InvalidArgumentException { …}]

Edit : and the dump of the Promise :

^ React\Promise\Promise^ {#447
  -canceller: Closure($_, $reject)^ {#458
    class: "Ratchet\Client\Connector"
    this: Ratchet\Client\Connector {#451 …}
    use: {
      $url: "ws://127.0.0.1:8888"
      $connecting: React\Promise\Promise {#483 …}
    }
  }
  -result: null
  -handlers: []
  -progressHandlers: & []
  -requiredCancelRequests: 0
  -cancelRequests: 0
}
mbabker commented 4 years ago

The URI looks right, without knowing what the InvalidArgumentException is it's hard to say what else is going on in there.

Gus19 commented 4 years ago

I get the InvalidArgumentException if I set "127.0.0.1", for example. If I set the URI to "wss://127.0.0.1:8888" I don't get an error; that's why I don't understand what is happening.

Gus19 commented 4 years ago

I don't know why Client\connect doesn't work, but if I use the Connector class, it's fine!

This is my final class :

use Gos\Bundle\WebSocketBundle\Router\WampRouter;
use Psr\Log\LoggerAwareInterface;
use Psr\Log\LoggerAwareTrait;
use Ratchet\Client\Connector;
use Ratchet\Client\WebSocket;
use React\EventLoop\Factory;

final class WebsocketClient implements LoggerAwareInterface {
  use LoggerAwareTrait;

  private const MSG_WELCOME = 0;
  private const MSG_PREFIX = 1;
  private const MSG_CALL = 2;
  private const MSG_CALL_RESULT = 3;
  private const MSG_CALL_ERROR = 4;
  private const MSG_SUBSCRIBE = 5;
  private const MSG_UNSUBSCRIBE = 6;
  private const MSG_PUBLISH = 7;
  private const MSG_EVENT = 8;

  private WampRouter $router;
  private string $websocketServerUri;

  public function __construct(string $websocketServerUri, WampRouter $router) {
    $this->router = $router;
    $this->websocketServerUri = $websocketServerUri;
  }

  /**
   * Publishes to a Topic on the websocket server.
   * @throws \InvalidArgumentException if an invalid $data type was given
   */
  public function publish(array $payload, string $routeName, array $routeParameters = []): void {
    $loop = Factory::create();
    $connector = new Connector($loop);

    $connector($this->websocketServerUri, ['wamp'])
      ->then(function(WebSocket $connection) use ($payload, $routeName, $routeParameters) {
        $this->logger->info('Client connection resolved');

        $route = $this->router->generate($routeName, $routeParameters);

        $this->logger->info('Publishing message to websocket server', [
          'route' => $route,
          'payload' => $payload
        ]);

        try {
          $message = json_encode([
            self::MSG_PUBLISH,
            $route,
            $payload,
            [],
            []
          ], JSON_THROW_ON_ERROR);

          $connection->send($message);
        }
        catch(\JsonException $exception) {
          $this->logger->error('Could not encode message to publish to websocket server.', ['exception' => $exception]);
          throw $exception;
        }
        finally {
          $connection->close();
        }
      }, function(\Throwable $throwable): void {
        $this->logger->error('Client connection rejected', ['exception' => $throwable]);
      }
    );
    // Run the event loop so the connection promise can actually resolve.
    $loop->run();
  }
}

Thanks again for your help !

ma-gu-16 commented 3 years ago
WebsocketTopicPublish

thx for this example, what is WebsocketTopicPublish ?

mbabker commented 3 years ago

It's just a message DTO for use with Messenger, it has the same public properties as the client's publish method arguments.
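
A minimal sketch of what such a DTO could look like. The property names payload and route are taken from the handler above ($command->payload, $command->route); routeParameters and the constructor are assumptions added to mirror the optional third argument of publish().

```php
<?php declare(strict_types=1);

namespace App\Messenger\Message;

/**
 * Message DTO carrying what the websocket client needs to publish to a topic.
 * Sketch only: routeParameters and the constructor are assumptions.
 */
final class WebsocketTopicPublish
{
    public array $payload;
    public string $route;
    public array $routeParameters;

    public function __construct(array $payload, string $route, array $routeParameters = [])
    {
        $this->payload = $payload;
        $this->route = $route;
        $this->routeParameters = $routeParameters;
    }
}
```

It would then be dispatched like any other Messenger message, for example by passing new WebsocketTopicPublish(['msg' => 'hello'], 'acme_topic') to the message bus, where acme_topic is the route name seen in the logs above.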

benrcole commented 1 year ago

This thread has really helped me out today, but I am having a strange problem with Symfony's Messenger component.

I'm using a very close variation of the classes defined above (https://github.com/GeniusesOfSymfony/WebSocketBundle/issues/405#issuecomment-691281586).

Watching the log output of bin/console messenger:consume I can see the handler receive the Message from the bus and pass the Message to my handler but for some reason the handler doesn't connect to my socket server until I kill the messenger:consume job at the CLI. Once the worker has been told to stop the handler will connect and send each message that the handler had been sent.

Has anyone seen this behaviour? Is there a configuration message that I am missing somehow?
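
A possible explanation, offered as an assumption rather than a confirmed diagnosis: when no event loop is passed to Pawl's Ratchet\Client\connect() helper, it creates its own loop and only runs it from a shutdown function. In a short-lived HTTP request that happens almost immediately, but inside a long-running messenger:consume worker nothing is sent until the process exits, which would match the behaviour described. The sketch below shows a hypothetical variant of the PawlClient above (the class name LoopRunningPawlClient and its send() method are made up for illustration) that creates the loop itself and runs it before returning, the same approach as the Connector-based class posted earlier in this thread.

```php
<?php declare(strict_types=1);

namespace App\Websocket\Client;

use Ratchet\Client\Connector;
use Ratchet\Client\WebSocket;
use React\EventLoop\Factory;

/**
 * Hypothetical variant of PawlClient that owns and runs its event loop.
 */
final class LoopRunningPawlClient
{
    private string $websocketServerUri;

    public function __construct(string $websocketServerUri)
    {
        $this->websocketServerUri = $websocketServerUri;
    }

    /**
     * Connects, sends one already-encoded WAMP frame, then closes the connection.
     */
    public function send(string $message): void
    {
        $loop = Factory::create();
        $connector = new Connector($loop);

        $connector($this->websocketServerUri, ['wamp'])->then(
            static function (WebSocket $connection) use ($message): void {
                $connection->send($message);
                $connection->close();
            },
            static function (\Throwable $throwable): void {
                // Connection failed; a real handler would log or rethrow this.
                error_log('Client connection rejected: '.$throwable->getMessage());
            }
        );

        // Drive the loop now instead of waiting for process shutdown.
        // It should return once the connection is closed or rejected.
        $loop->run();
    }
}
```

Creating a fresh loop per message keeps the handler synchronous from Messenger's point of view, at the cost of opening a new connection for every message.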