nautechsystems / nautilus_trader

A high-performance algorithmic trading platform and event-driven backtester
https://nautilustrader.io
GNU Lesser General Public License v3.0

Add Support for Aggregate Trades #918

Closed: poshcoe closed this issue 1 year ago

poshcoe commented 1 year ago

Feature Request

Binance provides a market data endpoint (GET /api/v3/aggTrades) and a websocket stream (<symbol>@aggTrade) for trades in compressed/aggregate form, for both spot and futures.
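
For reference, each aggregate trade is a small JSON object with single-letter keys. The sketch below types that record and builds the spot request URL and the websocket stream name. The helper names (`AggTrade`, `agg_trades_url`, `agg_trade_stream`) are hypothetical and not part of the Binance adapter. Only the spot host and path named above are shown; futures use a different base URL.

```python
from typing import Dict, Optional, TypedDict
from urllib.parse import urlencode


class AggTrade(TypedDict):
    """One record from GET /api/v3/aggTrades (keys as documented by Binance)."""

    a: int   # aggregate trade ID
    p: str   # price, as a decimal string
    q: str   # quantity, as a decimal string
    f: int   # first underlying trade ID in the aggregate
    l: int   # last underlying trade ID in the aggregate
    T: int   # trade time, milliseconds since the Unix epoch
    m: bool  # True if the buyer was the maker (i.e. the seller was the aggressor)
    M: bool  # whether the trade was the best price match


def agg_trades_url(
    symbol: str,
    start_time_ms: Optional[int] = None,
    end_time_ms: Optional[int] = None,
    limit: int = 1000,
    base_url: str = "https://api.binance.com",
) -> str:
    """Build a spot aggTrades request URL (hypothetical helper)."""
    params: Dict[str, object] = {"symbol": symbol.upper()}
    if start_time_ms is not None:
        params["startTime"] = start_time_ms
    if end_time_ms is not None:
        params["endTime"] = end_time_ms
    params["limit"] = min(limit, 1000)  # a single request returns at most 1000
    return f"{base_url}/api/v3/aggTrades?{urlencode(params)}"


def agg_trade_stream(symbol: str) -> str:
    """Websocket stream name for a symbol's aggregate trades."""
    return f"{symbol.lower()}@aggTrade"
```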

The data endpoint can return historical trades between a startTime and an endTime (up to 1000 per request). Repeated requests could therefore be used to build a historical trade dataset. This is not possible with only the currently supported GET /api/v3/trades endpoint exposed via request_trade_ticks, because that endpoint returns only the most recent trades. This is the reason for the current limitation preventing historical time ranges in adapters/binance/spot/data.py#L429.
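
The repeated-request idea can be sketched independently of the adapter. The function below pages forward from a start time until a page comes back short or passes an end time. Because each following request starts at the previous page's last timestamp (inclusive), trades sharing that millisecond come back again, so they are de-duplicated by the aggregate trade ID `a`. `fetch_page` is a hypothetical stand-in for whatever performs one aggTrades call; it is assumed to return trades in ascending order.

```python
from typing import Any, Callable, Dict, List, Optional

AggTradeRecord = Dict[str, Any]
# fetch_page(start_time_ms, limit) -> up to `limit` trades with T >= start_time_ms
FetchPage = Callable[[int, int], List[AggTradeRecord]]


def fetch_agg_trades_range(
    fetch_page: FetchPage,
    start_time_ms: int,
    end_time_ms: Optional[int] = None,
    limit: int = 1000,
) -> List[AggTradeRecord]:
    """Collect aggregate trades from start_time_ms up to end_time_ms (inclusive)."""
    trades: List[AggTradeRecord] = []
    last_id = -1  # highest aggregate trade ID collected so far
    while True:
        page = fetch_page(start_time_ms, limit)
        # Drop trades already collected on the previous page (same start millisecond)
        fresh = [t for t in page if t["a"] > last_id]
        if end_time_ms is not None:
            fresh = [t for t in fresh if t["T"] <= end_time_ms]
        trades.extend(fresh)

        # Stop on a short page (caught up), on no progress, or once past end_time_ms
        if len(page) < limit or not fresh:
            break
        if end_time_ms is not None and page[-1]["T"] > end_time_ms:
            break

        last_id = fresh[-1]["a"]
        start_time_ms = page[-1]["T"]
    return trades
```

If more than `limit` trades share a single millisecond, the loop stops early rather than spinning forever; paging with the endpoint's `fromId` parameter instead of `startTime` avoids that edge case.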

At the moment, aggregate trade support in the Binance adapter extends only to the HTTP and websocket layers. I propose that these aggregate trade data sources be fully implemented and exposed to strategies.

To support aggregate trades, either a new data type could be created (AggTradeTick), or the existing TradeTick could be extended by replacing the trade_id field with first_trade_id and last_trade_id.
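
A minimal sketch of the first option, written as a plain frozen dataclass rather than a Nautilus data type so that it only illustrates the shape of the record. `AggTradeTick`, its field names, and `from_binance` are hypothetical. Prices and sizes are kept as `Decimal` to avoid float rounding.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Any, Mapping


@dataclass(frozen=True)
class AggTradeTick:
    """Hypothetical aggregate-trade record carrying the full trade ID range."""

    symbol: str
    price: Decimal
    size: Decimal
    aggressor_is_buyer: bool
    agg_trade_id: int
    first_trade_id: int
    last_trade_id: int
    ts_event: int  # nanoseconds since the Unix epoch

    @property
    def trade_count(self) -> int:
        """Number of individual trades folded into this aggregate."""
        return self.last_trade_id - self.first_trade_id + 1

    @classmethod
    def from_binance(cls, symbol: str, msg: Mapping[str, Any]) -> "AggTradeTick":
        return cls(
            symbol=symbol,
            price=Decimal(msg["p"]),
            size=Decimal(msg["q"]),
            # "m" means the buyer was the maker, so the buyer aggressed only when it is False
            aggressor_is_buyer=not msg["m"],
            agg_trade_id=msg["a"],
            first_trade_id=msg["f"],
            last_trade_id=msg["l"],
            ts_event=msg["T"] * 1_000_000,  # milliseconds -> nanoseconds
        )
```

Keeping both IDs preserves how many fills each aggregate represents, which the single trade_id of a regular trade tick cannot express.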

I would be happy to work on this feature: I need it for my application and would prefer to contribute rather than build a workaround.

OnlyC commented 1 year ago

@poshcoe I've implemented a workaround with a custom adapter, which you can use as a reference for a PR; compare it with the Binance adapter to see the differences. I didn't create a new AggTradeTick data type because I don't need that level of detail: each aggregate trade is used as a normal trade tick, with first_trade_id as its trade ID. You can then use the on_historical_data handler in a strategy to trade on those trades or to build bars from them.
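
To illustrate the bar-building step without depending on Nautilus's own bar aggregation, here is a plain-Python sketch that folds time-ordered (timestamp, price, size) trades into fixed-interval OHLCV bars. `Bar`, `build_time_bars`, and the tuple input format are hypothetical.

```python
from dataclasses import dataclass
from decimal import Decimal
from typing import Iterable, List, Tuple

Trade = Tuple[int, Decimal, Decimal]  # (ts_event in ns, price, size)


@dataclass
class Bar:
    open_time_ns: int
    open: Decimal
    high: Decimal
    low: Decimal
    close: Decimal
    volume: Decimal


def build_time_bars(trades: Iterable[Trade], interval_ns: int) -> List[Bar]:
    """Fold time-ordered trades into OHLCV bars aligned to interval_ns boundaries."""
    bars: List[Bar] = []
    for ts, price, size in trades:
        bucket = ts - ts % interval_ns  # start of the interval containing ts
        if bars and bars[-1].open_time_ns == bucket:
            bar = bars[-1]
            bar.high = max(bar.high, price)
            bar.low = min(bar.low, price)
            bar.close = price
            bar.volume += size
        else:
            bars.append(Bar(bucket, price, price, price, price, size))
    return bars
```

Intervals without any trades produce no bar in this sketch; a real aggregator would need to decide whether to skip them or emit flat bars.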


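# Imports: the Nautilus types used below (InstrumentId, TradeId, TradeTick,
# Price, Quantity, AggressorSide, UUID4, millis_to_nanos) and the adapter's
# BinanceAggregatedTradeData also need importing; their module paths vary
# between nautilus_trader versions, so they are not spelled out here.
from typing import List, Optional

import pandas as pd


# Helper function: convert one Binance aggregate trade into a TradeTick,
# using the first trade ID ("f") of the aggregate as the trade ID
def parse_aggtrade_tick_http(
    instrument_id: InstrumentId,
    trade: BinanceAggregatedTradeData,
    ts_init: int,
) -> TradeTick:
    return TradeTick(
        instrument_id=instrument_id,
        price=Price.from_str(trade["p"]),
        size=Quantity.from_str(trade["q"]),
        # "m" is True when the buyer was the maker, i.e. the seller aggressed
        aggressor_side=AggressorSide.SELL if trade["m"] else AggressorSide.BUY,
        trade_id=TradeId(str(trade["f"])),
        ts_event=millis_to_nanos(trade["T"]),
        ts_init=ts_init,
    )


# Changes in the Binance data client (both functions below are methods of the
# data client class)
def request_trade_ticks(
    self,
    instrument_id: InstrumentId,
    limit: int,
    correlation_id: UUID4,
    from_datetime: Optional[pd.Timestamp] = None,
    to_datetime: Optional[pd.Timestamp] = None,
) -> None:
    # Binance returns at most 1000 aggregate trades per request
    if limit == 0 or limit > 1000:
        limit = 1000

    if from_datetime is None:
        self._log.warning(
            "Trade ticks have been requested without a start time, "
            f"so the request will be for the most recent {limit}."
        )

    self._loop.create_task(
        self._request_trade_ticks(
            instrument_id, limit, correlation_id, from_datetime, to_datetime
        )
    )


async def _request_trade_ticks(
    self,
    instrument_id: InstrumentId,
    limit: int,
    correlation_id: UUID4,
    from_datetime: Optional[pd.Timestamp] = None,
    to_datetime: Optional[pd.Timestamp] = None,
) -> None:
    # pd.Timestamp.value is nanoseconds since the Unix epoch (UTC), whereas
    # time.mktime() would interpret the timestamp as local time
    start_time_ms = None if from_datetime is None else from_datetime.value // 1_000_000
    end_time_ms = None if to_datetime is None else to_datetime.value // 1_000_000

    agg_trades: List[BinanceAggregatedTradeData] = []
    last_agg_id = -1  # highest aggregate trade ID ("a") collected so far
    while True:
        response: List[BinanceAggregatedTradeData] = await self._http_market.agg_trades(
            symbol=instrument_id.symbol.value,
            # Assumes the HTTP method omits startTime when None, in which case
            # Binance returns the most recent trades
            start_time_ms=start_time_ms,
            limit=limit,
        )
        # Each page after the first starts at the previous page's last timestamp
        # (inclusive), so skip trades that were already collected
        new_trades = [t for t in response if t["a"] > last_agg_id]
        if end_time_ms is not None:
            new_trades = [t for t in new_trades if t["T"] <= end_time_ms]
        agg_trades.extend(new_trades)

        # Stop after a single page when no start time was given, on a short page
        # (caught up), when no new trades arrive, or once past the end time
        if (
            start_time_ms is None
            or len(response) < limit
            or not new_trades
            or (end_time_ms is not None and response[-1]["T"] > end_time_ms)
        ):
            break

        last_agg_id = new_trades[-1]["a"]
        start_time_ms = response[-1]["T"]

    # Use one ts_init for the whole batch
    ts_init = self._clock.timestamp_ns()
    ticks: List[TradeTick] = [
        parse_aggtrade_tick_http(
            instrument_id=instrument_id,
            trade=trade,
            ts_init=ts_init,
        )
        for trade in agg_trades
    ]

    self._handle_trade_ticks(instrument_id, ticks, correlation_id)
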
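
When converting a requested `pd.Timestamp` bound to Binance's millisecond `startTime`, note that `time.mktime(dt.timetuple())` interprets the value as local time and drops sub-second precision, so on a machine not set to UTC the request starts at the wrong instant. A minimal alternative, assuming naive timestamps should be treated as UTC, uses exact integer arithmetic. `to_epoch_ms` is a hypothetical helper; because `pd.Timestamp` subclasses `datetime`, it accepts either.

```python
from datetime import datetime, timedelta, timezone

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_ms(dt: datetime) -> int:
    """Milliseconds since the Unix epoch; naive datetimes are assumed to be UTC."""
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # timedelta // timedelta is exact integer division (no float rounding)
    return (dt - _EPOCH) // timedelta(milliseconds=1)
```

With pandas alone, `ts.value // 1_000_000` gives the same result for UTC timestamps, since `value` holds nanoseconds since the epoch.
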
poshcoe commented 1 year ago

Thanks @OnlyC, I'll use the above as a reference.