Cryptrality / backtester


Get Binance data / store into a TSDB ? / get data as dataframe #2

Open c0indev3l opened 1 year ago

c0indev3l commented 1 year ago

Hello,

For storing historical data, you may be interested in using a time-series database.

Here is some code to download data from Binance:

import datetime
from binance_historical_data import BinanceDataDumper

data_dumper = BinanceDataDumper(
    path_dir_where_to_dump=".",
    asset_class="spot",  # spot, um, cm
    data_type="klines",  # aggTrades, klines, trades
    data_frequency="1h",
)
data_dumper.dump_data(
    tickers="BTCUSDT",
    date_start=datetime.date(2023, 7, 1),
    date_end=None,
    is_to_update_existing=False,
)

using https://pypi.org/project/binance-historical-data/

Store data into an InfluxDB database

from influxdb_client import InfluxDBClient
import datetime
import pandas as pd

data_source = "binance"
asset_class = "spot"  # spot, um, cm
storage_frequency = "daily"  # daily, monthly
data_type = "klines"  # aggTrades, klines, trades
symbol = "BTCUSDT"
data_frequency = "1h"
dt = datetime.date(2023, 7, 1)
fname = f"C:\\Users\\w4c\\data\\{data_source}\\{asset_class}\\{storage_frequency}\\{data_type}\\{symbol}\\{data_frequency}\\{symbol}-{data_frequency}-{dt.year}-{dt.month:02}-{dt.day:02}.csv"
with InfluxDBClient.from_env_properties() as client:
    columns = [
        "OpenTime",
        "Open",
        "High",
        "Low",
        "Close",
        "Volume",
        "CloseTime",
        "Quote asset volume",
        "Number of trades",
        "Taker buy base asset volume",
        "Taker buy quote asset volume",
        "Ignore",
    ]
    for df in pd.read_csv(fname, chunksize=1_000, names=columns):
        # OpenTime is used below as the measurement timestamp
        df["OpenTime"] = pd.to_datetime(df["OpenTime"], unit="ms")
        # CloseTime is kept numeric, converted from milliseconds to nanoseconds
        df["CloseTime"] *= 1_000_000.0
        df["data_source"] = data_source
        df["asset_class"] = asset_class
        df["data_type"] = data_type
        df["data_frequency"] = data_frequency
        df["symbol"] = symbol
        print(df)
        print(df.dtypes)
        with client.write_api() as write_api:
            try:
                write_api.write(
                    record=df,
                    bucket="data",
                    data_frame_measurement_name="crypto",
                    data_frame_tag_columns=[
                        "data_source",
                        "asset_class",
                        "data_type",
                        "symbol",
                        "data_frequency",
                    ],
                    data_frame_timestamp_column="OpenTime",
                )
            except Exception as e:
                print(e)

using https://github.com/influxdata/influxdb-client-python

Retrieve data as Pandas DataFrame

from influxdb_client import InfluxDBClient
import pandas as pd

pd.options.display.max_rows = 10
pd.options.display.max_columns = 20

data_source = "binance"
asset_class = "spot"  # spot, um, cm
data_type = "klines"  # aggTrades, klines, trades
symbol = "BTCUSDT"
data_frequency = "1h"

dt_from = pd.to_datetime("2023-07-01")
dt_to = pd.to_datetime("2023-07-02")

ts_from = int(dt_from.timestamp())
ts_to = int(dt_to.timestamp())

# query = 'from(bucket:\"data\") |> range(start:-30d)'
query = f"""from(bucket:"data") 
|> range(start: {ts_from}, stop: {ts_to}) 
|> filter(fn: (r) => r.data_source == "{data_source}"
   and r.asset_class == "{asset_class}" 
   and r.data_type == "{data_type}" 
   and r.symbol == "{symbol}" 
   and r.data_frequency == "{data_frequency}"
)
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")"""

with InfluxDBClient.from_env_properties() as client:
    # note: query_data_frame may return a list of DataFrames
    # when the result contains several tables
    df = client.query_api().query_data_frame(query=query)
    if len(df) > 0:
        df.drop(columns=["result", "table", "_start", "_stop"], inplace=True)
        df.rename(columns={"_time": "OpenTime"}, inplace=True)
        df["CloseTime"] = pd.to_datetime(df["CloseTime"])
    print(df)

I'm still facing an issue https://github.com/influxdata/influxdb-client-python/issues/592

Maybe another TSDB should be considered? TimescaleDB, for example.

Another approach could be to simply store data as Parquet or Feather files (or another format) in a hierarchical directory.
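A minimal sketch of that file-based approach, assuming one Parquet file per day in the same directory hierarchy as the CSV dump above (function names are illustrative; `to_parquet` needs pyarrow or fastparquet installed):

```python
import datetime
from pathlib import Path

import pandas as pd

def parquet_path(root, data_source, asset_class, data_type, symbol, data_frequency, dt):
    """Mirror the CSV directory layout, one Parquet file per day."""
    return (Path(root) / data_source / asset_class / data_type / symbol
            / data_frequency / f"{symbol}-{data_frequency}-{dt:%Y-%m-%d}.parquet")

def store(df, path):
    """Write one DataFrame, creating the directory tree as needed."""
    path.parent.mkdir(parents=True, exist_ok=True)
    df.to_parquet(path)

def load(path):
    return pd.read_parquet(path)
```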

Kind regards

ffavero commented 1 year ago

Nice. Right now it's plain TSV files. I was thinking of indexing the files so cached data can be reused when it overlaps with the requested time window, but I'd gladly use the right tools if they already exist.
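The overlap check described above could be as simple as this (a sketch; names are illustrative, and timestamps can be anything ordered, e.g. epoch seconds or datetimes):

```python
def cached_overlap(cached_start, cached_end, req_start, req_end):
    """Return the part of the requested window covered by the cache, or None."""
    start = max(cached_start, req_start)
    end = min(cached_end, req_end)
    return (start, end) if start < end else None
```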

c0indev3l commented 1 year ago

Data pipelines / ETL (extract, transform and load) are probably the way to go. Unfortunately I'm not a DevOps person. Docker / docker-compose is probably also required (and I'm quite a beginner in that area as well).

ffavero commented 1 year ago

But the data source is already working; it's not optimized, but it's usable to some extent. There are many other aspects of the tool that need to be improved/fixed. Are you running/testing the tool?

c0indev3l commented 1 year ago

I'm considering it, but haven't used it yet.