sammchardy / python-kucoin

Kucoin REST and Websocket API python implementation
https://python-kucoin.readthedocs.io/en/latest/
MIT License
354 stars 150 forks source link

Websockets variable declaration #53

Open nielskool opened 5 years ago

nielskool commented 5 years ago

I run python 3.5.2

When I try to use the websocket features it raises an error at: MAX_RECONNECTS: int = 5

Is there something I need for python to allow for such variable declarations?

//edit On Windows with Python 3.6.2 it works. Ubuntu doesn't have the newer Python version in its standard packages (yet). I'm trying to get a newer Python onto the Ubuntu server through other means, but the websockets import is not coming through then....

//edit2 I ended up changing all the newer python syntax of asyncio/websockets.py

import asyncio
import json
import logging
import time
from random import random
from typing import Dict, Callable, Awaitable, Optional

import websockets as ws

from kucoin.client import Client

class ReconnectingWebsocket:
    """Websocket connection that tries to re-establish itself after a failure.

    Creating an instance immediately schedules :meth:`_run` with
    ``asyncio.ensure_future``. Each text frame that decodes as JSON is passed
    to the ``coro`` callback given to the constructor.
    """

    # Reconnection stops once the attempt counter reaches this value.
    MAX_RECONNECTS = 5
    # Upper bound used by _get_reconnect_wait for the random back-off factor.
    MAX_RECONNECT_SECONDS = 60
    # The three constants below are not referenced inside this class.
    MIN_RECONNECT_WAIT = 0.1
    TIMEOUT = 10
    PROTOCOL_VERSION = '1.0.0'

    def __init__(self, loop, client: Client, coro, private: bool = False):
        """Store the collaborators and schedule the receive loop.

        :param loop: event loop; stored on the instance only
        :param client: object providing ``get_ws_endpoint(private)``
        :param coro: coroutine function awaited with every decoded message
        :param private: forwarded to ``client.get_ws_endpoint`` and added to
            outgoing messages as ``privateChannel``
        """
        self._loop = loop
        self._log = logging.getLogger(__name__)
        self._coro = coro
        self._reconnect_attempts = 0
        self._conn = None
        self._ws_details = None
        self._connect_id = None
        # _get_ws_endpoint stores the generated id under this name; define it
        # up front so the attribute exists before the first connection.
        self._ws_connect_id = None
        self._client = client
        self._private = private
        self._socket = None

        self._connect()

    def _connect(self):
        """Schedule :meth:`_run` and remember the resulting future."""
        self._conn = asyncio.ensure_future(self._run())

    async def _run(self):
        """Open the websocket and dispatch messages until the connection fails.

        When no message arrives within the ping timeout an application-level
        ping is sent. A closed connection or any other exception raised inside
        the receive loop triggers :meth:`_reconnect`.
        """
        keep_waiting = True

        # Reset first so a failing lookup cannot leave old details in place.
        self._ws_details = None
        self._ws_details = self._client.get_ws_endpoint(self._private)

        async with ws.connect(self._get_ws_endpoint(), ssl=self._get_ws_encryption()) as socket:
            self._socket = socket
            # A successful connection restarts the back-off sequence.
            self._reconnect_attempts = 0

            try:
                while keep_waiting:
                    try:
                        evt = await asyncio.wait_for(self._socket.recv(), timeout=self._get_ws_pingtimeout())
                    except asyncio.TimeoutError:
                        self._log.debug("no message in {} seconds".format(self._get_ws_pingtimeout()))
                        await self.send_ping()
                    except asyncio.CancelledError:
                        # NOTE(review): a cancellation delivered while waiting
                        # for a message is absorbed here and the loop goes on,
                        # so cancel() does not end this loop - confirm intent.
                        self._log.debug("cancelled error")
                        await self._socket.ping()
                    else:
                        try:
                            evt_obj = json.loads(evt)
                        except ValueError:
                            # Frames that are not valid JSON are skipped.
                            self._log.debug("ignoring websocket frame that is not valid JSON")
                        else:
                            await self._coro(evt_obj)
            except ws.ConnectionClosed:
                keep_waiting = False
                await self._reconnect()
            except Exception as e:
                self._log.debug('ws exception:{}'.format(e))
                keep_waiting = False
                await self._reconnect()

    def _get_ws_endpoint(self) -> str:
        """Build the connection URL from the stored websocket details.

        A new millisecond-timestamp connect id is generated on every call and
        kept in ``self._ws_connect_id``.

        :raises Exception: if no websocket details have been fetched
        """
        if not self._ws_details:
            raise Exception("Unknown Websocket details")

        self._ws_connect_id = str(int(time.time() * 1000))
        token = self._ws_details['token']
        endpoint = self._ws_details['instanceServers'][0]['endpoint']

        ws_endpoint = "%s?token=%s&connectId=%s" % (endpoint, token, self._ws_connect_id)
        return ws_endpoint

    def _get_ws_encryption(self) -> bool:
        """Return the ``encrypt`` value of the first instance server.

        :raises Exception: if no websocket details have been fetched
        """
        if not self._ws_details:
            raise Exception("Unknown Websocket details")

        return self._ws_details['instanceServers'][0]['encrypt']

    def _get_ws_pingtimeout(self) -> int:
        """Return the receive timeout derived from the server's ``pingTimeout``.

        The server value is divided by 1000 and reduced by one.

        :raises Exception: if no websocket details have been fetched
        """
        if not self._ws_details:
            raise Exception("Unknown Websocket details")

        ping_timeout = int(self._ws_details['instanceServers'][0]['pingTimeout'] / 1000) - 1
        return ping_timeout

    @staticmethod
    def _current_task():
        """Return the task running the caller, or None outside of a task."""
        # asyncio.current_task() only exists on newer interpreters; older ones
        # expose the same information as asyncio.Task.current_task().
        getter = getattr(asyncio, 'current_task', None) or asyncio.Task.current_task
        try:
            return getter()
        except RuntimeError:
            return None

    async def _reconnect(self):
        """Schedule a new connection after a randomised wait.

        Gives up, logging an error, once the attempt counter reaches
        ``MAX_RECONNECTS``.
        """
        # _run awaits this method from inside the task stored in self._conn.
        # Cancelling that task here would make the sleep below raise
        # CancelledError, so the new connection would never be scheduled.
        if self._conn is not self._current_task():
            await self.cancel()
        self._reconnect_attempts += 1
        if self._reconnect_attempts < self.MAX_RECONNECTS:
            # The arguments must be passed to the logger; applying % to the
            # logger call's return value (None) raises TypeError.
            self._log.debug(
                "websocket reconnecting %s attempts left",
                self.MAX_RECONNECTS - self._reconnect_attempts
            )
            reconnect_wait = self._get_reconnect_wait(self._reconnect_attempts)
            await asyncio.sleep(reconnect_wait)
            self._connect()
        else:
            # maybe raise an exception
            self._log.error(
                "websocket could not reconnect after %s attempts",
                self._reconnect_attempts
            )

    def _get_reconnect_wait(self, attempts: int) -> int:
        """Return a randomised wait that grows with the number of attempts.

        The random factor is capped at ``MAX_RECONNECT_SECONDS`` and one is
        added before rounding.
        """
        expo = 2 ** attempts
        return round(random() * min(self.MAX_RECONNECT_SECONDS, expo - 1) + 1)

    async def send_ping(self):
        """Send a ``ping`` message whose id is the current time in milliseconds."""
        msg = {
            'id': str(int(time.time() * 1000)),
            'type': 'ping'
        }
        await self._socket.send(json.dumps(msg))

    async def send_message(self, msg, retry_count=0):
        """Send ``msg`` as JSON, waiting for the socket to become available.

        While no socket is set the call sleeps one second between checks and
        gives up once ``retry_count`` has reached 5. ``msg`` is modified in
        place: ``id`` and ``privateChannel`` are added before sending.

        :param msg: dict to send
        :param retry_count: number of waits already performed
        """
        while not self._socket:
            if retry_count >= 5:
                # Make the dropped message visible instead of failing silently.
                self._log.warning("websocket not connected, message was not sent")
                return
            await asyncio.sleep(1)
            retry_count += 1

        msg['id'] = str(int(time.time() * 1000))
        msg['privateChannel'] = self._private
        await self._socket.send(json.dumps(msg))

    async def cancel(self):
        """Request cancellation of the scheduled receive loop, if there is one."""
        if self._conn is None:
            return
        try:
            self._conn.cancel()
        except asyncio.CancelledError:
            pass

class KucoinSocketManager:
    """Subscribe to websocket topics and forward data messages to a callback.

    Instances are meant to be built with the :meth:`create` coroutine, which
    also opens the underlying ``ReconnectingWebsocket``.
    """

    def __init__(self):
        """Initialise the KucoinSocketManager with no connection attached."""
        # Set by create(); while it is None incoming messages are not forwarded.
        self._callback = None
        self._conn = None
        self._loop = None
        self._client = None
        self._private = False
        self._log = logging.getLogger(__name__)

    @classmethod
    async def create(cls, loop, client: Client, callback: Callable[[Dict], Awaitable], private: bool = False):
        """Build a manager and start its websocket connection.

        :param loop: event loop handed to the websocket wrapper
        :param client: client handed to the websocket wrapper
        :param callback: coroutine function awaited with each message that
            contains a ``data`` key
        :param private: passed through to the websocket wrapper
        :returns: the new manager instance
        """
        # Instantiate through cls so that subclasses get their own type back.
        self = cls()
        self._loop = loop
        self._client = client
        self._private = private
        self._callback = callback
        self._conn = ReconnectingWebsocket(loop, client, self._recv, private)
        return self

    async def _recv(self, msg: Dict):
        """Forward ``msg`` to the callback when it contains a ``data`` key."""
        if 'data' not in msg:
            return
        if self._callback is None:
            # No callback registered (instance was not built via create()).
            self._log.debug("no callback registered, message not forwarded")
            return
        await self._callback(msg)

    async def subscribe(self, topic: str):
        """Subscribe to a channel

        :param topic: required
        :returns: None

        Sample ws response

        .. code-block:: python

            {
                "type":"message",
                "topic":"/market/ticker:BTC-USDT",
                "subject":"trade.ticker",
                "data":{
                    "sequence":"1545896668986",
                    "bestAsk":"0.08",
                    "size":"0.011",
                    "bestBidSize":"0.036",
                    "price":"0.08",
                    "bestAskSize":"0.18",
                    "bestBid":"0.049"
                }
            }

        Error response

        .. code-block:: python

            {
                'code': 404,
                'data': 'topic /market/ticker:BTC-USDT is not found',
                'id': '1550868034537',
                'type': 'error'
            }

        """

        req_msg = {
            'type': 'subscribe',
            'topic': topic,
            'response': True
        }

        await self._conn.send_message(req_msg)

    async def unsubscribe(self, topic: str):
        """Unsubscribe from a topic

        :param topic: required

        :returns: None

        Sample ws response

        .. code-block:: python

            {
                "id": "1545910840805",
                "type": "ack"
            }

        """

        req_msg = {
            'type': 'unsubscribe',
            'topic': topic,
            'response': True
        }

        await self._conn.send_message(req_msg)
sammchardy commented 5 years ago

Hi @nielskool I am removing the typing annotations in v2.1.2 for better backward compatibility