crossbario / autobahn-python

WebSocket and WAMP in Python for Twisted and asyncio
https://crossbar.io/autobahn
MIT License
2.47k stars 763 forks source link

Start a Client without blocking #1603

Closed melardev closed 1 year ago

melardev commented 1 year ago

Hi, I have an issue that is taking me some time, it is basic but I could not find any example addressing it.

In my app I wrapped the autobahn code around a class(let's call it AutobahnWsClient) which in turn implements an interface(let's call it IWSLayer), the reason is that I don't want my app to be tied to a web socket framework, I could swap one framework by the other just replacing the implementation. The consumer of the wrapper class, should not know about asyncio, autobahn and of course not even main app logic, it should just do networking stuff. In all examples, the main logic is in the client itself, which is in my case not ideal. And so I need to be able to connect to the websocket but without blocking, I need to give the execution of the code back to the consumer,

how do I do it? The only option I see is using threads which I believe is not that right. Also, I read in #1416 and it indicates the connections must be opened in the same thread. My code however seems to work, perhaps that reply is outdated and no longer applicable?

Is there a "more idiomatic" way of achieving this, please?


import abc
import asyncio
import threading
import time
from typing import List

from autobahn.asyncio import WebSocketClientFactory, WebSocketClientProtocol

class IWSEventListener(abc.ABC):

    @abc.abstractmethod
    def on_open(self):
        raise NotImplemented()

class IWSLayer(abc.ABC):
    @abc.abstractmethod
    def add_listener(self, cb):
        raise NotImplemented()

    @abc.abstractmethod
    def run(self, endpoint: str, blocking: bool):
        raise NotImplemented()

    @abc.abstractmethod
    def send_message(self, message: str):
        raise NotImplemented()

class AutobahnWsClient(IWSLayer, WebSocketClientProtocol):
    def __init__(self):
        super().__init__()
        self.ws_listeners: List[IWSEventListener] = []

    def add_listener(self, cb):
        self.ws_listeners.append(cb)

    def _run(self, endpoint):
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        factory = WebSocketClientFactory(endpoint)
        # protocol field must be a callable object
        # since we don't want to provide the class as protocol field
        # because it would create one object of such class but without giving us
        # the opportunity to pass arguments, then we create an object ourselves, passing all
        # arguments we want, and implementing the __call__ so when it is called by autobahn
        # framework, we return the instance we already created

        factory.protocol = lambda: self
        coroutine = loop.create_connection(factory, 'fstream.binance.com', 443, ssl=True)

        loop.run_until_complete(coroutine)
        loop.run_forever()

    def run(self, endpoint: str, blocking: bool):
        if blocking:
            self._run(endpoint)
        else:
            t = threading.Thread(target=self._run, args=(endpoint,))
            t.run()

    def send_message(self, message: str):
        self.sendMessage(message.encode())

    def onOpen(self):
        print('OnOpen')
        for wl in self.ws_listeners:
            wl.on_open()

    def onMessage(self, payload, isBinary):
        if isBinary:
            print(f'OnMessage: {payload.decode("utf-8")}')
        else:
            print(f'OnMessage: {payload}')

    def onClose(self, wasClean, code, reason):
        print('OnClose')

class App(IWSEventListener):
    # Main app login
    # notice how the app does not know, nor has to know anything about the WebSocket framework used.
    # Swaping one framework for another one should be as easy as changing the self.client implementation
    def __init__(self):
        self.client: IWSLayer = AutobahnWsClient()

    def on_open(self):
        print('OnConnect')
        self.client.send_message('{"method": "SUBSCRIBE", "params": ["!ticker@arr"], "id": 1}')

    def run(self):
        self.client.add_listener(app)
        self.client.run('wss://fstream.binance.com/ws', blocking=True)

app = App()
app.run()

while True:
    time.sleep(5)
melardev commented 1 year ago

By the way, this indirection may seem unnecessary, but actually, the App class in my project does not even know the shape of the WebSocket subscribe message, here it does to keep it simple. I used to have another abstraction layer on top of the AutobahnWsClient and that's what the App has a reference to, it is what actually knows how WebSocket messages look like, the App is only notified when data (after decoding and formatting it by the abstraction layer) is available, this "architecture" is very common and that's the thing I want to do.

oberstet commented 1 year ago

Autobahn already is non-blocking, so you don't have to do anything extra, just use it. Or fork it!

melardev commented 1 year ago

Autobahn already is non-blocking, so you don't have to do anything extra, just use it. Or fork it!

Hi. Yes I know, what I am trying to say is if someone could help me figure out some way of using autobahn to get my question done, please. It is not anything very complex and specific to my app it is just a general concept that I think many would benefit from too.

oberstet commented 1 year ago

the reason is that I don't want my app to be tied to a web socket framework, I could swap one framework by the other just replacing the implementation

tbh, I don't believe it makes sense or is a well-defined goal, but in general, I'd have a look at the "sans IO" approach https://sans-io.readthedocs.io/