bmoscon / cryptofeed

Cryptocurrency Exchange Websocket Data Feed Handler
Other
2.21k stars 682 forks source link

what is the right way to use Backends without Feedhandler to store Exchange rest data? #1045

Open keiser1080 opened 3 months ago

keiser1080 commented 3 months ago

Hi, I am trying to fetch historical exchange data and use a backend callback to store it.

My code does the job, but I am looking for a cleaner way to do it.

import json
from datetime import datetime
import asyncio
from datetime import timedelta, UTC
from cryptofeed import FeedHandler
from cryptofeed.backends.quest import CandlesQuest
from cryptofeed.defines import CANDLES
from cryptofeed.exchanges import BinanceFutures
from backtest.config import settings

# Exchange client used by get_history() to request historical candles.
bf = BinanceFutures()
# Candle backend (from cryptofeed.backends.quest) that get_history() starts,
# feeds and stops by hand, outside of any FeedHandler.
cq = CandlesQuest(host=settings.quest_host, port=settings.quest_port)
# Alternative debug callback, left disabled: prints each candle it receives.
# async def candle_callback(c, receipt_timestamp):
#     print(f"Candle received at {receipt_timestamp}: {c}")

async def get_history(loop, start, end):
    """Fetch historical BTC-USDT-PERP candles and store them through ``cq``.

    Starts the module-level ``cq`` backend on ``loop``, iterates the batches
    yielded by ``bf.candles`` for the requested window and forwards every
    candle in each batch to the backend. The backend is always stopped
    before returning, including when the fetch raises or is cancelled.

    Args:
        loop: Event loop handed to ``cq.start``.
        start: Start of the window, passed through to ``bf.candles``.
        end: End of the window, passed through to ``bf.candles``.
    """
    cq.start(loop)
    try:
        async for data in bf.candles("BTC-USDT-PERP", start, end):
            print("=================")
            for row in data:
                # Historical rows carry no receipt time, so one is derived
                # from the candle's own timestamp (timestamp + 1).
                receipt_timestamp = row.to_dict()["timestamp"] + 1
                await cq(row, receipt_timestamp)
    finally:
        # Stop the backend on every exit path so it is not left running if
        # the request fails or the coroutine is cancelled mid-download.
        await cq.stop()

def main():
    """Backfill recent candles, then keep the live candle feed running.

    Builds a FeedHandler with a BinanceFutures CANDLES feed writing to a
    CandlesQuest backend, runs ``get_history`` to completion on the event
    loop for the most recent window, and then runs the loop until the
    process is interrupted. On exit the FeedHandler is stopped and closed
    so that feed and backend tasks are shut down instead of being destroyed
    while still pending.
    """
    config = {"log": {"filename": "demo.log", "level": "DEBUG", "disabled": False}}
    today = datetime.now(UTC)
    # NOTE(review): despite the name this is only 24 minutes back, not 24
    # hours - confirm whether timedelta(hours=24) was intended.
    yesterday = today - timedelta(minutes=24)
    start = yesterday.strftime("%Y-%m-%d %H:%M:%S")
    end = today.strftime("%Y-%m-%d %H:%M:%S")
    f = FeedHandler(config=config)
    # Called without starting the loop; this function drives the loop itself.
    f.run(start_loop=False)
    f.add_feed(
        BinanceFutures(
            symbols=["BTC-USDT-PERP"],
            channels=[CANDLES],
            callbacks={
                CANDLES: CandlesQuest(
                    host=settings.quest_host, port=settings.quest_port
                )
            },
        )
    )
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(get_history(loop, start, end))
        loop.run_forever()
    except (KeyboardInterrupt, SystemExit):
        # Ctrl-C is the expected way to end the script; fall through to the
        # cleanup below instead of printing a traceback.
        pass
    finally:
        # Shut the feeds and their backends down and close the loop.
        # Presumably this mirrors what FeedHandler.run() does when it owns
        # the loop - TODO confirm against the installed cryptofeed version.
        f.stop(loop=loop)
        f.close(loop=loop)

# Script entry point: run main() only when executed directly, not on import.
if __name__ == "__main__":

    main()

It works, but it raises an exception when I stop the script with Ctrl-C:


2024-06-27 17:06:56,495 : INFO : BINANCE_FUTURES: starting backend task CandlesQuest with multiprocessing=False
2024-06-27 17:06:56,497 : DEBUG : BINANCE_FUTURES.http.0: create HTTP session
2024-06-27 17:06:56,497 : DEBUG : BINANCE_FUTURES.http.0: requesting data from https://fapi.binance.com/fapi/v1/klines?symbol=BTCUSDT&interval=1m&limit=1000&startTime=1719520976000&endTime=1719522416000
2024-06-27 17:06:56,515 : DEBUG : BINANCE_FUTURES.ws.2: connecting to wss://fstream.binance.com/stream?streams=btcusdt@kline_1m
=================
^CTask was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<QuestCallback.writer() running at /home/me/project/backtest/.venv/lib/python3.12/site-packages/cryptofeed/backends/quest.py:27> wait_for=<Future pending cb=[Task.task_wakeup()]>>
backtest-py3.12```