danpaquin / coinbasepro-python

The unofficial Python client for the Coinbase Pro API
MIT License
1.82k stars, 737 forks

Websocket in "user" mode drops connection #256

Open SorenJ89 opened 6 years ago

SorenJ89 commented 6 years ago

When testing the websocket in user mode, the websocket seems to drop the connection after 50 seconds of inactivity (the keepalive solution does not kick in).

First of all, I had to apply this change to the websocket client to get authentication to work in Python 3.5 ( https://github.com/danpaquin/gdax-python/pull/219/files )

Secondly, I shamelessly copied from here to get a working test of the websocket ( https://github.com/danpaquin/gdax-python/issues/205 )

I basically connect to the user websocket, wait 10*x seconds (x in range(10)), place a small order far away from the current price, and check whether my websocket hears it. `
import time
import json
import base64
import hmac
import gdax
import hashlib
from threading import Thread
from websocket import create_connection, WebSocketConnectionClosedException
from pymongo import MongoClient
from gdax.gdax_auth import get_auth_headers
from myKeys import  # imported names lost in the original post; auth_key, auth_secret and auth_passphrase are used below

class WebsocketClient(object):
    def __init__(self, url="wss://ws-feed.gdax.com", products=None, message_type="subscribe", mongo_collection=None,
                 should_print=True, auth=False, api_key="", api_secret="", api_passphrase="", channels=None):
        self.url = url
        self.products = products
        self.channels = channels
        self.type = message_type
        self.stop = False
        self.error = None
        self.ws = None
        self.thread = None
        self.auth = auth
        self.api_key = api_key
        self.api_secret = api_secret
        self.api_passphrase = api_passphrase
        self.should_print = should_print
        self.mongo_collection = mongo_collection

    def start(self):
        def _go():
            self._connect()
            self._listen()
            self._disconnect()

        self.stop = False
        self.on_open()
        self.thread = Thread(target=_go)
        self.thread.start()

    def _connect(self):
        if self.products is None:
            self.products = ["BTC-USD"]
        elif not isinstance(self.products, list):
            self.products = [self.products]

        if self.url[-1] == "/":
            self.url = self.url[:-1]

        if self.channels is None:
            sub_params = {'type': 'subscribe', 'product_ids': self.products}
        else:
            sub_params = {'type': 'subscribe', 'product_ids': self.products, 'channels': self.channels}

        if self.auth:
            timestamp = str(time.time())
            message = timestamp + 'GET' + '/users/self/verify'
            message = message.encode('ascii')
            hmac_key = base64.b64decode(self.api_secret)
            signature = hmac.new(hmac_key, message, hashlib.sha256)
            #signature_b64 = signature.digest().encode('base64').rstrip('\n')
            signature_b64 = base64.b64encode(signature.digest()).decode('utf-8').rstrip('\n')
            sub_params['signature'] = signature_b64
            sub_params['key'] = self.api_key
            sub_params['passphrase'] = self.api_passphrase
            sub_params['timestamp'] = timestamp

        self.ws = create_connection(self.url)
        self.ws.send(json.dumps(sub_params))

        if self.type == "heartbeat":
            sub_params = {"type": "heartbeat", "on": True}
        else:
            sub_params = {"type": "heartbeat", "on": False}
        self.ws.send(json.dumps(sub_params))

    def _listen(self):
        while not self.stop:
            try:
                if int(time.time() % 30) == 0:
                    # Set a 30 second ping to keep connection alive
                    self.ws.ping("keepalive")
                    print('keeping alive')
                print('before recv')
                data = self.ws.recv()
                print('after recv')
                msg = json.loads(data)
            except ValueError as e:
                print('value exception')
                print(e)
                self.on_error(e)
            except Exception as e:
                print('other exception')
                print(e)
                self.on_error(e)
            else:
                self.on_message(msg)
                print('message:')
                print(msg)

    def _disconnect(self):
        if self.type == "heartbeat":
            self.ws.send(json.dumps({"type": "heartbeat", "on": False}))
        try:
            if self.ws:
                self.ws.close()
        except WebSocketConnectionClosedException as e:
            pass

        self.on_close()

    def close(self):
        self.stop = True
        self.thread.join()

    def on_open(self):
        if self.should_print:
            print("-- Subscribed! --\n")

    def on_close(self):
        if self.should_print:
            print("\n-- Socket Closed --")

    def on_message(self, msg):
        if self.should_print:
            print(msg)
        if self.mongo_collection:  # dump JSON to given mongo collection
            self.mongo_collection.insert_one(msg)

    def on_error(self, e, data=None):
        self.error = e
        self.stop = True
        print('{} - data: {}'.format(e, data))

class BotSocket(WebsocketClient):

    def __init__(self, product=None, key=None, secret=None, passphrase=None, channels=None):
        super(BotSocket, self).__init__(products=product, channels=channels)

    def on_open(self):
        global auth_key, auth_secret, auth_passphrase
        self.url = "wss://ws-feed.gdax.com/"
        self.products = ['BTC-EUR']
        self.channels = ['user']
        self.auth = True
        self.api_key = auth_key
        self.api_secret = auth_secret
        self.api_passphrase = auth_passphrase
        print("-- Starting Bot Socket --")

    def on_message(self, msg):
        print(msg)

    def on_close(self):
        self.stop = 1
        print("-- Terminating Bot Socket --")

auth_client = gdax.AuthenticatedClient(auth_key, auth_secret, auth_passphrase)
for i in range(10):
    print('\n\n {}'.format(i))
    currency = ["BTC-EUR", "LTC-EUR", "ETH-EUR", "BCH-EUR", "LTC-BTC", "ETH-BTC", "BCH-BTC"]
    ws = BotSocket(product=currency, key=auth_key, secret=auth_secret, passphrase=auth_passphrase, channels=["user",])
    ws.start()
    time.sleep(i*10)
    print('buying...\n')
    id = auth_client.sell(price = '200000', size='0.001', product_id = 'BTC-EUR',post_only=True)
    print('cancelling...\n')
    auth_client.cancel_all('BTC-EUR')
    ws.close()

`

After the 5th iteration I get this message, suggesting the connection has been dropped:

5
-- Starting Bot Socket --
before recv
after recv
{'type': 'subscriptions', 'channels': [{'name': 'user', 'product_ids': ['BTC-EUR']}]}
before recv
buying...

after recv
{'price': '200000.00000000', 'size': '0.00100000', 'order_id': '6ab9f51f-8d23-48cb-b1d4-ae7e0cb3fda5', 'user_id': '5a20327cf37fc900bbdfd233', 'product_id': 'BTC-EUR', 'profile_id': '5ce40bd7-4de8-4837-9d1f-61cede8c8c53', 'type': 'received', 'time': '2018-02-22T10:35:17.954000Z', 'sequence': 3393282689, 'order_type': 'limit', 'side': 'sell'}
cancelling...
before recv

after recv
{'price': '200000.00000000', 'product_id': 'BTC-EUR', 'order_id': '6ab9f51f-8d23-48cb-b1d4-ae7e0cb3fda5', 'user_id': '5a20327cf37fc900bbdfd233', 'time': '2018-02-22T10:35:17.954000Z', 'profile_id': '5ce40bd7-4de8-4837-9d1f-61cede8c8c53', 'type': 'open', 'remaining_size': '0.00100000', 'sequence': 3393282690, 'side': 'sell'}
before recv
after recv
{'price': '200000.00000000', 'reason': 'canceled', 'side': 'sell', 'order_id': '6ab9f51f-8d23-48cb-b1d4-ae7e0cb3fda5', 'user_id': '5a20327cf37fc900bbdfd233', 'time': '2018-02-22T10:35:18.313000Z', 'profile_id': '5ce40bd7-4de8-4837-9d1f-61cede8c8c53', 'type': 'done', 'remaining_size': '0.00100000', 'sequence': 3393282730, 'product_id': 'BTC-EUR'}
-- Terminating Bot Socket --

6
-- Starting Bot Socket --
before recv
after recv
{'type': 'subscriptions', 'channels': [{'name': 'user', 'product_ids': ['BTC-EUR']}]}
before recv
buying...

other exception
Connection is already closed.
Connection is already closed. - data: None
-- Terminating Bot Socket --
cancelling...

What I think is going on is that the "self.ws.recv()" call blocks our code from running further until something is received, which in turn prevents the "keepalive" ping from being sent.
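
To illustrate the diagnosis above, here is a minimal sketch that simulates the blocking recv() with a queue.Queue instead of a real websocket. The FakeSocket class, listen() and run_demo() are hypothetical stand-ins written for this illustration; they are not part of gdax or websocket-client. The ping check sits inside the receive loop, so it only runs when a message arrives.

```python
import queue
import threading
import time


class FakeSocket:
    """Hypothetical stand-in for a websocket: recv() blocks until a message is queued."""

    def __init__(self):
        self.inbox = queue.Queue()
        self.pings = 0

    def recv(self):
        return self.inbox.get()  # blocks, like ws.recv() on a quiet channel

    def ping(self):
        self.pings += 1


def listen(sock, ping_interval, stop_token="STOP"):
    """Receive loop with the ping check inside it, mirroring the structure of _listen."""
    last_ping = time.monotonic()
    while True:
        if time.monotonic() - last_ping >= ping_interval:
            sock.ping()  # only reachable between two recv() calls
            last_ping = time.monotonic()
        if sock.recv() == stop_token:
            return


def run_demo(quiet_seconds=0.3, ping_interval=0.05):
    sock = FakeSocket()
    worker = threading.Thread(target=listen, args=(sock, ping_interval))
    worker.start()
    time.sleep(quiet_seconds)       # no messages arrive: the loop sits in recv()
    pings_while_quiet = sock.pings
    sock.inbox.put("STOP")          # unblock recv() so the thread can exit
    worker.join()
    return pings_while_quiet


if __name__ == "__main__":
    print("pings sent during the quiet period:", run_demo())
```

Under these assumptions the demo should report zero pings, even though the quiet period is several ping intervals long.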

smilerz commented 6 years ago

Do you have a suggested fix? I've seen similar behavior but haven't spent the time to identify root cause yet.

SorenJ89 commented 6 years ago

Sadly not. I'm still somewhat new at Python.

smilerz commented 6 years ago

I'm reasonably certain that ws.run_forever can be used instead of manual pings, but I'm not 100% sure how to incorporate it or what else needs to change; I haven't found the time to start playing with it yet. My initial attempt will be something like this, with all of the hardcoded keepalive messages removed.

    def start(self):
        def _go():
            self._connect()
            self._listen()
            self._disconnect()

        self.stop = False
        self.on_open()
        self.run_forever(ping_interval=30)
        self.thread = Thread(target=_go)
        self.thread.daemon=True
        self.thread.start()

robertdelacruz1551 commented 6 years ago

Based on smilerz's recommendation, I was able to solve this issue by adding this method and altering the _listen method a bit: `
def keepalive(self, interval=30):
    while not self.stop:
        if self.ws:
            self.ws.ping('keepalive')
        time.sleep(interval)

def _listen(self):
    Thread(target=self.keepalive).start() # <=== Add this before the loop
    while not self.stop:
        try:
            # if time.time() - start_t >= 15:
            #     # Set a 30 second ping to keep connection alive
            #     self.ws.ping("keepalive")
            #     start_t = time.time()
            data = self.ws.recv()
            msg = json.loads(data)
        except ValueError as e:
            self.on_error(e)
        except Exception as e:
            self.on_error(e)
        else:
            self.on_message(msg)

`
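
A self-contained sketch of the keepalive-thread approach above, runnable without a network connection. FakeSocket and Listener are hypothetical names used only for this illustration; the real client would call ping() on its websocket object instead.

```python
import queue
import threading
import time


class FakeSocket:
    """Hypothetical stand-in for a websocket connection (not a real library class)."""

    def __init__(self):
        self.inbox = queue.Queue()
        self.pings = 0

    def recv(self):
        return self.inbox.get()  # blocks until a message is queued

    def ping(self, payload=""):
        self.pings += 1


class Listener:
    def __init__(self, sock, ping_interval):
        self.ws = sock
        self.ping_interval = ping_interval
        self.stop = False

    def keepalive(self):
        # Runs in its own thread, so a blocked recv() cannot starve it.
        while not self.stop:
            self.ws.ping("keepalive")
            time.sleep(self.ping_interval)

    def listen(self):
        pinger = threading.Thread(target=self.keepalive)
        pinger.start()
        while not self.stop:
            if self.ws.recv() == "STOP":
                self.stop = True
        pinger.join()


def run_demo(quiet_seconds=0.3, ping_interval=0.05):
    sock = FakeSocket()
    listener = Listener(sock, ping_interval)
    worker = threading.Thread(target=listener.listen)
    worker.start()
    time.sleep(quiet_seconds)       # nothing arrives, recv() stays blocked
    pings_while_quiet = sock.pings
    sock.inbox.put("STOP")          # let the listener shut down
    worker.join()
    return pings_while_quiet


if __name__ == "__main__":
    print("pings sent while recv() was blocked:", run_demo())
```

Here the pings keep going out during the quiet period because they no longer depend on recv() returning.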

royitaqi commented 6 years ago

Has anyone checked in this fix? If not, I'm planning to implement it as @robertdelacruz1551 suggested.

danpaquin commented 6 years ago

I believe this is related to #264 -- let me know if you find a fix in the meantime.

royitaqi commented 6 years ago

@danpaquin Didn't @robertdelacruz1551 propose a fix just above my last comment?

royitaqi commented 6 years ago

@danpaquin I actually think this post has the root cause rather than #264. That post only points out a buggy line; fixing it will not solve the problem. I think this issue (#256) should be re-opened, and the other closed with a pointer to this one. And I'd say this is not exactly a dup.

danpaquin commented 6 years ago

This issue is not closed. Rather than opening another thread in the class, I would rather set start_t = 0 before the while loop and un-comment the if block, as stated in #264.

mcardillo55 commented 6 years ago

@deanblacc touched on this issue in #271. His solution involves dropping the pinger and focusing on reconnecting logic.

royitaqi commented 6 years ago

@danpaquin As long as the ping happens within the while loop which is blocked by recv(), there is a chance that the next message never arrives, blocking us from sending out a ping. This is also mentioned in #271. So, if we want to use ping to keep the connection alive, I think we need to move the ping out of the while loop, as suggested by @robertdelacruz1551 above.

@mcardillo55 Thank you for the pointer. I agree with his analysis of the problem. However, comparing the two solutions, I find the one here better: it keeps the connection alive, which means we won't miss any socket messages. The solution suggested in #271 is almost guaranteed to miss some messages (especially on the 'user' channel, which might not have frequent messages). Since it takes additional effort to handle potentially missing messages (outside this class, probably in application code), I prefer the solution here. Otherwise you would have to constantly fetch snapshots from the REST API and compare them to what you have in the book, which kind of negates the purpose of a websocket. @deanblacc what do you think?

danpaquin commented 6 years ago

You're right @royitaqi -- Robert's solution is much better in this case. I overlooked the scenario where messages are received slowly.

kudovickij commented 6 years ago

I propose a small modification to @robertdelacruz1551's implementation. It replaces the one long sleep with many short sleeps, so the thread can notice a stop request promptly.

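def keepalive(self, interval=30):
    while not self.stop:
        if self.ws:
            self.ws.ping('keepalive')
        # Sleep in one-second steps so a stop request is noticed quickly.
        # Kept outside the `if` so the loop does not spin while self.ws is None.
        intervalCounter = 0
        while intervalCounter < interval:
            if self.stop:
                return
            intervalCounter += 1
            time.sleep(1)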
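
An alternative to counting one-second sleeps is threading.Event.wait, which sleeps for up to the full interval but wakes as soon as the event is set. The sketch below is only an illustration of that idea: the Pinger class and the send_ping callable are hypothetical and not part of the library.

```python
import threading
import time


class Pinger:
    """Sends a ping every `interval` seconds until stop() is called."""

    def __init__(self, send_ping, interval=30.0):
        # send_ping is a hypothetical callable, e.g. lambda: ws.ping("keepalive")
        self.send_ping = send_ping
        self.interval = interval
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)

    def start(self):
        self._thread.start()

    def _run(self):
        while not self._stop_event.is_set():
            self.send_ping()
            # wait() returns early (True) as soon as stop() sets the event,
            # so there is no need to poll a flag once per second.
            self._stop_event.wait(self.interval)

    def stop(self):
        self._stop_event.set()
        self._thread.join()


if __name__ == "__main__":
    sent = []
    pinger = Pinger(lambda: sent.append(time.monotonic()), interval=30.0)
    pinger.start()
    time.sleep(0.1)
    started = time.monotonic()
    pinger.stop()  # should return almost immediately despite the 30 s interval
    print("pings sent:", len(sent), "stop took", round(time.monotonic() - started, 3), "s")
```

The trade-off is one extra Event object per client in exchange for a shutdown delay that no longer depends on the sleep granularity.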

I am writing a tool that allows the client to start and stop sessions frequently. With the current implementation, stopping is extremely tedious to wait on because of the above delay and the blocking behavior of recv. Aside from the change above, I propose additional fixes that are somewhat related (they should probably be handled outside of this issue; @danpaquin please take a look).

Adding abort to the websocket pulls it out from the internals of recv. At this point I think it is reasonable to assume that we are not interested in any more data.

def close(self):
    self.stop = True
    self.ws.abort()
    self.thread.join()

Additionally, I joined the keepalive thread in _listen. Not sure if the destructor does it automatically. If it is handled intrinsically, ignore this change.

keeper = Thread(target=self.keepalive)
keeper.start()
...
keeper.join()

shahzak commented 6 years ago

Is this fix going to be pushed? Thanks.

danpaquin commented 6 years ago

I like the solution proposed by @kudovickij.

@shahzak please point me in the direction of the PR with this fix and I'll merge it as soon as possible.

robertdelacruz1551 commented 6 years ago

One more suggestion is to reconnect the websocket if a disconnect occurs. Even with this solution I’m experiencing dropped connections.
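
As a rough sketch of what such reconnect logic could look like, the helper below retries a listening function with exponential backoff. Everything here is an assumption made for illustration: run_with_reconnect, connect_and_listen and the retry_on parameter are hypothetical names, and a real client would pass whatever exception types its websocket library raises on a dropped connection.

```python
import time


def run_with_reconnect(connect_and_listen, max_attempts=5, base_delay=1.0,
                       max_delay=30.0, retry_on=(ConnectionError,),
                       sleep=time.sleep):
    """Call connect_and_listen() again after each failure, backing off exponentially.

    connect_and_listen is a hypothetical callable that opens the socket,
    subscribes, and returns normally only on a requested shutdown.
    Returns the number of attempts that were made.
    """
    delay = base_delay
    for attempt in range(1, max_attempts + 1):
        try:
            connect_and_listen()
            return attempt  # clean shutdown, no reconnect needed
        except retry_on as exc:
            if attempt == max_attempts:
                raise  # give up and let the caller decide what to do
            print("connection lost ({}), retrying in {:.1f}s".format(exc, delay))
            sleep(delay)
            delay = min(delay * 2, max_delay)


if __name__ == "__main__":
    attempts = []

    def flaky_listener():
        # Simulated listener: fails twice, then shuts down cleanly.
        attempts.append(1)
        if len(attempts) < 3:
            raise ConnectionError("dropped")

    # sleep is replaced by a no-op so the demo finishes instantly
    print("attempts used:", run_with_reconnect(flaky_listener, sleep=lambda s: None))
```

Note that any messages published while the socket is down are still lost; the helper only restores the connection.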

kudovickij commented 6 years ago

I can confirm that the issue persists. Reconnecting is undesirable, because now you are stuck with potential "holes" in data listening, resulting in missing information, that can critically affect the application's purpose.

uclatommy commented 5 years ago

I've sent a proposed fix for this issue. I arrived at a solution very similar to @robertdelacruz1551.

@robertdelacruz1551, I think your solution is solid. Mine is very similar and I am not experiencing additional dropped connections. I think you may be continuing to experience dropped connections simply due to internet connectivity issues. So I believe your proposal to implement some reconnection logic is reasonable, however, this should probably be implemented in client-side code and maybe not in the module.

I believe it is reasonable for us to expect module users to implement their own logic for how to fill in gaps because the module cannot anticipate what data the user is interested in retaining from the feeds and therefore cannot adequately implement a way to fill in the holes.
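
One way client code might at least detect such holes is to track the sequence field carried by feed messages. The sketch below is an illustration under stated assumptions, not part of the module: find_gaps is a hypothetical helper, and it assumes the subscribed feed delivers consecutive sequence numbers per product. That assumption does not hold for the 'user' channel, where the log earlier in this thread shows non-consecutive sequences.

```python
def find_gaps(messages):
    """Yield (product_id, expected, received) whenever a sequence number is skipped.

    Assumes each message is a dict with 'product_id' and an integer 'sequence',
    and that sequences for a product increase by exactly one on this feed.
    """
    last_seen = {}
    for msg in messages:
        product = msg.get("product_id")
        seq = msg.get("sequence")
        if product is None or seq is None:
            continue  # e.g. 'subscriptions' messages carry no sequence
        previous = last_seen.get(product)
        if previous is not None and seq > previous + 1:
            yield (product, previous + 1, seq)
        if previous is None or seq > previous:
            last_seen[product] = seq


if __name__ == "__main__":
    feed = [
        {"type": "subscriptions"},
        {"product_id": "BTC-EUR", "sequence": 10},
        {"product_id": "BTC-EUR", "sequence": 11},
        {"product_id": "ETH-EUR", "sequence": 5},
        {"product_id": "BTC-EUR", "sequence": 14},
    ]
    for gap in find_gaps(feed):
        print("gap detected:", gap)
```

What to do once a gap is found (for example, re-fetching a snapshot over REST) is left to the application, in line with the comment above.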

danpaquin commented 5 years ago

#337 has been merged thanks to @uclatommy

I'm leaving this open for 30 days to await feedback.

noah222 commented 5 years ago

I've been running my own websocket routine in a separate thread. I re-subscribe every minute and have a 10-second timeout on the receive call. This has worked very reliably for dealing with the price feed. I do not use the websocket for anything else because of the possibility of missed messages, so I poll for fills and order status while getting continuous price updates via the feed. I am no expert with Python, but doing it this way has been very stable.
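
The routine described above might be structured roughly as follows. This is a guess at the shape of such a loop, not the commenter's code: poll_feed and its callable parameters (recv, resubscribe, handle, should_stop, clock) are hypothetical, and a real client would catch whatever timeout exception its websocket library raises rather than the built-in TimeoutError used here.

```python
import time


def poll_feed(recv, resubscribe, handle, should_stop,
              resubscribe_every=60.0, clock=time.monotonic):
    """Receive loop with a timeout on each receive and a periodic re-subscribe.

    recv() is a hypothetical callable that returns a message, or raises
    TimeoutError when nothing arrives within its own timeout.
    """
    last_subscribe = clock()
    resubscribe()
    while not should_stop():
        if clock() - last_subscribe >= resubscribe_every:
            resubscribe()
            last_subscribe = clock()
        try:
            msg = recv()
        except TimeoutError:
            continue  # quiet period: loop again so the checks above still run
        handle(msg)


if __name__ == "__main__":
    # Simulated feed: each recv() call advances a fake clock by 10 seconds;
    # None in the script stands for a receive that timed out.
    now = [0.0]
    script = ["a", None, None, None, None, None, "b"]
    calls = [0]

    def fake_recv():
        item = script[calls[0]]
        calls[0] += 1
        now[0] += 10.0
        if item is None:
            raise TimeoutError()
        return item

    poll_feed(fake_recv,
              resubscribe=lambda: print("subscribe at t =", now[0]),
              handle=lambda m: print("message:", m),
              should_stop=lambda: calls[0] >= len(script),
              clock=lambda: now[0])
```

Because recv() can time out, the loop never sits blocked for longer than one timeout, which is what lets the periodic re-subscribe run even on a quiet feed.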

TristanJM commented 5 years ago

@danpaquin @uclatommy the websocket fix in PR https://github.com/danpaquin/coinbasepro-python/pull/337 is working well and I've not been experiencing any issues.

When will you publish this new patch version to PyPI?

TristanJM commented 5 years ago

Actually, PR https://github.com/danpaquin/coinbasepro-python/pull/337 has now introduced a bug that the websocket won't gracefully exit on Keyboard Interrupt.

E.g.

^C[SSL: DECRYPTION_FAILED_OR_BAD_RECORD_MAC] decryption failed or bad record mac 

The stack trace is lengthy, but I won't paste it here as it's very easy to replicate (just keyboard interrupt the running websocket_client.py script)

@uclatommy Any idea how to fix this?

uclatommy commented 5 years ago

@TristanJM, I am not able to reproduce. These are the steps I used in my test:

  1. Clone latest master from github:
    $ mkdir cbpro
    $ cd cbpro
    $ git clone https://github.com/danpaquin/coinbasepro-python.git 
  2. Create a fresh virtualenv in the same directory:
    $ virtualenv .
  3. Install the repo into the virtualenv as a module:
    $ pip install --editable ./coinbasepro-python
  4. Run websocket_client.py, then ctrl+c to send KeyboardInterrupt signal:
    $ python coinbasepro-python/cbpro/websocket_client.py
    ... <bunch of output from websocket printed here> ...
    ^C
    $

    It exits cleanly for me. I'm using Python 3.7 on macOS Mojave 10.14.2

Note: It takes a few seconds (like 20) for the script to exit. I noticed that I can produce some exceptions if I press ctrl+c a second time before the script exits, but it looks different from your stack trace:

^CException ignored in: <module 'threading' from '/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1273, in _shutdown
    t.join()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/local/Cellar/python/3.7.0/Frameworks/Python.framework/Versions/3.7/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

TristanJM commented 5 years ago

@uclatommy Thanks for looking into this. Everything works fine when following your steps above.

I run into an issue when I pip install cbpro into a virtualenv, and then replace the old contents of websocket_client.py with the new code from github. I assumed this would work but I suppose I'm missing a trick here. This is the error that comes from it:

Closing order book socket...
[Errno 9] Bad file descriptor - data: None
-- OrderBook Socket Closed --
-- OrderBook Socket Closed --
Segmentation fault: 11

TL;DR The repo runs perfectly well and it would be nice to have this on PyPI 😃

uclatommy commented 5 years ago

@TristanJM, not sure why you would be seeing that error, but my guess is that the latest PyPI package is a few commits back, so if you install from that and then add my code changes, you might be missing a few commits in your final result. If you want a new PyPI package, you'd have to ask @danpaquin.

I'm just another user of this module like you and I've managed to make a few contributions along the way while building my own thing. Good luck!