bmoscon / cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler
Other
2.19k stars 679 forks source link

high level cpu usage #889

Closed xubinlaile closed 2 years ago

xubinlaile commented 2 years ago

I run the okx demo geting high level of cpu usage after a few minutes, my code as follow:

`from decimal import Decimal from cryptofeed import FeedHandler from cryptofeed.defines import CANDLES, BID, ASK, BLOCKCHAIN, FUNDING, GEMINI, L2_BOOK, L3_BOOK, LIQUIDATIONS, \ OPEN_INTEREST, PERPETUAL, TICKER, TRADES, INDEX from cryptofeed.exchanges import OKX import h5py import numpy as np import os from datetime import datetime

base_path = r'/root/OkexClient/Bin/'

symbol_map = {'BTC-USDT-PERP': 'BTC-USDT-SWAP'}

async def book(book, receipt_timestamp): print(f'Book received at {receipt_timestamp} for {book.exchange} - {book.symbol},' f' with {len(book.book)} entries. Top of book prices: ' f'{book.book.asks.index(0)[0]} - {book.book.bids.index(0)[0]}') if book.delta: print(f"Delta from last book contains {len(book.delta[BID]) + len(book.delta[ASK])} entries.") data = book.to_dict(numeric_type=float, none_to=None) ts_float = data['timestamp'] ts_int = int(ts_float*1000) local_time = datetime.fromtimestamp(ts_int / 1000.0) symbol = data['symbol'] if symbol in symbol_map: symbol = symbolmap[symbol] symbol = symbol.replace('-', '') local_pathstr = symbol + '' + local_time.strftime('%Y%m%d') + '.h5' save_path = os.path.join(base_path, 'python_depth', local_path_str)

h5 = h5py.File(save_path, 'a')

ask = list(data['book']['ask'].keys())[0:20]
bid = list(data['book']['bid'].keys())[0:20]
ask_li = []
bid_li = []
ask_li.append(ts_int)
for i in ask:
    ask_li.append(i)
    ask_li.append(data['book']['ask'][i])
for j in bid:
    bid_li.append(j)
    bid_li.append(data['book']['bid'][j])
if str(ts_int) not in h5:
    ask_li.extend(bid_li)
    data = np.array(ask_li, dtype=float)
    h5.create_dataset(str(ts_int), data=data)
h5.close()

async def trade(t, receipt_timestamp): print(f"Trade received at {receipt_timestamp}: ") ts_float = t.timestamp ts_int = int(ts_float * 1000) local_time = datetime.fromtimestamp(ts_int / 1000.0) symbol = t.symbol if symbol in symbol_map: symbol = symbolmap[symbol] symbol = symbol.replace('-', '') local_pathstr = symbol + '' + local_time.strftime('%Y%m%d') + '.h5' save_path = os.path.join(base_path, 'python_trades', local_path_str)

if not os.path.exists(save_path):

h5 = h5py.File(save_path, 'a')
if str(ts_int) not in h5:
    if t.side == 'sell':
        side = 0
    if t.side == 'buy':
        side = 1
    data = np.array([ts_int, t.price, t.amount, side], dtype=float)
    h5.create_dataset(str(ts_int), data=data)
h5.close()

def main(): config = {'log': {'filename': 'demo.log', 'level': 'DEBUG', 'disabled': False}} f = FeedHandler(config=config) f.add_feed(OKX(checksum_validation=True, symbols=['BTC-USDT-PERP'], channels=[BOOK, TRADES], callbacks={BOOK: book, TRADES: trade})) f.run()

if name == 'main': main()`

image

xubinlaile commented 2 years ago

running log as follows, does it mean error by showing 'OKX.ws.1: terminate the connection handler because not running'?

some logs: 2022-08-05 16:04:40,248 : DEBUG : OKX.ws.1: connecting to wss://ws.okx.com:8443/ws/v5/public 2022-08-05 16:05:08,705 : INFO : FH: System Exit received - shutting down 2022-08-05 16:05:08,705 : INFO : FH: shutdown connections handlers in feeds 2022-08-05 16:05:08,705 : INFO : FH: create the tasks to properly shutdown the backends (to flush the local cache) 2022-08-05 16:05:08,705 : INFO : FH: wait 1 backend tasks until termination 2022-08-05 16:05:08,705 : INFO : OKX: feed shutdown starting... 2022-08-05 16:05:08,708 : INFO : OKX.ws.1: terminate the connection handler because not running 2022-08-05 16:05:08,708 : INFO : OKX.ws.1: closed connection 'WebSocketClientProtocol' 2022-08-05 16:05:08,708 : INFO : OKX: feed shutdown completed 2022-08-05 16:05:08,708 : INFO : FH: stop the AsyncIO loop 2022-08-05 16:05:08,709 : INFO : FH: run the AsyncIO event loop one last time 2022-08-05 16:05:08,709 : INFO : FH: cancel the 1 pending tasks 2022-08-05 16:05:08,709 : INFO : FH: run the pending tasks until complete 2022-08-05 16:05:08,709 : INFO : FH: shutdown asynchronous generators 2022-08-05 16:05:08,709 : INFO : FH: close the AsyncIO loop 2022-08-05 16:05:08,710 : INFO : FH: leaving run() 2022

bmoscon commented 2 years ago

you or something else are terminating the application: 2022-08-05 16:05:08,705 : INFO : FH: System Exit received - shutting down

bmoscon commented 2 years ago

i dont see any bug or issues here, you are processing book updates, thats going to use a lot of CPU