
About saving custom data from an actor to a catalog and later streaming this data #1796

Closed: faysou closed this 1 month ago

faysou commented 1 month ago

Feature Request

Imagine I have some custom data that takes some time to compute, for example option greeks as a custom Data type.

It could be interesting to have mechanisms:

1. to save this custom data inside the current catalog of a BacktestNode, from an actor that computes this custom data;
2. to stream this custom data once the catalog is populated, like usual bars or other market data.

Given that the Data class requires ts_event and ts_init, the data engine could stream custom data in theory.
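For illustration, a minimal custom Data type could look like the following sketch; GreeksData and its delta/gamma fields are hypothetical, following the property pattern from the docs' custom data examples.

from nautilus_trader.core.data import Data


class GreeksData(Data):  # hypothetical custom data type
    def __init__(self, delta: float, gamma: float, ts_event: int, ts_init: int):
        self.delta = delta
        self.gamma = gamma
        self._ts_event = ts_event
        self._ts_init = ts_init

    @property
    def ts_event(self) -> int:
        # UNIX nanoseconds when the data event occurred
        return self._ts_event

    @property
    def ts_init(self) -> int:
        # UNIX nanoseconds when the object was initialized
        return self._ts_init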

limx0 commented 1 month ago

Hi @faysou, just chiming in to say that I've had the exact same thought, and this is something I would love to see in Nautilus. I don't think it's necessarily trivial though; any change in backtest inputs would probably need to invalidate the "cached" data in the catalog.

We would probably need to do something like:

I don't have a huge amount of time to commit right now, but happy to give guidance or thoughts on anything that might progress this idea.

faysou commented 1 month ago

What we want to do seems to be possible when looking at test_backtest_data_config_custom_data (for streaming) and test_catalog_custom_data (for writing).

faysou commented 1 month ago

So writing custom data is already possible using a streaming feather writer added to the BacktestEngineConfig, for example:

from nautilus_trader.config import StreamingConfig

streaming = StreamingConfig(
    catalog_path=streaming_catalog_path,  # directory the feather stream is written under
    fs_protocol="file",
    include_types=[MyData],  # the custom Data type(s) to capture
)

What is needed is a way to stream data back to a subsequent backtest, using a StreamingFeatherReader for example, in order to avoid redoing computations; this way the data could be streamed to the message bus directly.

Another idea would be to convert a feather file to a parquet catalog; this could be even easier. I don't know, though, what Nautilus expects as fields for a catalog, or whether custom data can be streamed from a parquet catalog.

import pyarrow.feather as feather
import pyarrow.parquet as pq

# Read the streamed feather file and rewrite it as parquet
table = feather.read_table("file.feather")
pq.write_table(table, "path/file.parquet")

Another idea would be a StreamingCatalogWriter, to write custom data directly to a catalog-compatible file.

It would also be good to be able to specify, in the streaming config, a path where the streaming data is saved; by default the data is saved in a folder corresponding to the config.id of a backtest.

faysou commented 1 month ago

It seems that a parquet catalog already handles custom data, so the missing piece would be a streaming parquet writer, similar to the feather writer linked below (the problem with the feather format is that it's uncompressed). It would probably need an abstract class, similar to what is done for catalogs.

https://github.com/nautechsystems/nautilus_trader/blob/develop/nautilus_trader/persistence/writer.py

A temporary solution would be to convert a feather file written by the current solution into a catalog.

faysou commented 1 month ago

I suppose the simple solution is to make a custom data type serialisable to Arrow, like what is done for NewsData in the tests; then it can simply be written using the write_data method of a parquet catalog, from an actor that produces the useful data, and the custom data can then be subscribed to in another backtest by an actor, from a catalog, using BacktestDataConfig.
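As a sketch, the registration could follow the NewsData test stub; the GreeksData schema below is an assumption, and the class is assumed to implement to_dict()/from_dict() like NewsEventData does in that stub.

import pyarrow as pa

from nautilus_trader.serialization.arrow.serializer import make_dict_deserializer
from nautilus_trader.serialization.arrow.serializer import make_dict_serializer
from nautilus_trader.serialization.arrow.serializer import register_arrow

# Hypothetical Arrow schema matching the GreeksData fields
schema = pa.schema(
    {
        "delta": pa.float64(),
        "gamma": pa.float64(),
        "ts_event": pa.uint64(),
        "ts_init": pa.uint64(),
    }
)

# Register the encoder/decoder pair so the catalog can (de)serialize GreeksData
register_arrow(
    data_cls=GreeksData,
    schema=schema,
    encoder=make_dict_serializer(schema),
    decoder=make_dict_deserializer(GreeksData),
)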

Given that streaming to feather files seems to be more for debugging purposes, there's maybe no need to implement a streaming parquet writer, although this would be a logical extension compared to a feather writer. There would just need to be some abstract streaming and config classes so the kernel can instantiate arbitrary streaming configs (same thing for catalogs: the kernel can only instantiate parquet catalogs, but an abstract catalog class already exists).

Edit: streaming to a parquet catalog is not practical, as parquet overwrites files. The only solution remaining is to convert feather to parquet.

faysou commented 1 month ago

I managed to do it in the end. It's only worth doing, though, if the overhead of getting cached custom data from the catalog through the message bus is lower than that of actually doing the computation; in my case it isn't.

The key is to use various functionalities of a catalog. When configuring a backtest node to stream custom data, it's then possible to convert the streamed custom data after a backtest into actual catalog data in parquet. The key function I wrote is this one (it could maybe be added to the ParquetDataCatalog class):

from pathlib import Path

from nautilus_trader.persistence.funcs import class_to_filename


def convert_stream_to_data(backtest_results, catalog, custom_data_type):
    # Locate the feather file streamed during the backtest run
    table_name = class_to_filename(custom_data_type)
    feather_file = Path(catalog.path) / "backtest" / backtest_results[0].instance_id / f"{table_name}.feather"

    # Deserialize the feather table back into nautilus data objects
    feather_table = catalog._read_feather_file(feather_file)
    custom_data = catalog._handle_table_nautilus(feather_table, custom_data_type)

    # Note: write_data overwrites previous data of the same type
    catalog.write_data(custom_data)

The streaming configuration looks like this in order to save data published on the message bus:

streaming = StreamingConfig(
    catalog_path=catalog.path,
    fs_protocol="file",
    # There's currently a bug where more data types than the one we want
    # need to be streamed so that there's no error
    include_types=[GreeksData, Bar, QuoteTick],
)

engine = BacktestEngineConfig(
    strategies=strategies,
    streaming=streaming,
    logging=LoggingConfig(log_level="ERROR")
)

After a backtest with a BacktestNode, the function above needs to be called:

node = BacktestNode(configs=[config])
backtest_results = node.run()
convert_stream_to_data(backtest_results, catalog, GreeksData)

After the streamed data has been converted to parquet, the backtest data config looks like this, and the custom data can be consumed as usual through the message bus:

BacktestDataConfig(
    data_cls=GreeksData,
    catalog_path=catalog.path,
    client_id="GreeksData",
)  

The custom data also needs to implement parquet serialization functions, similar to https://github.com/nautechsystems/nautilus_trader/blob/develop/nautilus_trader/test_kit/stubs/persistence.py#L68, as well as functions to communicate over the message bus, as described at https://nautilustrader.io/docs/latest/concepts/advanced/custom_data#option-greeks-example.
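For reference, subscribing from an actor could look like this sketch (GreeksConsumer is hypothetical; GreeksData and the "GreeksData" client id come from the configs above):

from nautilus_trader.common.actor import Actor
from nautilus_trader.model.data import DataType
from nautilus_trader.model.identifiers import ClientId


class GreeksConsumer(Actor):  # hypothetical consumer of the cached greeks
    def on_start(self) -> None:
        # The client_id matches the one set in BacktestDataConfig above
        self.subscribe_data(
            data_type=DataType(GreeksData),
            client_id=ClientId("GreeksData"),
        )

    def on_data(self, data) -> None:
        if isinstance(data, GreeksData):
            self.log.info(repr(data))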

You may also need to delay the execution of your logic in a strategy by a fraction of a second, using self.clock.set_time_alert, so the loaded custom data has time to be processed by the message bus before the strategy logic runs.
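A sketch of that delay inside a strategy (the alert name and the 100 ms offset are arbitrary):

import pandas as pd

from nautilus_trader.trading.strategy import Strategy


class DelayedStrategy(Strategy):  # hypothetical
    def on_start(self) -> None:
        # Defer the start-up logic so the custom data loaded from the catalog
        # has time to be processed by the message bus first
        self.clock.set_time_alert(
            name="deferred-start",
            alert_time=self.clock.utc_now() + pd.Timedelta(milliseconds=100),
            callback=self.deferred_start,
        )

    def deferred_start(self, event) -> None:
        ...  # run the logic that depends on the streamed custom data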