nautechsystems / nautilus_trader

A high-performance algorithmic trading platform and event-driven backtester
https://nautilustrader.io
GNU Lesser General Public License v3.0
1.95k stars 445 forks source link

Improving backtest config & execution #354

Closed limx0 closed 3 years ago

limx0 commented 3 years ago

We've had a few questions come up recently about how to execute and parallelize backtests, I'm opening this issue to document some thoughts. Nautilus currently runs a single backtest extremely fast and well, but the functionality for doing anything more than that is lacking (multiple strategies or instruments, out of memory data, parameter optimisation etc).

Theres a broad range of strategies and instrument combinations that nautilus can support. Assuming the following dummy inputs:

I can list (at least; not at all totally inclusive) the following ways someone might want to configure a backtest :

  1. Running two strategies over all the data with differing parameters
    • Example: Different strategies (mean reversion, momentum) with stock/equity data and parameteisation of those strategy (20d ewma vs 30d ewma) that run over the whole dataset (from start date - end date).
  2. Running a single strategy over different instruments with no overlap
    • Example: A strategy that trades the front (nearest) month future as it approaches expiry. No necessarily dependent on previous instruments positions (can be parallelised)
  3. Running a single strategy over multiple related instruments.
    • Example: An options trading strategy that uses multiple expiries and is dependent on previous positions.

In each of the examples above, the ways in which we can parallelise the backtest (and data loading) is quite different. Some strategies are "embarrassingly parallel"; each chunk (date / strategy / params/ instrument) that we process (or at least a subset) can run without consideration for the others, however others (strategies with dependent paths / years of data) will require some state. We then need the ability to configure the backtest engine as required, and ideally be able to ship this config in such a way that we can leverage libraries like dask for (far superior) parallel execution

I think this basically boils down to creating a "BacktestConfig" object of the form BacktestConfig(strategies=[], instruments=[], data=[], parameters={}). This object won't do any actual creating of objects or loading data, but will contain enough information for the BacktestEngine (or similar) to do so.

Which we can generate quickly and pass to one a wrapper over BacktestEngine that can actually execute it (with any of your choice of execution framework, ie dask).

I'd like to hear others thoughts on this (if it does or does not suit your use case!) and I will try and get a POC going.

yohplala commented 3 years ago

ideally be able to ship this config in such a way that we can leverage libraries like dask for (far superior) parallel execution

Hi Bradley! Having not deep dived into this, my question is probably very naive. I understand Nautylus speed comes a lot from cythonization. But if you intend to use Dask for parallelization, will Dask be able to parallelize Nautylus cythonized functions? (my understanding is that everything Dask can parallelize is its own functions, which mostly mimic / are based on pandas functions)

As per my current understanding (probably wrong), you would need to rewrite Nautylus function with Dask ones and those would probably not be cythonized any longer. Am I wrong?

Thanks for the feedbacks, Bests,

limx0 commented 3 years ago

Hey @yohplala, dask has a few lower level interfaces than the pandas/dataframe API like delayed and also the ability to write custom accept custom task graphs (https://docs.dask.org/en/latest/custom-graphs.html). We could leverage this alongside some rules based on which kind of configuration (from above) you have to parallelise work (and IO / loading & preparing data)

thematz1 commented 3 years ago

I use dask to load and preprocess my data for backtesting. With the .compute() method I turn it into a pd.DataFrame which is the type used across the program. A way to stream data in from a database in backtesting would help when dealing with large amounts of trade volume like BTC/USDT which I cannot hold in memory as 89 days takes over 40gb of RAM.

Streaming from a database could also improve overhead for other systems such as data storage and scraping for backtesting. It opens the door to include many high volume asset trade streams, for longer periods of time, within an individual backtest engine. I love your idea for a BacktestConfig within the backtest engine that could enable some features.

I have been thinking of using batches to parallel engines to execute parameter searches. What I mean is that I would like to take a trade stream and strategy and try different strategy inputs to find the optimal result. I did it with some rough code, I passed the engine through some sloppy classes and functions to achieve my goal. I deleted my code in my inexperience with Github, i had not optimized to use the cache option for repeat runs, and it was not very clean. Basically the ideal class for me would check the maximum cores, load the number of runs and batch the task out. A log that showed the current best parameters would be much cleaner than attempt and integrate into the grand scheme better. Your idea for a config has inspired me for to think about implementation. The code in the develop branch is teaching me a lot so thanks for the time communicating these things in threads. @limx0

The combination of the aforementioned would make me feel like i have the ability to do portfolio optimization through backtesting active trading strategies, or even do this recursively in tandem with the live node. The idea is that parameters are confirmed to be optimal over the prescribed backtest period.

limx0 commented 3 years ago

Hey @thematz1 thanks for the response and info! It seems like we're very much on the same page (what you describe; batch loading a dataset and parameter optimisation is exactly what I'm talking about here).

I will try and get an initial proof of concept PR together this week. It's not going to be pretty (as with the backtest data loader/catalog) but I hopefully its enough to start discussing what this should look like (and having a play with it).

crazy25000 commented 3 years ago

@limx0 what you described is exactly what I was trying to implement. I'm still learning the internals so it was a bit more difficult for me to implement it as quickly as I hoped the other day. A proof of concept PR would be awesome!

crazy25000 commented 3 years ago

Also wanted to mention https://github.com/ray-project/ray as another potential framework. It would be easy to scale data, parametrize, and scale ML since it's framework agnostic. Having the ability to use any framework like you mentioned would also make Nautilus more interoperable.

Which we can generate quickly and pass to one a wrapper over BacktestEngine that can actually execute it (with any of your choice of execution framework, ie dask).

crazy25000 commented 3 years ago

Here's a basic working example of using it with Ray in case anyone wants to experiment:

Using the cross over example https://github.com/nautechsystems/nautilus_trader/blob/master/examples/backtest/fx_ema_cross_gbpusd_bars.py, if we want to optimize for the sharpe ratio:

import ray
from ray import tune
from ray.tune import JupyterNotebookReporter
from ray.tune.schedulers import AsyncHyperBandScheduler
from ray.tune.suggest import ConcurrencyLimiter
from ray.tune.suggest.skopt import SkOptSearch

ray.init(ignore_reinit_error=True, log_to_driver=False, include_dashboard=False, num_cpus=1, num_gpus=1, local_mode=False)

metric = "sharpe_ratio"
skopt_algo = SkOptSearch(metric=metric, mode="max")
scheduler = AsyncHyperBandScheduler()

def run_strategy_instance(params, checkpoint_dir=None):
    engine = BacktestEngine(use_data_cache=True, bypass_logging=True)  # Pre-cache data for increased performance on repeated runs

    # Setup trading instruments
    SIM = Venue("SIM")
    GBPUSD = TestInstrumentProvider.default_fx_ccy("GBP/USD", SIM)

    # Setup data
    engine.add_instrument(GBPUSD)
    engine.add_bars(
        instrument_id=GBPUSD.id,
        aggregation=BarAggregation.MINUTE,
        price_type=PriceType.BID,
        data=TestDataProvider.gbpusd_1min_bid(),  # Stub data from the test kit
    )
    engine.add_bars(
        instrument_id=GBPUSD.id,
        aggregation=BarAggregation.MINUTE,
        price_type=PriceType.ASK,
        data=TestDataProvider.gbpusd_1min_ask(),  # Stub data from the test kit
    )

    # Create a fill model (optional)
    fill_model = FillModel(
        prob_fill_at_limit=0.2,
        prob_fill_at_stop=0.95,
        prob_slippage=0.5,
        random_seed=42,
    )

    # Optional plug in module to simulate rollover interest,
    # the data is coming from packaged test data.
    interest_rate_data = pd.read_csv(os.path.join(PACKAGE_ROOT, "data", "short-term-interest.csv"))

    # Add an exchange (multiple exchanges possible)
    # Add starting balances for single-currency or multi-currency accounts
    engine.add_venue(
        venue=SIM,
        venue_type=VenueType.ECN,
        oms_type=OMSType.HEDGING,  # Venue will generate position_ids
        account_type=AccountType.MARGIN,
        base_currency=USD,  # Standard single-currency account
        starting_balances=[Money(1_000_000, USD)],
        fill_model=fill_model,
    )

    # Instantiate your strategy
    strategy = EMACross(
        instrument_id=GBPUSD.id,
        bar_spec=BarSpecification(5, BarAggregation.MINUTE, PriceType.BID),
        fast_ema_period=params["ema_p1"],
        slow_ema_period=params["ema_p2"],
        trade_size=Decimal(1_000_000),
        order_id_tag="001",
    )

    # Run the engine from start to end of data
    engine.run(strategies=[strategy])
    sharpe_ratio = engine.analyzer.sharpe_ratio()
    engine.dispose()

    tune.report(sharpe_ratio=sharpe_ratio)

analysis = tune.run(
    run_strategy_instance,
    metric=metric,
    mode="max",
    log_to_file=False,
    verbose=3,
    name="strategy_run_example",
    search_alg=skopt_algo,
    scheduler=scheduler,
    num_samples=5,
    config={
        "ema_p1": tune.qlograndint(100, 500, 100),
        "ema_p2": tune.qlograndint(500, 1000, 100),
    },
)

print(f"Best strategy params: {analysis.best_result['sharpe_ratio']}   config: {analysis.best_config}")
cjdsellers commented 3 years ago

Flagging that #361 is about to bring significant advances in this area, thanks to @limx0.

thematz1 commented 3 years ago

@crazy25000

Thank you so much for your introduction of Ray to me. I took some time to modify your code using the tune.Trainable class and reuse_actor=True flag. In addition I used tune.with_parameters to have the dataframe saved in the plasma store although the engine data cache still holds in ram.

In the process of coding this I was wondering if the data is being copied multiple times within the engine and if some manual garbage collection should take place. It seems the dataframe is ~350mb but the data in ram becomes significantly more in two stages before the test runs. I am excited to see how data streaming and the other modifications could help take this even further.

I would love to paste the code but it keeps messing up loool. Here is the main section of code.

` class Trainable(tune.Trainable):

# Creates the engine with cached data producer
# Uses serialized dataframe in plasma folder
def setup(self, config, checkpoint_dir=None, data=None):

    self.params = config
    # Setup trading instruments
    # Requires an internet connection for the instrument loader
    # Alternatively use the TestInstrumentProvider in the test kit
    print("Loading instruments...")
    instruments = CCXTInstrumentProvider(client=ccxt.binance(), load_all=True)

    self.engine = BacktestEngine(use_data_cache=True, bypass_logging=True)  # Pre-cache data for increased performance on repeated runs

    # Setup trading instruments
    BINANCE = Venue("BINANCE")
    instrument_id = InstrumentId(symbol=Symbol("ADA/BTC"), venue=BINANCE)
    self.ADABTC_BINANCE = instruments.find(instrument_id)

    # Setup data
    self.engine.add_instrument(self.ADABTC_BINANCE)
    self.engine.add_trade_ticks(self.ADABTC_BINANCE.id, data)

    # Create a fill model (optional)
    fill_model = FillModel(
        prob_fill_at_limit=0.2,
        prob_fill_at_stop=0.95,
        prob_slippage=0.5,
        random_seed=42,
    )

    # Add an exchange (multiple exchanges possible)
    # Add starting balances for single-currency or multi-currency accounts
    self.engine.add_venue(
        venue=BINANCE,
        venue_type=VenueType.EXCHANGE,
        oms_type=OMSType.NETTING,
        account_type=AccountType.CASH,  # Spot cash account
        base_currency=None,  # Multi-currency account
        starting_balances=[Money(200, ADA), Money(0.0005, BTC)],
        fill_model=fill_model,
    )

# Engine runs strategy with new parameters
def step(self):
    # Instantiate your strategy
    # Log will show reverse of params (do the inverse manually)
    if self.params["ema_p1"] < self.params["ema_p2"]:
        strategy = EMACrossWithTrailingStop(
            instrument_id=self.ADABTC_BINANCE.id,
            bar_spec=BarSpecification(15, BarAggregation.MINUTE, PriceType.LAST),
            fast_ema_period=self.params["ema_p1"],
            slow_ema_period=self.params["ema_p2"],
            atr_period=self.params["atr_period"],
            trail_atr_multiple=self.params["trail_atr_multiple"],
            trade_size=Decimal("100"),
            order_id_tag="001",
        )
    else:
        strategy = EMACrossWithTrailingStop(
            instrument_id=self.ADABTC_BINANCE.id,
            bar_spec=BarSpecification(15, BarAggregation.MINUTE, PriceType.LAST),
            fast_ema_period=self.params["ema_p2"],
            slow_ema_period=self.params["ema_p1"],
            atr_period=self.params["atr_period"],
            trail_atr_multiple=self.params["trail_atr_multiple"],
            trade_size=Decimal("100"),
            order_id_tag="001",
        )

    # Run the engine from start to end of data
    self.engine.run(strategies=[strategy])
    realize_pnl_pct = self.engine.analyzer.total_pnl_percentage(BTC)
    self.engine.reset()

    tune.report(realize_pnl_pct=realize_pnl_pct)

    return {"realize_pnl_pct": realize_pnl_pct, "training_iteration": 1, "done": True}

# Reselts the config params at the class level
def reset_config(self, new_config):
    del self.params
    self.params = new_config
    return True

`

I am trying a PBT scheduler, hence the training_iteration in the return. I'm unsure of the raining_iterations part, I think it needs to increment, but the speed of testing is much greater with the cache being used.

crazy25000 commented 3 years ago

@thematz1 that's awesome, glad it was useful! And yes you are right that the data is duplicated. @limx0 is working on adding a workflow that would allow us to handle multiple/parametrized backtests more efficiently here https://github.com/nautechsystems/nautilus_trader/pull/361

You can read the discussion here about my initial attempt at loading the data once and why we have to load the data every time until the above PR is done: https://github.com/nautechsystems/nautilus_trader/discussions/351

It eventually came down to not being able to pickle the data unless we update the core implementations:

Can't pickle local object '__Pyx_CFunc

cjdsellers commented 3 years ago

Serializable config object provided through pydantic were added in the 1.128.0 release.

This will allow the above style of distributed workflows to be setup more easily.

Closing now in favor of more granular issues.