LUCIT-Systems-and-Development / unicorn-fy

A Python SDK by LUCIT to convert received raw data from crypto exchange API endpoints into well-formed python dictionaries.
https://unicorn-fy.docs.lucit.tech
MIT License

Add balanceUpdate to UnicornFy #21

Closed davivc closed 3 years ago

davivc commented 3 years ago

PR Details

Adds the balanceUpdate event type to UnicornFy. When a transfer between spot and cross/isolated margin happens (in either direction), Binance sends a JSON message with the event type 'balanceUpdate'. This event type was not mapped in UnicornFy.

The expected payload is documented here: https://github.com/binance/binance-spot-api-docs/blob/master/user-data-stream.md#balance-update

{
  "e": "balanceUpdate",         //Event Type
  "E": 1573200697110,           //Event Time
  "a": "BTC",                   //Asset
  "d": "100.00000000",          //Balance Delta
  "T": 1573200697068            //Clear Time
}
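As a rough illustration of the mapping this PR adds, the sketch below converts the raw payload into the key names that appear in the sample output further down in this PR. The function name `map_balance_update` and the `BALANCE_UPDATE_KEYS` table are hypothetical and are not UnicornFy's actual implementation; the default stream type is taken from the logged output.

```python
import json

# Hypothetical key map, inferred from the payload above and the output logged in this PR.
BALANCE_UPDATE_KEYS = {
    "e": "event_type",
    "E": "event_time",
    "a": "asset",
    "d": "balance_delta",
    "T": "clear_time",
}


def map_balance_update(raw_json, stream_type="!userData@arr"):
    """Translate a raw balanceUpdate message into descriptive keys (illustrative only)."""
    payload = json.loads(raw_json)
    if payload.get("e") != "balanceUpdate":
        raise ValueError("not a balanceUpdate event")
    mapped = {"stream_type": stream_type}
    for short_key, long_key in BALANCE_UPDATE_KEYS.items():
        mapped[long_key] = payload[short_key]
    return mapped


raw = '{"e":"balanceUpdate","E":1573200697110,"a":"BTC","d":"100.00000000","T":1573200697068}'
mapped = map_balance_update(raw)
print(mapped)
```

The balance delta is kept as a string, as in the logged output, so no precision is lost before the consumer decides how to parse it.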

Description

Related Issue

https://github.com/oliver-zehentleitner/unicorn-fy/issues/20

Motivation and Context

When create_stream is called with output="UnicornFy", the unmapped balanceUpdate event raises an exception (error_msg: 'data') that crashes and restarts the stream whenever a transfer occurs between accounts.

unicorn_stream                | 2021-03-16 20:21:58,309 - DEBUG |client - received solicited pong: 60806399
unicorn_stream                | 2021-03-16 20:22:10,470 - DEBUG |client - event = data_received(<72 bytes>)
unicorn_stream                | 2021-03-16 20:22:10,470 - DEBUG |client - event = data_received(<97 bytes>)
unicorn_stream                | 2021-03-16 20:22:10,481 - DEBUG |client < Frame(fin=True, opcode=1, data=b'{"e":"balanceUpdate","E":1615926131286,"a":"USDT","d":"1.00000000","T":1615926131285}', rsv1=False, rsv2=False, rsv3=False)
unicorn_stream                | 2021-03-16 20:22:10,491 - DEBUG |client < Frame(fin=True, opcode=1, data=b'{"e":"outboundAccountPosition","E":1615926131286,"u":1615926131285,"B":[{"a":"USDT","f":"1.00000000","l":"0.00000000"}]}', rsv1=False, rsv2=False, rsv3=False)
unicorn_stream                | 2021-03-16 20:22:10,622 - DEBUG |UnicornFy->binance_websocket({"e":"balanceUpdate","E":1615926131286,"a":"USDT","d":"1.00000000","T":1615926131285})
unicorn_stream                | 2021-03-16 20:22:10,627 - ERROR |BinanceWebSocketApiSocket.start_socket(d4b1ac8c-f4c3-4f4a-b6a7-e67bb04ad856, ['arr'], ['!userData']) - Exception General Exception - error_msg: 'data'
unicorn_stream                | 2021-03-16 20:22:10,632 - CRITICAL |BinanceWebSocketApiManager.stream_is_crashing(d4b1ac8c-f4c3-4f4a-b6a7-e67bb04ad856)
unicorn_stream                | 2021-03-16 20:22:10,638 - DEBUG |client - state = CLOSING
unicorn_stream                | 2021-03-16 20:22:10,643 - DEBUG |client > Frame(fin=True, opcode=8, data=b'\x03\xe8', rsv1=False, rsv2=False, rsv3=False)
unicorn_stream                | 2021-03-16 20:22:10,721 - INFO |BinanceWebSocketApiManager.kill_stream(d4b1ac8c-f4c3-4f4a-b6a7-e67bb04ad856)
unicorn_stream                | 2021-03-16 20:22:10,726 - INFO |BinanceWebSocketApiManager._restart_stream(d4b1ac8c-f4c3-4f4a-b6a7-e67bb04ad856, ['arr'], ['!userData'])
unicorn_stream                | 2021-03-16 20:22:10,737 - DEBUG |Using selector: EpollSelector
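The error_msg `'data'` in the log is what Python produces when a KeyError for the key `data` is converted to a string. The sketch below shows one way an unmapped event type could end up there. It assumes a hypothetical parser (`parse_event`, not UnicornFy's actual code) that treats anything it does not recognise as a combined-stream wrapper with a `data` key.

```python
import json

# Event types the hypothetical parser knows about; balanceUpdate is deliberately missing.
KNOWN_EVENT_TYPES = {"outboundAccountPosition"}


def parse_event(raw_json):
    """Hypothetical parser used only to illustrate the failure mode."""
    payload = json.loads(raw_json)
    if payload.get("e") in KNOWN_EVENT_TYPES:
        return {"event_type": payload["e"]}
    # Assumed fallback: unwrap a combined-stream message of the form {"stream": ..., "data": {...}}.
    return {"event_type": payload["data"]["e"]}


raw = '{"e":"balanceUpdate","E":1615926131286,"a":"USDT","d":"1.00000000","T":1615926131285}'
try:
    parse_event(raw)
    error_msg = None
except KeyError as error:
    # str() of a KeyError is the repr of the missing key, including the quotes.
    error_msg = str(error)

print(error_msg)
```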

How Has This Been Tested

I've created a unit test for this payload and also tested this release in a Docker image with Python 3.8.1 and unicorn-binance-websocket-api 0.29, using the following script:

import unicorn_binance_websocket_api
import logging
import os
import sys
import time
import threading

###############
### Logging ###
###############

# https://docs.python.org/3/library/logging.html#logging-levels
logging.basicConfig(level=logging.DEBUG,
                    filename=os.path.basename(__file__) + '.log',
                    format="{asctime} [{levelname:8}] {process} {thread} {module}: {message}",
                    style="{")

############
### Main ###
############

binance_api_key = os.environ.get('BAPI_KEY')
binance_api_secret = os.environ.get('BSECRET_KEY')

def print_stream_data_from_stream_buffer(binance_websocket_api_manager):
    while True:
        if binance_websocket_api_manager.is_manager_stopping():
            sys.exit(0)
        oldest_stream_data_from_stream_buffer = binance_websocket_api_manager.pop_stream_data_from_stream_buffer()
        if oldest_stream_data_from_stream_buffer is False:
            time.sleep(0.1)
        else:
            print(oldest_stream_data_from_stream_buffer)

def main():
    ubwa_com_spot = unicorn_binance_websocket_api.BinanceWebSocketApiManager(throw_exception_if_unrepairable=True)
    ubwa_com_cross = unicorn_binance_websocket_api.BinanceWebSocketApiManager(exchange="binance.com-margin", throw_exception_if_unrepairable=True)
    ubwa_com_im = unicorn_binance_websocket_api.BinanceWebSocketApiManager(exchange="binance.com-isolated_margin", throw_exception_if_unrepairable=True)

    ubwa_com_spot.create_stream(
        "arr",
        "!userData",
        output="UnicornFy",
        api_key=binance_api_key,
        api_secret=binance_api_secret,
        stream_label="spot"
    )

    ubwa_com_cross.create_stream(
        "arr",
        "!userData",
        api_key=binance_api_key,
        api_secret=binance_api_secret,
        output="UnicornFy",
        stream_label="margin"
    )

    ubwa_com_im.create_stream(
        "arr",
        "!userData",
        symbols="BTCUSDT",
        output="UnicornFy",
        api_key=binance_api_key,
        api_secret=binance_api_secret,
        stream_label="BTCUSDT"
    )

    # start a worker thread that moves the received stream_data from the spot stream_buffer to a print function
    worker_thread_spot = threading.Thread(target=print_stream_data_from_stream_buffer, args=(ubwa_com_spot,))
    worker_thread_spot.start()

    # start a worker thread that moves the received stream_data from the cross margin stream_buffer to a print function
    worker_thread_cross = threading.Thread(target=print_stream_data_from_stream_buffer, args=(ubwa_com_cross,))
    worker_thread_cross.start()

    # start a worker thread that moves the received stream_data from the isolated margin stream_buffer to a print function
    worker_thread_im = threading.Thread(target=print_stream_data_from_stream_buffer, args=(ubwa_com_im,))
    worker_thread_im.start()

    # print a status summary for each manager every 60 seconds
    while True:
        ubwa_com_spot.print_summary()
        ubwa_com_cross.print_summary()
        ubwa_com_im.print_summary()
        time.sleep(60)

if __name__ == "__main__":
    main()

With this script running, transfer assets between accounts (spot/cross/isolated). The balanceUpdate events are now converted correctly:

protocol: client - event = data_received(<73 bytes>)
protocol: client - event = data_received(<98 bytes>)
protocol: client < Frame(fin=True, opcode=1, data=b'{"e":"balanceUpdate","E":1615938677080,"a":"USDT","d":"1.00000000","T":1615938677079}', rsv1=False, rsv2=False, rsv3=False)
protocol: client < Frame(fin=True, opcode=1, data=b'{"e":"outboundAccountPosition","E":1615938677080,"u":1615938677079,"B":[{"a":"USDT","f":"1.00000000","l":"0.00000000"}]}', rsv1=False, rsv2=False, rsv3=False)
unicorn_fy: UnicornFy->binance_websocket({"e":"balanceUpdate","E":1615938677080,"a":"USDT","d":"1.00000000","T":1615938677079})
unicorn_fy: UnicornFy->binance_com_futures_websocket({'stream_type': '!userData@arr', 'event_type': 'balanceUpdate', 'event_time': 1615938677080, 'asset': 'USDT', 'balance_delta': '1.00000000', 'clear_time': 1615938677079, 'unicorn_fied': ['binance.com-margin', '0.8.0.dev']})
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_request(9ec4d3a4-3cb8-4cf0-8191-df205705b4d9)
protocol: client - event = data_received(<74 bytes>)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_as_crash_request(9ec4d3a4-3cb8-4cf0-8191-df205705b4d9)
protocol: client - event = data_received(<102 bytes>)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.set_heartbeat(9ec4d3a4-3cb8-4cf0-8191-df205705b4d9)
protocol: client < Frame(fin=True, opcode=1, data=b'{"e":"balanceUpdate","E":1615938677080,"a":"USDT","d":"-1.00000000","T":1615938677079}', rsv1=False, rsv2=False, rsv3=False)
unicorn_fy: UnicornFy->binance_websocket({"e":"outboundAccountPosition","E":1615938677080,"u":1615938677079,"B":[{"a":"USDT","f":"1.00000000","l":"0.00000000"}]})
protocol: client < Frame(fin=True, opcode=1, data=b'{"e":"outboundAccountPosition","E":1615938677080,"u":1615938677079,"B":[{"a":"USDT","f":"9.00540157","l":"0.00000000"}]}', rsv1=False, rsv2=False, rsv3=False)
unicorn_fy: UnicornFy->binance_com_futures_websocket({'stream_type': '!userData@arr', 'event_type': 'outboundAccountPosition', 'event_time': 1615938677080, 'last_update_time': 1615938677079, 'balances': [{'asset': 'USDT', 'free': '1.00000000', 'locked': '0.00000000'}], 'unicorn_fied': ['binance.com-margin', '0.8.0.dev']})
unicorn_fy: UnicornFy->binance_websocket({"e":"balanceUpdate","E":1615938677080,"a":"USDT","d":"-1.00000000","T":1615938677079})
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_request(9ec4d3a4-3cb8-4cf0-8191-df205705b4d9)
unicorn_fy: UnicornFy->binance_com_futures_websocket({'stream_type': '!userData@arr', 'event_type': 'balanceUpdate', 'event_time': 1615938677080, 'asset': 'USDT', 'balance_delta': '-1.00000000', 'clear_time': 1615938677079, 'unicorn_fied': ['binance.com', '0.8.0.dev']})
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_as_crash_request(9ec4d3a4-3cb8-4cf0-8191-df205705b4d9)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_request(98c0c886-4e59-4e79-9635-09da1cb794c9)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.set_heartbeat(9ec4d3a4-3cb8-4cf0-8191-df205705b4d9)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_as_crash_request(98c0c886-4e59-4e79-9635-09da1cb794c9)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.set_heartbeat(98c0c886-4e59-4e79-9635-09da1cb794c9)
unicorn_fy: UnicornFy->binance_websocket({"e":"outboundAccountPosition","E":1615938677080,"u":1615938677079,"B":[{"a":"USDT","f":"9.00540157","l":"0.00000000"}]})
unicorn_fy: UnicornFy->binance_com_futures_websocket({'stream_type': '!userData@arr', 'event_type': 'outboundAccountPosition', 'event_time': 1615938677080, 'last_update_time': 1615938677079, 'balances': [{'asset': 'USDT', 'free': '9.00540157', 'locked': '0.00000000'}], 'unicorn_fied': ['binance.com', '0.8.0.dev']})
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_request(98c0c886-4e59-4e79-9635-09da1cb794c9)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.is_stop_as_crash_request(98c0c886-4e59-4e79-9635-09da1cb794c9)
unicorn_binance_websocket_api_manager: BinanceWebSocketApiManager.set_heartbeat(98c0c886-4e59-4e79-9635-09da1cb794c9)
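Once balanceUpdate events arrive as dictionaries in the shape logged above, a consumer can work with them directly. The following is a minimal sketch, not part of this PR: the helper `net_balance_deltas` is hypothetical, and the sample events are trimmed copies of the logged output. It sums the balance deltas per asset and shows that a transfer between two of the user's own accounts nets to zero.

```python
from collections import defaultdict
from decimal import Decimal


def net_balance_deltas(events):
    """Sum balance_delta per asset over balanceUpdate events, ignoring other event types."""
    totals = defaultdict(Decimal)  # Decimal() is 0, so missing assets start at zero
    for event in events:
        if event.get("event_type") != "balanceUpdate":
            continue
        # Deltas arrive as strings; Decimal avoids float rounding errors.
        totals[event["asset"]] += Decimal(event["balance_delta"])
    return dict(totals)


events = [
    {"event_type": "balanceUpdate", "asset": "USDT", "balance_delta": "1.00000000",
     "unicorn_fied": ["binance.com-margin", "0.8.0.dev"]},
    {"event_type": "outboundAccountPosition",
     "balances": [{"asset": "USDT", "free": "1.00000000", "locked": "0.00000000"}]},
    {"event_type": "balanceUpdate", "asset": "USDT", "balance_delta": "-1.00000000",
     "unicorn_fied": ["binance.com", "0.8.0.dev"]},
]

net = net_balance_deltas(events)
print(net)
```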

Types of changes

Checklist

davivc commented 3 years ago

I've just noticed that some of the logging inside the function binance_websocket says 'binance_futures_websocket'. I can fix this later for you as well.

oliver-zehentleitner commented 3 years ago

Thanks for the very clean PR!!!