nautechsystems / nautilus_trader

A high-performance algorithmic trading platform and event-driven backtester
https://nautilustrader.io
GNU Lesser General Public License v3.0

Loading binance csv data #500

Closed: yohplala closed this issue 2 years ago

yohplala commented 2 years ago

Context

Hi, finally getting started with NautilusTrader, I am first trying to load data into a catalog. I noticed that backtest_config_example.ipynb is probably the best entry point. However, I am adapting it to the data I would like to use, which is 1-minute bar data from Binance. I take the data from the official Binance repository, which makes it available in csv.zip format here. In the example below, I am more specifically starting with spot data for the ADABTC pair, using the monthly file for October 2021 (file here).

Problem

Here is my current code. Basically, it is not clear to me what the block_parser callable passed to CSVReader should be. I am providing a pre-configured pd.read_csv call for it, and I get a TypeError exception.

Steps to Reproduce the Problem

Code.

import os, shutil
import pandas as pd
import numpy as np

from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.persistence.external.core import process_files, write_objects
# here, importing CSVReader, instead of TextReader
from nautilus_trader.persistence.external.readers import CSVReader

# here, importing Bar, instead of QuoteTick
from nautilus_trader.model.data.bar import Bar
# here, I have not yet checked what I should change :)
from nautilus_trader.model.objects import Price, Quantity
# needed for TestInstrumentProvider.adabtc_binance() below
from nautilus_trader.backtest.data.providers import TestInstrumentProvider

DATA_DIR = "~/Documents/code/data/binance/data/spot/monthly/klines/ADABTC/1m"
CATALOG_PATH = os.path.expanduser("~/Documents/code/data/nautilus_test/catalog")
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.makedirs(CATALOG_PATH)

ADABTC = TestInstrumentProvider.adabtc_binance()
catalog = DataCatalog(CATALOG_PATH)

# Pre-configured 'pd.read_csv()'
BINANCE_KLINES_CONF = {
    'sep': ',',
    'header': None,
    'names': ('ts_open', 'open', 'high', 'low', 'close',
              'volume', 'ts_close', 'quote_volume',
              'n_trades', 'taker_buy_base_volume',
              'taker_buy_quote_volume', 'ignore'),
    'index_col': None,  # no column to use as index
    'usecols': ('ts_open', 'open', 'high', 'low', 'close',
                'volume', 'quote_volume'),
    'dtype': {'ts_open': np.int64,
              'open': np.float64,
              'high': np.float64,
              'low': np.float64,
              'close': np.float64,
              'volume': np.float64,
              'quote_volume': np.float64},
    'engine': 'c',
    'compression': 'zip',
}

def parse_binance_klines(file):
    return pd.read_csv(file, **BINANCE_KLINES_CONF)

process_files(
    glob_path=f"{DATA_DIR}/*.zip",
    reader=CSVReader(block_parser=parse_binance_klines),
    catalog=catalog,
)

Error produced:

[                                        ] | 0% Completed |  0.1s
Traceback (most recent call last):

  File "<ipython-input-19-24fcb10aed2f>", line 1, in <module>
    process_files(

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 132, in process_files
    results = compute(tasks)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/base.py", line 571, in compute
    results = schedule(dsk, keys, **kwargs)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 553, in get_sync
    return get_async(

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 496, in get_async
    for key, res_info, failed in queue_get(queue).result():

  File "/home/yoh/anaconda3/lib/python3.8/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()

  File "/home/yoh/anaconda3/lib/python3.8/concurrent/futures/_base.py", line 388, in __get_result
    raise self._exception

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 538, in submit
    fut.set_result(fn(*args, **kwargs))

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 234, in batch_execute_tasks
    return [execute_task(*a) for a in it]

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 234, in <listcomp>
    return [execute_task(*a) for a in it]

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 225, in execute_task
    result = pack_exception(e, dumps)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/local.py", line 220, in execute_task
    result = _execute_task(task, data)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/dask/utils.py", line 37, in apply
    return func(*args, **kwargs)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 103, in process_raw_file
    objs = [x for x in reader.parse(block) if x is not None]

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 103, in <listcomp>
    objs = [x for x in reader.parse(block) if x is not None]

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/nautilus_trader/persistence/external/readers.py", line 296, in parse
    yield from self.block_parser(chunk)

  File "<ipython-input-14-fcc5fbbbdee7>", line 2, in parse_binance_klines
    return pd.read_csv(file, **BINANCE_KLINES_CONF)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/util/_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 586, in read_csv
    return _read(filepath_or_buffer, kwds)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 482, in _read
    parser = TextFileReader(filepath_or_buffer, **kwds)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 811, in __init__
    self._engine = self._make_engine(self.engine)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/parsers/readers.py", line 1040, in _make_engine
    return mapping[engine](self.f, **self.options)  # type: ignore[call-arg]

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/parsers/c_parser_wrapper.py", line 51, in __init__
    self._open_handles(src, kwds)

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/parsers/base_parser.py", line 222, in _open_handles
    self.handles = get_handle(

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/common.py", line 585, in get_handle
    if _is_binary_mode(path_or_buf, mode) and "b" not in mode:

  File "/home/yoh/anaconda3/lib/python3.8/site-packages/pandas/io/common.py", line 962, in _is_binary_mode
    return isinstance(handle, binary_classes) or "b" in getattr(handle, "mode", mode)

TypeError: argument of type 'method' is not iterable

yohplala commented 2 years ago

Hi, I have been delving a bit more into the topic by reviewing the code of CSVReader.

I fail to see what a block is and what provides it.

As the Binance files are csv.zip files, I am not sure CSVReader accepts them as they are, but I can confirm that pd.read_csv accepts them without needing additional arguments. I can see that pd.read_csv is used by CSVReader. So, in the end, I wonder what a block_parser function is supposed to do. My guess is that it could be a dict mapping column names from the external data to Nautilus names, and probably mapping dtypes as well?
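
For reference, pandas infers the compression from the '.zip' extension, so a minimal read needs no extra arguments (the file name below is just illustrative):

import pandas as pd

# Pandas infers zip compression from the extension; the archive must
# contain a single CSV file, as the Binance monthly kline archives do.
df = pd.read_csv("ADABTC-1m-2021-10.zip", header=None)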

Please, where can I find information about what block_parser is expected to do?

Thanks in advance for your help and sorry for these naive questions.

limx0 commented 2 years ago

Hi @yohplala - sorry for the delayed response, and for the lack of documentation here.

The block is a chunk of data (in this case it will be a DataFrame), which may be a whole file or, where the files are large, a subset of the data.

The parsing function needs to return actual Nautilus objects, so you need to convert your raw CSV data into Nautilus Bar objects. Another thing to note is that parsing functions should be generators (this is mostly for other file types, e.g. text, where a row may contain zero or more actual data points; a generator keeps things flexible enough to handle those situations).

If you update your parsing function to something like this, it should work:

from nautilus_trader.backtest.data.providers import TestInstrumentProvider
from nautilus_trader.core.datetime import millis_to_nanos
from nautilus_trader.model.data.bar import Bar, BarType
from nautilus_trader.model.objects import Price, Quantity


def parse_binance_klines(block: pd.DataFrame, precision=5, bar_spec="1-MINUTE"):
    # CSVReader passes `block` in as an already-parsed DataFrame chunk
    for _, row in block.iterrows():
        # precision should match the instrument's price precision
        yield Bar(
            bar_type=BarType.from_str(f"{TestInstrumentProvider.adabtc_binance().id}-{bar_spec}-MID-EXTERNAL"),
            open=Price(row["open"], precision=precision),
            high=Price(row["high"], precision=precision),
            low=Price(row["low"], precision=precision),
            close=Price(row["close"], precision=precision),
            volume=Quantity.from_str(str(row["quote_volume"])),
            ts_event=millis_to_nanos(row["ts_open"]),
            ts_init=millis_to_nanos(row["ts_open"]),
        )

I've just hard-coded the bar type above; you might need to use a partial function if you're planning on loading multiple bar types or underlyings and the bar spec is part of the filename, e.g.:

parser_5min = functools.partial(parse_binance_klines, bar_spec="5-MINUTE")

process_files(
    glob_path=f"{DATA_DIR}/*5m*.zip",
    reader=CSVReader(block_parser=parser_5min),
    catalog=catalog,
)

Let me know if any of that isn't clear, or if you have any more questions?

yohplala commented 2 years ago

Hi @limx0

Sorry, I am only now seeing your answer: I have just proposed my very first pull request :) #501

So, I had trouble with the fact that the official Binance CSV files are headerless (I keep repeating 'official' because they really are provided by Binance on its official website, and since Binance is one of the exchanges supported by NautilusTrader, it seems logical that NautilusTrader should be able to read them).

I added a test case to check the fix; please do not hesitate to share your feedback.

The test case can be used as an example; it replicates what is done in other test cases. So, finally, here is my parser.

import os, shutil
import pandas as pd

from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.persistence.external.core import process_files
from nautilus_trader.persistence.external.readers import CSVReader
from nautilus_trader.backtest.data.wranglers import BarDataWrangler
from nautilus_trader.model.enums import BarAggregation, PriceType
#from tests.test_kit.stubs import TestStubs
from nautilus_trader.backtest.data.providers import TestInstrumentProvider
from nautilus_trader.model.identifiers import Symbol, Venue
from nautilus_trader.model.identifiers import InstrumentId
from nautilus_trader.model.data.bar import BarSpecification, BarType

# https://docs.nautilustrader.io/user-guide/backtesting
DATA_DIR = "~/Documents/code/data/binance/data/spot/monthly/klines/ADABTC/1m"
CATALOG_PATH = os.path.expanduser("~/Documents/code/data/nautilus/catalog")

# Clear if it already exists, then create fresh
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.makedirs(CATALOG_PATH)
catalog = DataCatalog(CATALOG_PATH)

adabtc_id = InstrumentId(Symbol("ADA/BTC"), Venue("BINANCE"))
bar_spec_1min_last = BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST)
bar_type = BarType(adabtc_id, bar_spec_1min_last)
#bar_type = TestStubs.bartype_adabtc_binance_1min_last()
instrument = TestInstrumentProvider.adabtc_binance()
wrangler = BarDataWrangler(bar_type, instrument)


def binance_csvzip_parser2(data):
    data['timestamp'] = data['timestamp'].astype('datetime64[ms]')
#    data = data[['timestamp', 'open', 'high', 'low', 'close', 'volume']]
    bars = wrangler.process(data.set_index("timestamp"))
    return bars

binance_spot_header = ['timestamp','open','high','low','close', 'volume',
                       'ts_close', 'quote_volume', 'n_trades',
                       'taker_buy_base_volume', 'taker_buy_quote_volume',
                       'ignore']

binance_spot_csv_reader = CSVReader(block_parser=binance_csvzip_parser2,
                                    header=binance_spot_header)

process_files(
    glob_path=f"{DATA_DIR}/*.zip",
    reader=binance_spot_csv_reader,
    catalog=catalog,
)

But it will not run correctly without #501
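
For reference, once the parser runs, the write can be sanity-checked roughly as follows (a sketch following the example notebook; the exact catalog query methods may differ between releases):

from nautilus_trader.persistence.external.core import write_objects

# Write the instrument definition alongside the bars, then query the
# catalog to confirm the bars landed.
write_objects(catalog, [instrument])
print(catalog.instruments())
print(catalog.bars())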

limx0 commented 2 years ago

Yep, that makes sense to me - happy with #501. As you say, we have an "official" source of historical data from Binance, so I think it makes sense to add a copy of your parser into Nautilus itself, inside the Binance adapter. I'll discuss with @cjdsellers.

cjdsellers commented 2 years ago

Placing the parser into the Binance adapter seems like a good idea to me.

However, we could do even better and have it return the BinanceBar data structure rather than the standard Bar (if there is sufficient data).

You'll find that type defined in adapters/binance/data_types.py.
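
For illustration, such a parser might look roughly like the sketch below. The extra keyword arguments are guesses mapped from the kline columns; the actual BinanceBar constructor in adapters/binance/data_types.py should be checked before relying on this:

import pandas as pd

from nautilus_trader.adapters.binance.data_types import BinanceBar
from nautilus_trader.core.datetime import millis_to_nanos
from nautilus_trader.model.data.bar import BarType
from nautilus_trader.model.objects import Price, Quantity


def parse_binance_klines_to_binance_bar(block: pd.DataFrame, bar_type: BarType, precision=8):
    # Assumes `block` already carries the headerless Binance kline column names
    # used above (timestamp, open, high, low, close, volume, ts_close,
    # quote_volume, n_trades, taker_buy_base_volume, taker_buy_quote_volume, ignore).
    for _, row in block.iterrows():
        yield BinanceBar(
            bar_type=bar_type,
            open=Price(row["open"], precision=precision),
            high=Price(row["high"], precision=precision),
            low=Price(row["low"], precision=precision),
            close=Price(row["close"], precision=precision),
            volume=Quantity.from_str(str(row["volume"])),
            # NOTE: the following keyword names are assumptions based on the
            # kline columns; verify them against BinanceBar in data_types.py.
            quote_volume=Quantity.from_str(str(row["quote_volume"])),
            count=int(row["n_trades"]),
            taker_buy_base_volume=Quantity.from_str(str(row["taker_buy_base_volume"])),
            taker_buy_quote_volume=Quantity.from_str(str(row["taker_buy_quote_volume"])),
            ts_event=millis_to_nanos(row["timestamp"]),
            ts_init=millis_to_nanos(row["timestamp"]),
        )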

yohplala commented 2 years ago

Hello @cjdsellers and @limx0, nice to see you considering this source of data from Binance. Here are just some comments that you probably already have in mind, but I prefer to share them in case you don't (it may help to get things right the first time).

cjdsellers commented 2 years ago

On your first few points: I think adding some backtest data download functionality to the Binance adapter would be valuable. I agree that taking advantage of checksums is a great idea.
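
For reference, each archive on the Binance data site is published alongside a .CHECKSUM file; assuming it contains a sha256 digest in the usual "<hash>  <filename>" format, a download could be verified with a small sketch like this (file names are illustrative):

import hashlib


def verify_binance_archive(zip_path, checksum_path):
    """Return True if the archive's sha256 digest matches the published checksum."""
    expected = open(checksum_path).read().split()[0]
    with open(zip_path, "rb") as f:
        digest = hashlib.sha256(f.read()).hexdigest()
    return digest == expected


# verify_binance_archive("ADABTC-1m-2021-10.zip", "ADABTC-1m-2021-10.zip.CHECKSUM")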

On your third question: yes, there are some differences between the spot and futures APIs, and because of this I'm thinking, at this stage, of keeping those as separate classes in the adapter. Work on this will begin soon.

Closing this issue as the PR added the bulk of the functionality; I will add some TODO items to #429.