danpaquin / coinbasepro-python

The unofficial Python client for the Coinbase Pro API
MIT License
1.82k stars 738 forks source link

Error: messages missing. Re-initializing book at sequence. #365

Open indrasweb opened 5 years ago

indrasweb commented 5 years ago

Can anyone think of an elegant solution to the problem of initializing an order book? Any time I want to initialize a book it takes a long time, and sometimes depletes my rate limit because of consecutive calls to get_product_order_book().

What I think is happening is that because self._client.get_product_order_book(product_id=self.product_id, level=3) in reset_book() takes so long to download over the network, and the system doesn't update the self._sequence counter until this has completed, if another message arrives whilst this is happening the whole process is repeated (another call to reset_book() is made triggered by the message). Sometimes this loop lasts forever and results in spamming the console with Error: messages missing. Re-initializing book at sequence. messages and depleting your rate limit.

Elegant ideas as to how to fix this?

noah222 commented 5 years ago

I would use the level 2 channel through direct interaction with the API since it is quite a bit more stable and easy to manage:

https://docs.pro.coinbase.com/#the-ticker-channel

The code for orderbook on this danpaquin package is not reliable in my opinion. If someone has it working well, I would like to know how they did it as well!

indrasweb commented 5 years ago

So I wrote this yesterday to wrap around the existing API. It works, but it is such a hack which feels dangerous. I could probably make it nicer and upstream if someone else can test it out and also thinks it's good enough.

import cbpro, time
from decimal import *
from sortedcontainers.sorteddict import SortedDict
import pickle

def get_book(market_name):

  book = cbpro.OrderBook(product_id=market_name)
  book.init = 1
  start_queue = []

  # cbpro orderbooks die after like 2 hours, so we need something to restart them.
  def new_on_close():
    print(f'\n-- {market_name} OrderBook Socket Closed - restarting... --')
    book.reset_book()
    book.start()

  book.on_close = new_on_close

  def new_on_message(message):
    if book._log_to:
      pickle.dump(message, book._log_to)

    if book.init == 1:
      book.init = 2
      book.reset_book()

    if book.init == 2:
      start_queue.append(message)
      return

    sequence = message.get('sequence', -1)
    if sequence <= book._sequence:
      # ignore older messages (e.g. before order book initialization from getProductOrderBook)
      return
    elif sequence > book._sequence + 1:
      book.on_sequence_gap(book._sequence, sequence)
      return

    msg_type = message['type']
    if msg_type == 'open':
      book.add(message)
    elif msg_type == 'done' and 'price' in message:
      book.remove(message)
    elif msg_type == 'match':
      book.match(message)
      book._current_ticker = message
    elif msg_type == 'change':
      book.change(message)

    book._sequence = sequence

  book.on_message = new_on_message

  def new_reset_book():
    book._asks = SortedDict()
    book._bids = SortedDict()
    res = book._client.get_product_order_book(product_id=book.product_id, level=3)
    for bid in res['bids']:
      book.add({
        'id': bid[2],
        'side': 'buy',
        'price': Decimal(bid[0]),
        'size': Decimal(bid[1])
      })
    for ask in res['asks']:
      book.add({
        'id': ask[2],
        'side': 'sell',
        'price': Decimal(ask[0]),
        'size': Decimal(ask[1])
      })
    book._sequence = res['sequence']
    for message in start_queue:
      new_on_message(message)

  book.reset_book = new_reset_book

  book.start()

  while True:
    try:
      book.get_bid()
      book.get_ask()
      break
    except IndexError:
      time.sleep(0.1)
      continue

  return book
noah222 commented 5 years ago

This looks pretty nice. I'll have to try it. Thanks!

indrasweb commented 5 years ago

Sorry what I wrote above turns out to be junk. The following seems to work quite well but will still drop some messages during initialization.


def get_book(market_name):
  """ Get an order book for the given market_name, populated with
      at least one order on each side (this avoids IndexError when
      attempting to get_bid or get_ask).
  """
  book = cbpro.OrderBook(product_id=market_name)
  book.init = 1
  book.start_queue = []

  # cbpro orderbooks die after like 2 hours, so we need something to restart them.
  def new_on_close():
    print(f'\n-- {market_name} OrderBook Socket Closed - restarting... --')
    # book.init = 1
    book.reset_book()
    book.start()

  book.on_close = new_on_close

  def new_on_message(message):
    if book._log_to:
      pickle.dump(message, book._log_to)

    if book.init == 1:
      book.init = 2
      book.start_queue.append(message)
      book.reset_book()

    if book.init == 2:
      book.start_queue.append(message)
      return

    sequence = message.get('sequence', -1)
    if sequence <= book._sequence:
      # ignore older messages (e.g. before order book initialization from getProductOrderBook)
      return
    elif sequence > book._sequence + 1:
      book.on_sequence_gap(book._sequence, sequence)
      return

    msg_type = message['type']
    if msg_type == 'open':
      book.add(message)
    elif msg_type == 'done' and 'price' in message:
      book.remove(message)
    elif msg_type == 'match':
      book.match(message)
      book._current_ticker = message
    elif msg_type == 'change':
      book.change(message)

    book._sequence = sequence

  book.on_message = new_on_message

  def new_reset_book():
    book._asks = SortedDict()
    book._bids = SortedDict()
    res = book._client.get_product_order_book(product_id=book.product_id, level=3)
    for bid in res['bids']:
      book.add({
        'id': bid[2],
        'side': 'buy',
        'price': Decimal(bid[0]),
        'size': Decimal(bid[1])
      })
    for ask in res['asks']:
      book.add({
        'id': ask[2],
        'side': 'sell',
        'price': Decimal(ask[0]),
        'size': Decimal(ask[1])
      })
    book._sequence = res['sequence']
    book.init = 0
    for message in book.start_queue:
      new_on_message(message)
    book.start_queue = []

  book.reset_book = new_reset_book

  book.start()

  while True:
    try:
      book.get_bid()
      book.get_ask()
      break
    except IndexError:
      time.sleep(0.1)
      continue

  return book
lordkebab commented 5 years ago

I like the level-2 feed to keep up with it. Here's what I'm doing:

import cbpro
import os
from colors import Colors

class Client(cbpro.WebsocketClient):    
    def on_open(self):
        # connection parameters
        #self.url = "wss://ws-feed-public.sandbox.pro.coinbase.com/"
        self.url = "wss://ws-feed.pro.coinbase.com/"
        self.products = ["BTC-USD"]
        self.channels = ["level2"]

        key = os.environ['CBPRO_KEY']
        secret = os.environ['CBPRO_SECRET']
        passphrase = os.environ['CBPRO_PASSPHRASE']

    def on_message(self, msg):
        if msg['type'] == 'snapshot':
            self.bids = {float(k): v for (k, v) in msg['bids']}
            self.asks = {float(k): v for (k, v) in msg['asks']}

        if msg['type'] == 'l2update':
            print(msg)

            for change in msg['changes']:
                amt = float(change[1])

                if change[0] == 'buy':                    
                    # if the qty becomes 0, we need to get rid of this item in the order book
                    if change[2] == '0':
                        self.bids.pop(float(amt))
                    else:
                        self.bids[amt] = change[2]

                else:
                    # if the qty becomes 0, we need to get rid of this item in the order book
                    if change[2] == '0':
                        self.asks.pop(amt)
                    else:
                        self.asks[amt] = change[2] 

                best_bid = sorted(self.bids.keys())[-1]
                best_ask = sorted(self.asks.keys())[0]

                spread = '{0:.2f}'.format(best_ask-best_bid)

                if spread == '0.01':
                    spread = Colors.BLUE + spread + Colors.END
                else:
                    spread = Colors.GREEN + spread + Colors.END

                print("Best bid: {}\tBest ask: {}\tSpread: {}".format(best_bid,  best_ask, spread))

Colors class if you want:

class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    END = '\033[0m'

That way in on_message if I need to know if bids or asks are heavier I can just do len(bids) > len(asks)

npucheta commented 5 years ago

@pmaji have a look I think this could be the fix to the empty websocket issue

CoinBossProLPSecurityForce commented 5 years ago

@pmaji have a look I think this could be the fix to the empty websocket issue

totally. I just tested the code and this fixes it 100%

pmaji commented 5 years ago

@npucheta @CoinBossProLPSecurityForce Will this be rolled into the actual library as a change? I don't like making systemic changes that rely on this kind of work-around.

lordkebab commented 5 years ago

I wouldn't put this into the library as it's an implementation of the library itself. Could be put into the README.md as an example, though.

JamesKBowler commented 5 years ago

This is what I am doing for the level 3 order book.

Built a rate limiter for the API calls, I use this for other stuff too, but it works fine with this API.

# rate_limit.py

from threading import Thread, Lock
import time

lock = Lock()

class RateLimiter(object):
    def __init__(self, class_link, rate=0.16, print_output=True):
        self.jobs = []
        self.rate = rate
        self.__print_output = print_output
        self.__class_link = class_link
        self.__loop_continue = True
        self.__running = False
        self.__t = None

    def __del__(self):
        self.__loop_continue = False
        del self.__t

    def __limiter(self):
        self.__running = True
        while self.__loop_continue:
            try:
                job = self.jobs.pop(0)
            except IndexError:
                job = None
            else:
                self.__func_caller(job)
            time.sleep(self.rate)
        self.__running = False
        self._on_stop()

    def __func_caller(self, job):
        if job is None:
            return
        func = getattr(self.__class_link, job.func_name)
        result = func(*job.vars)
        job.callback(result)

    def _on_stop(self):
        if self.__print_output:
            print(
                "The {} jobs loop has stopped".format(
                    self.__class__.__name__)
            )

    def _on_start(self):
        if self.__print_output:
            print(
                "The {} jobs loop has started".format(
                    self.__class__.__name__)
            )

    def start(self):
        if self.__running:
            return
        self.__loop_continue = True
        self.__t = Thread(target=self.__limiter)
        self.__t.daemon = True
        self.__t.start()
        self._on_start()

    def stop(self):
        if not self.__running:
            return
        self.__loop_continue = False

    def put_job(self, job):
        with lock:
            self.jobs.append(job)

.... and a small wrapper to tie the RateLimiter and PublicClient togeather. Now the PublicClient has a queue and will process them without exceeding the threshold.

# wrapper.py

from rate_limit import RateLimiter
from cbpro import PublicClient

class CoinbaseProWrapper:
    def __init__(self):
        self.__public_client = PublicClient()
        self.__limiter = RateLimiter(
            self.__public_client, 0.16, print_output=False
        )
        self.__limiter.start()

    def get(self, command_event):
        self.__limiter.put_job(command_event)

I then made each product have its own book and buffer to process messages once the full book returns from the PublicClient. Most of the code is the same as the Library.

# book.py

from sortedcontainers import SortedDict
from decimal import Decimal

class Book:
    def __init__(self):
        self._asks = SortedDict()
        self._bids = SortedDict()
        self._current_ticker = None
        self.sequence = -1
        self.buffer = []

    def reset(self, res):
        self._asks = SortedDict()
        self._bids = SortedDict()
        for bid in res['bids']:
            self.add({
                'id': bid[2],
                'side': 'buy',
                'price': Decimal(bid[0]),
                'size': Decimal(bid[1])
            })
        for ask in res['asks']:
            self.add({
                'id': ask[2],
                'side': 'sell',
                'price': Decimal(ask[0]),
                'size': Decimal(ask[1])
            })
        sequence = res['sequence']
        while self._process_buffer(sequence):
            pass

    def _process_buffer(self, sequence):
        buffer = self.buffer.copy()
        self.buffer.clear()
        any_processed = False
        for message in buffer:
            if message['sequence'] < sequence:
                continue
            self.process_message(message)
            sequence = message['sequence']
            any_processed = True
        if any_processed and not self.buffer:
            self.sequence = sequence
        return any_processed

    def process_message(self, message):
        msg_type = message['type']
        if msg_type == 'open':
            self.add(message)
        elif msg_type == 'done' and 'price' in message:
            self.remove(message)
        elif msg_type == 'match':
            self.match(message)
            self._current_ticker = message
        elif msg_type == 'change':
            self.change(message)

    def add(self, order):
        order = {
            'id': order.get('order_id') or order['id'],
            'side': order['side'],
            'price': Decimal(order['price']),
            'size': Decimal(order.get('size') or order['remaining_size'])
        }
        if order['side'] == 'buy':
            bids = self.get_bids(order['price'])
            if bids is None:
                bids = [order]
            else:
                bids.append(order)
            self.set_bids(order['price'], bids)
        else:
            asks = self.get_asks(order['price'])
            if asks is None:
                asks = [order]
            else:
                asks.append(order)
            self.set_asks(order['price'], asks)

    def remove(self, order):
        price = Decimal(order['price'])
        if order['side'] == 'buy':
            bids = self.get_bids(price)
            if bids is not None:
                bids = [o for o in bids if o['id'] != order['order_id']]
                if len(bids) > 0:
                    self.set_bids(price, bids)
                else:
                    self.remove_bids(price)
        else:
            asks = self.get_asks(price)
            if asks is not None:
                asks = [o for o in asks if o['id'] != order['order_id']]
                if len(asks) > 0:
                    self.set_asks(price, asks)
                else:
                    self.remove_asks(price)

    def match(self, order):
        size = Decimal(order['size'])
        price = Decimal(order['price'])

        if order['side'] == 'buy':
            bids = self.get_bids(price)
            if not bids:
                return
            assert bids[0]['id'] == order['maker_order_id']
            if bids[0]['size'] == size:
                self.set_bids(price, bids[1:])
            else:
                bids[0]['size'] -= size
                self.set_bids(price, bids)
        else:
            asks = self.get_asks(price)
            if not asks:
                return
            assert asks[0]['id'] == order['maker_order_id']
            if asks[0]['size'] == size:
                self.set_asks(price, asks[1:])
            else:
                asks[0]['size'] -= size
                self.set_asks(price, asks)

    def change(self, order):
        try:
            new_size = Decimal(order['new_size'])
        except KeyError:
            return

        try:
            price = Decimal(order['price'])
        except KeyError:
            return

        if order['side'] == 'buy':
            bids = self.get_bids(price)
            if bids is None or not any(o['id'] == order['order_id'] for o in bids):
                return
            index = [b['id'] for b in bids].index(order['order_id'])
            bids[index]['size'] = new_size
            self.set_bids(price, bids)
        else:
            asks = self.get_asks(price)
            if asks is None or not any(o['id'] == order['order_id'] for o in asks):
                return
            index = [a['id'] for a in asks].index(order['order_id'])
            asks[index]['size'] = new_size
            self.set_asks(price, asks)

        tree = self._asks if order['side'] == 'sell' else self._bids
        node = tree.get(price)

        if node is None or not any(o['id'] == order['order_id'] for o in node):
            return

    def get_book(self):
        result = {
            'sequence': self.sequence,
            'asks': [],
            'bids': [],
        }
        for ask in self._asks:
            try:
                # There can be a race condition here, where a price point is removed
                # between these two ops
                this_ask = self._asks[ask]
            except KeyError:
                continue
            for order in this_ask:
                result['asks'].append([order['price'], order['size'], order['id']])
        for bid in self._bids:
            try:
                # There can be a race condition here, where a price point is removed
                # between these two ops
                this_bid = self._bids[bid]
            except KeyError:
                continue

            for order in this_bid:
                result['bids'].append([order['price'], order['size'], order['id']])
        return result

    def get_current_ticker(self):
        return self._current_ticker

    def get_ask(self):
        return self._asks.peekitem(0)[0]

    def get_asks(self, price):
        return self._asks.get(price)

    def remove_asks(self, price):
        del self._asks[price]

    def set_asks(self, price, asks):
        self._asks[price] = asks

    def get_bid(self):
        return self._bids.peekitem(-1)[0]

    def get_bids(self, price):
        return self._bids.get(price)

    def remove_bids(self, price):
        del self._bids[price]

    def set_bids(self, price, bids):
        self._bids[price] = bids

... finally the OrderBook which holds all the other Books.

# order_book.py

from wrapper import CoinbaseProWrapper
from book import Book
from cbpro.websocket_client import WebsocketClient

from collections import namedtuple

import pickle

CommandEvent = namedtuple('CommandEvent', ['func_name', 'vars', 'callback'])

class OrderBook(WebsocketClient):
    def __init__(self, products=['BTC-USD'], log_to=None):
        self._books = {}
        for p in products:
            self._books[p] = Book()
        super(OrderBook, self).__init__(products=products)
        self._client = CoinbaseProWrapper()
        self._log_to = log_to
        if self._log_to:
            assert hasattr(self._log_to, 'write')

    def on_open(self):
        for i in self._books.values():
            i.sequence = -1
        print("-- Subscribed to OrderBook! --\n")

    def on_close(self):
        print("\n-- OrderBook Socket Closed! --")

    def reset_book(self, product_id):
        book = self._books[product_id]
        book.sequence = 0
        command = CommandEvent(
            'get_product_order_book',
            [product_id, 3], book.reset
        )
        self._client.get(command)

      def on_message(self, message):
        product = message.get('product_id')
        book = self._books[product]
        book.buffer.append(message)
        if self._log_to:
            pickle.dump(message, self._log_to)
        sequence = message.get('sequence', -1)
        if book.sequence == -1:
            book.sequence = 0
            self.reset_book(product)
            return
        if book.sequence == 0:
            return
        if sequence <= book.sequence:
            # ignore older messages (e.g. before order book initialization from getProductOrderBook)
            return
        elif sequence > book.sequence + 1:
            self.on_sequence_gap(product, book.sequence, sequence)
            return
        book.process_message(message)
        book.sequence = sequence
        book.buffer.remove(message)

    def on_sequence_gap(self, product, gap_start, gap_end):
        self.reset_book(product)
        print('Error: {}, messages missing ({} - {}). Re-initializing  book at sequence.'.format(
            product, gap_start, gap_end))

    def get_current_book(self, product):
        book = self._books[product]
        return book.get_book()

if __name__ == '__main__':
    import sys
    import time
    import datetime as dt

    import cbpro

    class OrderBookConsole(OrderBook):
        ''' Logs real-time changes to the bid-ask spread to the console '''

        def __init__(self, products=['BTC-USD', 'LTC-USD']):
            super(OrderBookConsole, self).__init__(products=products)
            for b in self._books.values():
                # latest values of bid-ask spread
                b._bid = None
                b._ask = None
                b._bid_depth = None
                b._ask_depth = None

        def on_message(self, message):
            super(OrderBookConsole, self).on_message(message)
            product = message.get('product_id')
            book = self._books[product]
            if book.sequence == 0:
                return
            # Calculate newest bid-ask spread
            bid = book.get_bid()
            bids = book.get_bids(bid)
            bid_depth = sum([b['size'] for b in bids])
            ask = book.get_ask()
            asks = book.get_asks(ask)
            ask_depth = sum([a['size'] for a in asks])

            if book._bid == bid and book._ask == ask and book._bid_depth == bid_depth and book._ask_depth == ask_depth:
                # If there are no changes to the bid-ask spread since the last update, no need to print
                pass
            else:
                # If there are differences, update the cache
                book._bid = bid
                book._ask = ask
                book._bid_depth = bid_depth
                book._ask_depth = ask_depth
                print('{} {} bid: {:.3f} @ {:.2f}\task: {:.3f} @ {:.2f}'.format(
                    dt.datetime.now(), product, bid_depth, bid, ask_depth, ask))

    client = cbpro.PublicClient()
    products = [i['id'] for i in client.get_products()]
    order_book = OrderBookConsole(products)
    order_book.start()
    try:
        while True:
            time.sleep(10)
    except KeyboardInterrupt:
        order_book.close()

    if order_book.error:
        sys.exit(1)
    else:
        sys.exit(0)
eraoul commented 4 years ago

Small updates to the code above from @matts80 as it didn't seem to be working anymore. There was a test for the string '0' which now looks like '0.000000' in the update message. I switched to using Decimal objects. Also switched to using a sorted dictionary class for efficiency, and made some updates to remove code redundancy.

colors.py:

class Colors:
    BLUE = '\033[94m'
    GREEN = '\033[92m'
    END = '\033[0m'

orderbook_client.py:

from decimal import Decimal

import cbpro
from sortedcontainers import SortedDict

from colors import Colors

class OrderbookClient(cbpro.WebsocketClient):    
    def __init__(self, products=['BTC-USD']):
        super().__init__(url='wss://ws-feed.pro.coinbase.com/', products=products, channels=['level2'])

    def on_open(self):
        # Set connection parameters.
        pass

    def on_message(self, msg):
        if msg['type'] == 'snapshot':
            self.bids = SortedDict({Decimal(k): Decimal(v) for (k, v) in msg['bids']})
            self.asks = SortedDict({Decimal(k): Decimal(v) for (k, v) in msg['asks']})

        if msg['type'] == 'l2update':
            print(msg)

            for change in msg['changes']:
                price = Decimal(change[1])
                size = Decimal(change[2])

                # Buy messages affect bid side; sell messages affect ask side.
                orderbook_side = self.bids if change[0] == 'buy' else self.asks

                # If the qty becomes 0, we need to get rid of this item in the order book.
                if size == 0:
                    orderbook_side.pop(price)
                else:
                    # Overwrite entry. The size is the new amount of orders at this price; it is not a delta.
                    orderbook_side[price] = size

                best_bid = self.bids.keys()[-1]
                best_ask = self.asks.keys()[0]

                spread = '{0:.2f}'.format(best_ask - best_bid)

                if spread == '0.01':
                    spread = Colors.BLUE + spread + Colors.END
                else:
                    spread = Colors.GREEN + spread + Colors.END

                print('Best bid: {}\tBest ask: {}\tSpread: {}'.format(best_bid,  best_ask, spread))

if __name__ == '__main__':
    client = OrderbookClient()
    client.start()