Closed TheRealBecks closed 2 years ago
Hey @TheRealBecks - apologies about the confusion regarding the backtesting; this section of the docs has been under a lot of churn, as you've discovered. More docs are coming soon (see https://github.com/nautechsystems/nautilus_trader/pull/528), but in the meantime let me help you get started!
I've just pushed an update to that PR on the Loading External Data section - could you check it out and see if you are able to follow along for your own example? (And provide any feedback about things that are unclear!)
You'll also need to install the develop branch - I had to push a bug fix. You can do this via pip with
pip install git+https://github.com/nautechsystems/nautilus_trader.git@develop
Thanks for the quick reply! I have now started reading the documentation and working on loading the data. I'm doing my best not to ask stupid Python and trading questions as I'm still learning both topics. Here's my feedback:

Loading data via Reader classes:

- parser as your self-written parser function, that will be added and explained in the next paragraph?
- What's the difference between QuoteTick and TradeTick?
- In the first sentence you're talking explicitly about Forex, but I'm trading stocks. These CSV headers are like timestamp,open,high,low,close,volume - how do I use this in this code example?
- The API Reference link is broken: https://github.com/nautechsystems/nautilus_trader/blob/develop/docs/2_user_guide/%22../3_api_reference/persistence%22
- What about datetime.datetime.strptime()? Do I need to use that always? Are there different timestamps in use? My timestamp is 2018-12-24 04:01:00, so I have to change the string from %Y%m%d %H%M%S%f to %Y%m%d %H%M%S (without %f for nanoseconds), am I right? It would be good to say a word about different timestamps and format strings; a short example could help.
- CSVReader will be imported, but the code is missing. But nevertheless it should be coded in the Loading data via Reader classes paragraph?
- bid=Price.from_str(data['bid'].decode()): Do you load the values as strings, so I have to use from_str? E.g. my CSV data looks like this: 2018-12-24 04:01:00,17.01,17.01,16.9,17.0,1300
- What do bid_size=Quantity.from_int(100_000) and ask_size exactly mean? What is from_int(100_000), specifically 100_000? Why is there a _?

In general:

- Could you link Reader, QuoteTick, TradeTick and the other classes? If yes, link them to the paragraphs where you introduce the classes.

No more time for today, I will read and think about the other paragraphs in the next days. I will also try to get my code working, but as you can read I still have many questions about how to use CSVReader and parser :)
Hi @TheRealBecks
We really appreciate your detailed feedback on the docs examples. This is invaluable insight into the users' perspective on how we present the information, and will help us refine the docs as we continue to build them up!
So many thanks, and just letting you know I'll be going through your points tomorrow and start addressing them soon.
I will start another try at the end of the week. I will keep you updated if I get it working :thumbsup:
Hey @TheRealBecks, apologies for the slow follow up. I'll try and address some of your specific questions here, and then we'll try and update the docs in the next week or so.
What's the difference between QuoteTick and TradeTick?
A QuoteTick is an update in the top level quote of the orderbook (bid price or ask price) and a TradeTick is a trade that happens in the market.
In the first sentence you're talking explicitly about Forex, but I'm trading stocks. These CSV headers are like timestamp,open,high,low,close,volume - how do I use this in this code example?
This data is typically called bar data (open, high, low, close). There is a Bar data type in nautilus, so you would simply adjust the function to match your data, something like below.
Note - the Bar data type also takes a bar_type argument; you need to tell it about the spec of the bars (1-minute bars, for example) - this will depend on your data.
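As an aside on how a bar type reads once constructed: nautilus renders it as a hyphen-joined string (this shows up later in this thread when querying the catalog). A purely illustrative helper, with the format taken from the output shown further down rather than from the nautilus API:

```python
def bar_type_str(instrument_id: str, step: int, aggregation: str,
                 price_type: str, source: str) -> str:
    # Components: instrument, step size, aggregation unit, price type, aggregation source
    return f"{instrument_id}-{step}-{aggregation}-{price_type}-{source}"

# 1-minute bars of last-traded prices, aggregated externally (already bars in the data)
print(bar_type_str("AMD.NYSE", 1, "MINUTE", "LAST", "EXTERNAL"))
```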
import pandas as pd

from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.c_enums.bar_aggregation import BarAggregation
from nautilus_trader.model.data.bar import Bar, BarType, BarSpecification
from nautilus_trader.model.enums import AggregationSource, PriceType


def parser(data, instrument_id):
    """Parser function for stock OHLC data, for use with CSV Reader"""
    dt = pd.Timestamp(data['timestamp'], tz='UTC')
    bar_type = BarType(
        instrument_id,
        BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST),
        AggregationSource.EXTERNAL,  # Bars are aggregated in the data already
    )
    yield Bar(
        instrument_id=instrument_id,
        type=bar_type,
        open=data['open'],
        high=data['high'],
        low=data['low'],
        close=data['close'],
        volume=data['volume'],
        ts_event=dt_to_unix_nanos(dt),
        ts_init=dt_to_unix_nanos(dt),
    )
What about datetime.datetime.strptime()? Do I need to use that always? Are there different timestamps in use? My timestamp is 2018-12-24 04:01:00, so I have to change the string from %Y%m%d %H%M%S%f to %Y%m%d %H%M%S (without %f for nanoseconds), am I right? It would be good to say a word about different timestamps and format strings; a short example could help.
The only requirement is getting your timestamps into UTC nanoseconds; how you do that is up to you. What you have above will work fine.
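For example, here is a stdlib-only sketch of that conversion for the timestamp format in your data (dt_to_unix_nanos in nautilus does the equivalent for you; this assumes the timestamps are already in UTC):

```python
from datetime import datetime, timezone

def to_unix_nanos(ts: str) -> int:
    # Parse "YYYY-MM-DD HH:MM:SS", treat it as UTC, convert to nanoseconds since epoch
    dt = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone.utc)
    return int(dt.timestamp()) * 1_000_000_000

print(to_unix_nanos("2018-12-24 04:01:00"))  # -> 1545624060000000000
```

This matches the ts_event values that appear in the catalog output later in this thread.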
CSVReader will be imported, but the code is missing. But nevertheless it should be coded in the Loading data via Reader classes paragraph?
Thanks, this should be fixed in the docs.
bid=Price.from_str(data['bid'].decode()): Do you load the values as strings, so I have to use from_str? E.g. my CSV data looks like this: 2018-12-24 04:01:00,17.01,17.01,16.9,17.0,1300
The data that is read is kept as bytes, which is why we do .decode(). You will do something similar, but passing the fields to your Bar, i.e. open=Price.from_str(data['open'].decode()).
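To make the bytes point concrete, a stdlib-only sketch (Decimal standing in for nautilus's Price, which the thread uses via Price.from_str):

```python
from decimal import Decimal

raw = b"17.01"         # CSV fields arrive from the reader as raw bytes
text = raw.decode()    # bytes -> str: "17.01"
price = Decimal(text)  # exact decimal value, analogous to Price.from_str("17.01")
print(price)
```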
What do bid_size=Quantity.from_int(100_000) and ask_size exactly mean? What is from_int(100_000), specifically 100_000? Why is there a _?
Bid/ask size is the volume available on the top level of the orderbook. For bar data, this is not relevant; you will pass a volume like Bar(volume=data['volume']). The 100_000 is just a formatting nicety; it's exactly the same as 100000.
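In other words, the underscore is just Python's digit separator for numeric literals (PEP 515); the interpreter ignores it entirely:

```python
# Underscores in int/float literals are purely visual separators
assert 100_000 == 100000
assert 1_000_000_000 == 10**9
print("equal")
```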
If you have any other questions - don't hesitate to ask. I'm going to leave this issue open until we get some of these updates into the docs.
Btw - the docs are now live at https://docs.nautilustrader.io, which is much nicer than looking at the notebooks in the repo.
Ok, next try, but I'm still stuck. I also don't understand why reading and writing data is so different in my example compared to the new documentation.
Here is my current source code:
import fsspec
import pandas as pd
import os
import shutil
from nautilus_trader.backtest.data.providers import TestInstrumentProvider
from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.c_enums.bar_aggregation import BarAggregation
from nautilus_trader.model.data.bar import Bar, BarType, BarSpecification
from nautilus_trader.model.enums import AggregationSource, PriceType
from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.persistence.external.core import process_files, write_objects
from nautilus_trader.persistence.external.readers import TextReader
from pathlib import Path
# Current working directory path
cwd = Path(__file__).absolute().resolve().parents[0]
data_file = cwd.parents[1].joinpath(
    "trading-data/stock/alphavantage/1min/amd_nautilus_trader.csv"
)
CATALOG_PATH = cwd.parents[1].joinpath("nautilus_trader_data")

# Clear if it already exists, then create fresh
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.mkdir(CATALOG_PATH)

# fs = fsspec.filesystem("file")
# raw_files = fs.glob(data_file)
# assert raw_files, f"Unable to find file: {data_file}"


def parser(data, instrument_id):
    """Parser function for stock OHLC data, for use with CSV Reader"""
    dt = pd.Timestamp(data["timestamp"], tz="UTC")
    bar_type = BarType(
        instrument_id,
        BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST),
        AggregationSource.EXTERNAL,  # Bars are aggregated in the data already
    )
    yield Bar(
        instrument_id=AMD.id,
        type=bar_type,
        open=data["open"],
        high=data["high"],
        low=data["low"],
        close=data["close"],
        volume=data["volume"],
        ts_event=dt_to_unix_nanos(dt),
        ts_init=dt_to_unix_nanos(dt),
    )


catalog = DataCatalog(CATALOG_PATH)

process_files(
    glob_path=data_file,
    reader=TextReader(line_parser=parser),
    catalog=catalog,
)

write_objects(catalog, [AMD])
I commented out my fsspec code and used process_files instead, because that will write my data to the catalog. I do not understand how to do that with fsspec? And why do I have to use fsspec when there's a process_files in the documentation example?
You wrote instrument_id=instrument_id, but where do I get that instrument_id from? In the documentation instrument_id=AUDUSD.id is used, but I will always get an undefined name Python error. Also a definition or import for AUDUSD.id is missing in your documentation, but there's AUDUSD = TestInstrumentProvider. ...and I think that I also need this TestInstrumentProvider? I saw that there's the method aapl_equity in backtest/data/providers.py. Am I right that I have to define my own method for AMD?
In my opinion the documentation is still not self-explanatory, but you're on a good way! :)
Edit: And I wanted to join your Discord server, but Discord can't find any text channels. Looks like I can't join your server as a public user?
Hey @TheRealBecks
You're almost there - you just need to tell nautilus about the instrument you want to load, so I would add something like below to create a new Equity instrument for AMD. Nautilus is quite a low-level code base and does not make assumptions about what instruments you're trading - so you need to tell it what kind of instrument you're trading (in this case we have an Equity class, but you need to fill in the specifics for AMD).
def make_equity_instrument(name, exchange):
    symbol = Symbol(name)
    venue = Venue(exchange)
    return Equity(
        instrument_id=InstrumentId(symbol=symbol, venue=venue),
        native_symbol=symbol,
        currency=USD,
        price_precision=2,
        price_increment=Price.from_str("0.01"),
        multiplier=Quantity.from_int(1),
        lot_size=Quantity.from_int(1),
        isin=None,
        ts_event=0,
        ts_init=0,
    )


AMD = make_equity_instrument("AMD", "NYSE")
And then some slight modifications to the parser code. I use CSVReader based on your data above because it handles the header row. I also use a partial to "bind" the parser function to the AMD instrument id - you do this because nothing inside the CSV file you're reading tells you what stock it is for; you only know at the top level.
def parser(data, instrument_id):
    """Parser function for stock OHLC data, for use with CSV Reader"""
    dt = pd.Timestamp(data["timestamp"], tz="UTC")
    bar_type = BarType(
        instrument_id,
        BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST),
        AggregationSource.EXTERNAL,  # Bars are aggregated in the data already
    )
    yield Bar(
        bar_type=bar_type,
        open=Price.from_str(str(data["open"])),
        high=Price.from_str(str(data["high"])),
        low=Price.from_str(str(data["low"])),
        close=Price.from_str(str(data["close"])),
        volume=Quantity.from_str(str(data["volume"])),
        ts_event=dt_to_unix_nanos(dt),
        ts_init=dt_to_unix_nanos(dt),
        check=True,
    )


process_files(
    glob_path=data_file,
    # CSVReader to handle headers (ie csv files) and chunked=False to process row-by-row
    reader=CSVReader(block_parser=partial(parser, instrument_id=AMD.id), chunked=False),
    catalog=catalog,
)
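The "binding" with partial can be shown with plain stdlib pieces; the parser below is a toy stand-in, not the nautilus one:

```python
from functools import partial

def parse_row(line: str, instrument_id: str) -> dict:
    # The CSV row itself never names the stock, so the id is supplied from outside
    ts, o, h, l, c, v = line.split(",")
    return {"instrument_id": instrument_id, "open": float(o), "volume": int(v)}

# Bind the externally-known instrument id once, up-front
amd_parser = partial(parse_row, instrument_id="AMD.NYSE")
row = amd_parser("2018-12-24 04:01:00,17.01,17.01,16.9,17.0,1300")
print(row["instrument_id"], row["open"], row["volume"])
```

After binding, amd_parser takes only the row, which is exactly the shape the reader expects to call.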
Finally (whole script I used below) - this gets me the data based on your sample row (also below) into the catalog.
# Sample data
timestamp,open,high,low,close,volume
2018-12-24 04:01:00,17.01,17.01,16.9,17.0,1300
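For reference, the stdlib csv module shows roughly the per-row dict a header-aware reader hands to the parser for this sample (field names come from the header row; values stay strings until you convert them):

```python
import csv
import io

sample = "timestamp,open,high,low,close,volume\n2018-12-24 04:01:00,17.01,17.01,16.9,17.0,1300\n"
rows = list(csv.DictReader(io.StringIO(sample)))
print(rows[0]["timestamp"], rows[0]["open"], rows[0]["volume"])
```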
Full script:
import os
import shutil
from functools import partial
from pathlib import Path

import pandas as pd

from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.c_enums.bar_aggregation import BarAggregation
from nautilus_trader.model.data.bar import Bar, BarType, BarSpecification
from nautilus_trader.model.enums import AggregationSource, PriceType
from nautilus_trader.model.instruments.equity import Equity
from nautilus_trader.model.identifiers import InstrumentId, Symbol, Venue
from nautilus_trader.model.currencies import USD
from nautilus_trader.model.objects import Price, Quantity
from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.persistence.external.core import process_files, write_objects
from nautilus_trader.persistence.external.readers import TextReader, CSVReader

# Current working directory path
cwd = Path(__file__).absolute().resolve().parents[0]
data_file = cwd.joinpath("amd.csv")
CATALOG_PATH = cwd.joinpath("nautilus_trader_data")

# Clear if it already exists, then create fresh
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.mkdir(CATALOG_PATH)

# fs = fsspec.filesystem("file")
# raw_files = fs.glob(data_file)
# assert raw_files, f"Unable to find file: {data_file}"


def parser(data, instrument_id):
    """Parser function for stock OHLC data, for use with CSV Reader"""
    dt = pd.Timestamp(data["timestamp"], tz="UTC")
    bar_type = BarType(
        instrument_id,
        BarSpecification(1, BarAggregation.MINUTE, PriceType.LAST),
        AggregationSource.EXTERNAL,  # Bars are aggregated in the data already
    )
    yield Bar(
        bar_type=bar_type,
        open=Price.from_str(str(data["open"])),
        high=Price.from_str(str(data["high"])),
        low=Price.from_str(str(data["low"])),
        close=Price.from_str(str(data["close"])),
        volume=Quantity.from_str(str(data["volume"])),
        ts_event=dt_to_unix_nanos(dt),
        ts_init=dt_to_unix_nanos(dt),
        check=True,
    )


catalog = DataCatalog(CATALOG_PATH)


def make_equity_instrument(name, exchange):
    symbol = Symbol(name)
    venue = Venue(exchange)
    return Equity(
        instrument_id=InstrumentId(symbol=symbol, venue=venue),
        native_symbol=symbol,
        currency=USD,
        price_precision=2,
        price_increment=Price.from_str("0.01"),
        multiplier=Quantity.from_int(1),
        lot_size=Quantity.from_int(1),
        isin=None,
        ts_event=0,
        ts_init=0,
    )


AMD = make_equity_instrument("AMD", "NYSE")

process_files(
    glob_path=data_file,
    reader=CSVReader(block_parser=partial(parser, instrument_id=AMD.id), chunked=False),
    catalog=catalog,
)

write_objects(catalog, [AMD])

assert catalog.instruments(as_nautilus=True)
assert catalog.bars(as_nautilus=True)
@TheRealBecks thanks for persevering with the docs!
When you get a chance, could you please confirm the discord badge at the top of the README is working for you now?
I added your changes to my code and afterwards I tested my implementation and also yours and...:
[ ] | 0% Completed | 25.4s
Traceback (most recent call last):
File "/home/becks/Entwicklung/nautilus_strategies/nautilus_strategies/nautilus3.py", line 76, in <module>
process_files(
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 132, in process_files
results = compute(tasks)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/base.py", line 571, in compute
results = schedule(dsk, keys, **kwargs)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 553, in get_sync
return get_async(
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 496, in get_async
for key, res_info, failed in queue_get(queue).result():
File "/usr/lib64/python3.8/concurrent/futures/_base.py", line 437, in result
return self.__get_result()
File "/usr/lib64/python3.8/concurrent/futures/_base.py", line 389, in __get_result
raise self._exception
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 538, in submit
fut.set_result(fn(*args, **kwargs))
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 234, in batch_execute_tasks
return [execute_task(*a) for a in it]
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 234, in <listcomp>
return [execute_task(*a) for a in it]
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 225, in execute_task
result = pack_exception(e, dumps)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/local.py", line 220, in execute_task
result = _execute_task(task, data)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/dask/utils.py", line 39, in apply
return func(*args, **kwargs)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 106, in process_raw_file
n_rows += write_tables(catalog=catalog, tables=dataframes)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 239, in write_tables
write_parquet(
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/external/core.py", line 295, in write_parquet
pq.write_metadata(table.schema, f"{path}/_common_metadata", version="2.6", filesystem=fs)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/pyarrow/parquet.py", line 2206, in write_metadata
writer = ParquetWriter(where, schema, **kwargs)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/pyarrow/parquet.py", line 655, in __init__
self.writer = _parquet.ParquetWriter(
File "pyarrow/_parquet.pyx", line 1395, in pyarrow._parquet.ParquetWriter.__cinit__
File "pyarrow/_parquet.pyx", line 1220, in pyarrow._parquet._create_writer_properties
ValueError: Unsupported Parquet format version: 2.6
The installed Python module pyarrow is version 5.0.0, but that version has no format version 2.6 - see its documentation, search for _parquet_writer_arg_docs.
The current version 6 (https://arrow.apache.org/docs/_modules/pyarrow/parquet.html) has the necessary format version.
Your pyproject.toml (https://github.com/nautechsystems/nautilus_trader/blob/develop/pyproject.toml) says pyarrow = ">=4.0.0,<6.1.0". I updated the dependencies to pyarrow >= 6 and here we are:
[########################################] | 100% Completed | 26.1s
I will continue my testing within the next days.
@cjdsellers No, the Discord badge is still not working.
Yep you're right on the parquet issue - @cjdsellers we should bump pyarrow to ^6.0.1
Yep you're right on the parquet issue - @cjdsellers we should bump pyarrow to ^6.0.1
Understood, will do on the next diff to develop.
Also @TheRealBecks , the discord badge on develop
branch should be working now (I updated it again).
Updated dependencies including pyarrow = "^6.0.1" on latest develop.
I was able to load my data into the catalog, but I'm stuck with the initial backtest configuration from this documentation:
catalog = DataCatalog(CATALOG_PATH)
data = catalog.bars()
print(data)
My dataframe looks like this:
bar_type open high low close volume ts_event ts_init
0 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 17.01 17.01 16.9 17.0 1300 1545624060000000000 1545624060000000000
1 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 17.09 17.09 17.09 17.09 107 1545624300000000000 1545624300000000000
2 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 17.1 17.1 17.1 17.1 800 1545624420000000000 1545624420000000000
3 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 17.14 17.14 17.14 17.14 868 1545624480000000000 1545624480000000000
4 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 17.16 17.16 17.16 17.16 499 1545624540000000000 1545624540000000000
... ... ... ... ... ... ... ... ...
329720 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 91.43 91.43 91.43 91.43 114 1607716260000000000 1607716260000000000
329721 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 91.42 91.42 91.42 91.42 202 1607716380000000000 1607716380000000000
329722 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 91.3 91.3 91.3 91.3 394 1607716440000000000 1607716440000000000
329723 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 91.4 91.4 91.4 91.4 300 1607716680000000000 1607716680000000000
329724 AMD.NYSE-1-MINUTE-LAST-EXTERNAL 91.3001 91.4 91.3001 91.33 1225 1607716800000000000 1607716800000000000
Afterwards I tried:
catalog.quote_ticks(start=1545624060000000000, end=1607716800000000000)
And got the following error:
Traceback (most recent call last):
File "/home/becks/Entwicklung/nautilus_strategies/nautilus_strategies/nautilus4.py", line 41, in <module>
catalog.quote_ticks(start=1545624060000000000, end=1607716800000000000)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/catalog.py", line 306, in quote_ticks
return self.query(
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/catalog.py", line 209, in query
return self._query(
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib64/python3.8/site-packages/nautilus_trader/persistence/catalog.py", line 120, in _query
raise FileNotFoundError(f"protocol={self.fs.protocol}, path={full_path}")
FileNotFoundError: protocol=file, path=/home/becks/Entwicklung/nautilus_trader_data/data/quote_tick.parquet
I also tested it with this code, but without success (same error message):
start = dt_to_unix_nanos(1545624060000000000)
end = dt_to_unix_nanos(1607716800000000000)
catalog.quote_ticks(start=1545624060000000000, end=1607716800000000000)
And in this documentation (https://docs.nautilustrader.io/2_user_guide/2_backtest_example.html#adding-data) I have to provide the CATALOG_PATH and instrument.id.value? That looks suspicious :)
So here you've loaded bars into the catalog, but you're trying to query for quotes, which don't exist.
Simply replace catalog.quotes with catalog.bars.
1) I didn't use catalog.quotes, but catalog.bars. My problem is the use of catalog.quote_ticks, because I think that I need to use a different function when working with bars and not ticks? I just copied the code from the documentation, but had no success in finding a different/better function.
2) The data has already been loaded into a dataframe, but in this documentation I have to provide the data (CATALOG_PATH, instrument.id.value) once again? And which data_type do I need to provide?
I didn't use catalog.quotes, but catalog.bars. My problem is the use of catalog.quote_ticks, because I think that I need to use a different function when working with bars and not ticks? I just copied the code from the documentation, but had no success in finding a different/better function.
You're correct using catalog.bars, but that first section in the docs that loads the dataframe is just to show/check we have some data in the catalog; it doesn't have anything to do with a backtest (we don't use this data).
The data has already been loaded into a dataframe, but in this documentation I have to provide the data (CATALOG_PATH, instrument.id.value) once again? And which data_type do I need to provide?
Using this method of backtesting (there are others which are more manual), we're building up a configuration for a backtest that lets the BacktestRunner handle the loading and running of the backtest. So yes, just confirming: you do need to pass the CATALOG_PATH and instrument_id to this config again. The data type will be Bar in your instance. You can also remove the start_time/end_time if you want to run a backtest on all of your data.
from nautilus_trader.model.data.bar import Bar

data_config = [
    BacktestDataConfig(
        catalog_path=CATALOG_PATH,
        data_type=Bar,
        instrument_id="AMD.NYSE",
        start_time=1580398089820000000,
        end_time=1580504394501000000,
    )
]
The reason we have this config is you can configure multiple data chunks / instruments / strategies / parameterisations in one easy go, then have them be run in parallel by some processing library.
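That fan-out can be sketched with plain dicts standing in for the config objects (the parameter values are the ones used later in this thread):

```python
base = {"instrument_id": "AMD.NYSE", "bar_type": "AMD.NYSE-1-MINUTE-LAST-EXTERNAL"}
param_set = [
    {"fast_ema": 5, "slow_ema": 20},
    {"fast_ema": 10, "slow_ema": 50},
    {"fast_ema": 30, "slow_ema": 100},
]
# One run config per parameter combination; each one can be executed independently
configs = [{**base, **params} for params in param_set]
print(len(configs), configs[0]["fast_ema"], configs[-1]["slow_ema"])
```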
I'm stuck here:
from decimal import Decimal
from distributed import Client
import os

from nautilus_trader.backtest.config import (
    BacktestRunConfig,
    BacktestVenueConfig,
    BacktestDataConfig,
    BacktestEngineConfig,
)
from nautilus_trader.backtest.node import BacktestNode
from nautilus_trader.examples.strategies.ema_cross import EMACrossConfig
from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.trading.config import ImportableStrategyConfig
from pathlib import Path

# Current working directory path
cwd = Path(__file__).absolute().resolve().parents[0]
data_file = cwd.parents[1].joinpath(
    "trading-bot/trading-bot/data/stock/alphavantage/1min/amd_nautilus_trader.csv"
)
CATALOG_PATH = cwd.parents[1].joinpath("nautilus_trader_data")

# Clear if it already exists, then create fresh
if not os.path.exists(CATALOG_PATH):
    os.mkdir(CATALOG_PATH)

base = BacktestRunConfig(
    venues=[
        BacktestVenueConfig(
            name="SIM",
            oms_type="HEDGING",
            account_type="MARGIN",
            base_currency="USD",
            starting_balances=["100000 USD"],
        )
    ]
)

# Path object (CATALOG_PATH) not working, string needed
data_config = [
    BacktestDataConfig(
        catalog_path=str(CATALOG_PATH),
        data_cls_path="nautilus_trader.model.data.bar.Bar",
        instrument_id="AMD.NYSE",
    )
]

config = base.update(data=data_config, engine=BacktestEngineConfig())

PARAM_SET = [
    {"fast_ema": 5, "slow_ema": 20},
    {"fast_ema": 10, "slow_ema": 50},
    {"fast_ema": 30, "slow_ema": 100},
]

configs = []
for params in PARAM_SET:
    strategies = [
        ImportableStrategyConfig(
            path="examples.strategies.ema_cross_simple:EMACross",
            config=EMACrossConfig(
                instrument_id="AMD.NYSE",
                bar_type="AMD.NYSE-1-MINUTE-LAST-EXTERNAL",
                trade_size=Decimal(100000),
                **params,
            ),
        ),
    ]
    # Create the final config
    new = config.replace(strategies=strategies)
    configs.append(new)

node = BacktestNode()
task = node.build_graph(run_configs=configs)

# Create a local dask client - not a requirement, but allows parallelising the runs
client = Client(n_workers=2)
Console output:
/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 41745 instead
warnings.warn(
/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/node.py:160: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 34187 instead
warnings.warn(
Task exception was never retrieved
future: <Task finished name='Task-18' coro=<_wrap_awaitable() done, defined at /usr/lib64/python3.8/asyncio/tasks.py:688> exception=RuntimeError('\n An attempt has been made to start a new process before the\n current process has finished its bootstrapping phase.\n\n This probably means that you are not using fork to start your\n child processes and you have forgotten to use the proper idiom\n in the main module:\n\n if __name__ == \'__main__\':\n freeze_support()\n ...\n\n The "freeze_support()" line can be omitted if the program\n is not going to be frozen to produce an executable.')>
Traceback (most recent call last):
File "/usr/lib64/python3.8/asyncio/tasks.py", line 695, in _wrap_awaitable
return (yield from awaitable.__await__())
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/core.py", line 274, in _
await self.start()
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/nanny.py", line 338, in start
response = await self.instantiate()
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/nanny.py", line 421, in instantiate
result = await self.process.start()
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/nanny.py", line 691, in start
await self.process.start()
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/process.py", line 32, in _call_and_set_future
res = func(*args, **kwargs)
File "/home/becks/.local/share/virtualenvs/nautilus_strategies-khFnq7GI/lib/python3.8/site-packages/distributed/process.py", line 186, in _start
process.start()
File "/usr/lib64/python3.8/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
File "/usr/lib64/python3.8/multiprocessing/context.py", line 284, in _Popen
return Popen(process_obj)
File "/usr/lib64/python3.8/multiprocessing/popen_spawn_posix.py", line 32, in __init__
super().__init__(process_obj)
File "/usr/lib64/python3.8/multiprocessing/popen_fork.py", line 19, in __init__
self._launch(process_obj)
File "/usr/lib64/python3.8/multiprocessing/popen_spawn_posix.py", line 42, in _launch
prep_data = spawn.get_preparation_data(process_obj._name)
File "/usr/lib64/python3.8/multiprocessing/spawn.py", line 154, in get_preparation_data
_check_not_importing_main()
File "/usr/lib64/python3.8/multiprocessing/spawn.py", line 134, in _check_not_importing_main
raise RuntimeError('''
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
This probably means that you are not using fork to start your
child processes and you have forgotten to use the proper idiom
in the main module:
if __name__ == '__main__':
freeze_support()
...
The "freeze_support()" line can be omitted if the program
is not going to be frozen to produce an executable.
Task exception was never retrieved
[... loop]
Okay interesting - are you on windows by chance?
Okay interesting - are you on windows by chance?
Nope, Linux :)
From the link below, it looks like you might need to put multiprocessing code (like creating the dask client) inside a __main__ guard:
https://stackoverflow.com/questions/24374288/where-to-put-freeze-support-in-a-python-script
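A minimal sketch of that idiom - any code that spawns processes (creating the dask Client included) goes under the guard, so that child processes can re-import the module without re-executing it:

```python
import multiprocessing as mp

def work(queue):
    # Runs in the child process; only module-level definitions are re-imported there
    queue.put("done")

if __name__ == "__main__":
    queue = mp.Queue()
    p = mp.Process(target=work, args=(queue,))
    p.start()
    print(queue.get())
    p.join()
```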
Yeah, it was the missing main() function, I will use that from now on :+1:
This is my current script:
from decimal import Decimal
from distributed import Client
import os
# import shutil

from nautilus_trader.backtest.config import (
    BacktestRunConfig,
    BacktestVenueConfig,
    BacktestDataConfig,
    BacktestEngineConfig,
)
from nautilus_trader.backtest.node import BacktestNode
from nautilus_trader.examples.strategies.ema_cross import EMACrossConfig
from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.trading.config import ImportableStrategyConfig
from pathlib import Path


def main():
    # Current working directory path
    cwd = Path(__file__).absolute().resolve().parents[0]
    CATALOG_PATH = cwd.parents[1].joinpath("nautilus_trader_data")

    # Clear if it already exists, then create fresh
    if not os.path.exists(CATALOG_PATH):
        # shutil.rmtree(CATALOG_PATH)
        os.mkdir(CATALOG_PATH)

    # If you want to check which data has been loaded
    # Not needed for backtesting
    # catalog = DataCatalog(CATALOG_PATH)
    # data = catalog.bars()
    # print(data)
    # del data

    base = BacktestRunConfig(
        venues=[
            BacktestVenueConfig(
                name="SIM",
                oms_type="HEDGING",
                account_type="MARGIN",
                base_currency="USD",
                starting_balances=["100000 USD"],
            )
        ]
    )

    # Path object (CATALOG_PATH) not working, string needed
    data_config = [
        BacktestDataConfig(
            catalog_path=str(CATALOG_PATH),
            data_cls_path="nautilus_trader.model.data.bar.Bar",
            instrument_id="AMD.NYSE",
        )
    ]

    config = base.update(data=data_config, engine=BacktestEngineConfig())

    PARAM_SET = [
        {"fast_ema": 5, "slow_ema": 20},
        {"fast_ema": 10, "slow_ema": 50},
        {"fast_ema": 30, "slow_ema": 100},
    ]

    configs = []
    for params in PARAM_SET:
        strategies = [
            ImportableStrategyConfig(
                path="examples.strategies.ema_cross_simple:EMACross",
                config=EMACrossConfig(
                    instrument_id="AMD.NYSE",
                    bar_type="AMD.NYSE-1-MINUTE-LAST-EXTERNAL",
                    trade_size=Decimal(100000),
                    **params,
                ),
            ),
        ]
        # Create the final config
        new = config.replace(strategies=strategies)
        configs.append(new)

    node = BacktestNode()
    task = node.build_graph(run_configs=configs)

    # Create a local dask client - not a requirement, but allows parallelising the runs
    client = Client(n_workers=2)

    results = task.compute()
    results.plot_balances()


if __name__ == "__main__":
    main()
...and here's the error message saying that the instruments collection is empty:
2022-01-24T11:05:57.768549297Z [INF] BACKTESTER-000.RiskEngine: TradingState is ACTIVE.
2022-01-24T11:05:57.768586539Z [INF] BACKTESTER-000.BacktestEngine: Building engine...
2022-01-24T11:05:57.768692869Z [INF] BACKTESTER-000.Throttler-ORDER_RATE: INITIALIZED.
2022-01-24T11:05:57.768758797Z [INF] BACKTESTER-000.Cache: INITIALIZED.
2022-01-24T11:05:57.768811315Z [INF] BACKTESTER-000.RiskEngine: Set MAX_ORDER_RATE: 100/00:00:01.
2022-01-24T11:05:57.768901808Z [INF] BACKTESTER-000.DataEngine: INITIALIZED.
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7f85390321f0>, <function BacktestNode._run_delayed at 0x7f84fc5011f0>, [<nautilus_trader.backtest.node.BacktestNode object at 0x7f84fc4fd7f0>], (<class 'dict'>, [['run_config_id', 'ce2707839095ae5721bb413f49a132ca'], ['engine_config', BacktestEngineConfig(trader_id='BACKTESTER-000', log_level='INFO', cache=None, cache_database=None, data_engine=None, risk_engine=None, exec_engine=None, bypass_logging=False, run_analysis=True)], ['venue_configs', [(<function apply at 0x7f85390321f0>, <class 'nautilus_trader.backtest.config.BacktestVenueConfig'>, (), (<class 'dict'>, [['name', 'SIM'], ['oms_type', 'HEDGING'], ['account_type', 'MARGIN'], ['base_currency', 'USD'], ['starting_balances', ['100000 USD']], ['book_type', 'L1_TBBO'], ['routing', False]]))]], ['data_configs', [(<function apply at 0x7f85390321f0>, <class 'nautilus_trader.backtest.config.BacktestDataConfig'>, (), (<class 'dict'>, [['catalog_path', '/home/becks/Entwicklung/nautilus_trader_data'], ['data_cls_pa
kwargs: {}
Exception: 'ValueError("The \'instruments\' collection was empty")'
I can't find the issue as the dataframe shows my data, so I think that there's a problem with my configs
list? But I can see the finish line! :smiley:
Okay this is likely to do with the catalog path - can you do a .absolute()
on the path you're using?
Missing instruments usually mean the backtest config can't find the proper catalog.
No, that doesn't solve the problem, because it's already an absolute path:
cwd = Path(__file__).absolute().resolve().parents[0]
CATALOG_PATH = cwd.parents[1].joinpath("nautilus_trader_data")
print(CATALOG_PATH)
-->
/home/becks/Entwicklung/nautilus_trader_data
-->
$ ll /home/becks/Entwicklung/nautilus_trader_data
insgesamt 12K
drwxr-xr-x 3 becks users 4,0K 20. Jan 21:08 ./
drwxr-xr-x 17 becks users 4,0K 20. Jan 21:05 ../
drwxr-xr-x 4 becks users 4,0K 20. Jan 21:08 data/
Ahh apologies @TheRealBecks - this is likely due to the bug in the data catalog where Bar
is missing an instrument_id
field. Because we have the option to load non-instrument "generic" data, we do this simplistic check for whether the data type has an instrument id for loading instruments.
This should be fixed when #552 is fixed.
@limx0 This one is still broken. Shall I open a new issue?
Okay @TheRealBecks this issue was much more subtle - in the docs we use a trading venue SIM, whereas in your example you have loaded data and instruments for NYSE.
# The docs example creates a Venue "SIM" in BacktestVenueConfig
BacktestVenueConfig(
    name="SIM",
    oms_type="HEDGING",
    account_type="MARGIN",
    base_currency="USD",
    starting_balances=["100000 USD"],
)
Whereas you want to use name="NYSE" when you create your BacktestVenueConfig.
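To make the mismatch concrete, here is a minimal plain-Python illustration (no nautilus_trader imports needed; the string split is only to show the naming convention, not the library's actual parsing code):

```python
# Instrument IDs in the catalog are "<symbol>.<venue>" strings.
instrument_id = "AMD.NYSE"
symbol, venue = instrument_id.rsplit(".", 1)

# The docs example configures a venue named "SIM", so no loaded
# instruments match it and the backtest sees an empty collection.
print(venue == "SIM")   # False

# Configuring the venue as "NYSE" lines up with the loaded data.
print(venue == "NYSE")  # True
```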
@cjdsellers I don't think this is clear at all in the docs, we should improve them. We should probably also blow up if someone loads some instruments from one venue but then configures another - I can't think of a use case where you would want this (and simply change if someone finds one).
@TheRealBecks if this solves your problem I'll ask you to close this issue - and I'll raise a new one for the venue validation / docs fix.
> @cjdsellers I don't think this is clear at all in the docs, we should improve them. We should probably also blow up if someone loads some instruments from one venue but then configures another - I can't think of a use case where you would want this (and simply change if someone finds one).
I agree, I'll tighten this up with a helpful log message.
One step closer, but the strategy can't be loaded, because a module is missing:
distributed.worker - WARNING - Compute Failed
Function: execute_task
args: ((<function apply at 0x7fa11604e1f0>, <function BacktestNode._run_delayed at 0x7fa0d95fd5e0>, [<nautilus_trader.backtest.node.BacktestNode object at 0x7fa0d961e760>], (<class 'dict'>, [['run_config_id', '22f1fdae48f9e20becdf706e0419f0e4'], ['engine_config', BacktestEngineConfig(trader_id='BACKTESTER-000', log_level='INFO', cache=None, cache_database=None, data_engine=None, risk_engine=None, exec_engine=None, bypass_logging=False, run_analysis=True)], ['venue_configs', [(<function apply at 0x7fa11604e1f0>, <class 'nautilus_trader.backtest.config.BacktestVenueConfig'>, (), (<class 'dict'>, [['name', 'NYSE'], ['oms_type', 'HEDGING'], ['account_type', 'MARGIN'], ['base_currency', 'USD'], ['starting_balances', ['100000 USD']], ['book_type', 'L1_TBBO'], ['routing', False]]))]], ['data_configs', [(<function apply at 0x7fa11604e1f0>, <class 'nautilus_trader.backtest.config.BacktestDataConfig'>, (), (<class 'dict'>, [['catalog_path', '/home/becks/Entwicklung/nautilus_trader_data'], ['data_cls_p
kwargs: {}
Exception: 'ModuleNotFoundError("No module named \'examples.strategies\'")'
There is an ema_cross.py
file in the folder. How can I check if the module has been built?
Edit: My current source code:
from decimal import Decimal
from distributed import Client
import os
# import shutil
from nautilus_trader.backtest.config import (
    BacktestRunConfig,
    BacktestVenueConfig,
    BacktestDataConfig,
    BacktestEngineConfig,
)
from nautilus_trader.backtest.node import BacktestNode
from nautilus_trader.examples.strategies.ema_cross import EMACrossConfig
from nautilus_trader.persistence.catalog import DataCatalog
from nautilus_trader.trading.config import ImportableStrategyConfig
from pathlib import Path


def main():
    # Current working directory path
    cwd = Path(__file__).absolute().resolve().parents[0]
    CATALOG_PATH = cwd.parents[1].joinpath("nautilus_trader_data")

    # Clear if it already exists, then create fresh
    if not os.path.exists(CATALOG_PATH):
        # shutil.rmtree(CATALOG_PATH)
        os.mkdir(CATALOG_PATH)

    # If you want to check which data has been loaded
    # Not needed for backtesting
    # catalog = DataCatalog(CATALOG_PATH)
    # data = catalog.bars()
    # print(data)
    # del data

    base = BacktestRunConfig(
        venues=[
            BacktestVenueConfig(
                name="NYSE",
                oms_type="HEDGING",
                account_type="MARGIN",
                base_currency="USD",
                starting_balances=["100000 USD"],
            )
        ]
    )

    # Path object (CATALOG_PATH) not working, string needed
    data_config = [
        BacktestDataConfig(
            catalog_path=str(CATALOG_PATH),
            data_cls_path="nautilus_trader.model.data.bar.Bar",
            instrument_id="AMD.NYSE",
        )
    ]

    config = base.update(data=data_config, engine=BacktestEngineConfig())

    PARAM_SET = [
        {"fast_ema": 5, "slow_ema": 20},
        {"fast_ema": 10, "slow_ema": 50},
        {"fast_ema": 30, "slow_ema": 100},
    ]

    configs = []
    for params in PARAM_SET:
        strategies = [
            ImportableStrategyConfig(
                path="examples.strategies.ema_cross_simple:EMACross",
                config=EMACrossConfig(
                    instrument_id="AMD.NYSE",
                    bar_type="AMD.NYSE-1-MINUTE-LAST-EXTERNAL",
                    trade_size=Decimal(100000),
                    **params,
                ),
            ),
        ]
        # Create the final config
        new = config.replace(strategies=strategies)
        configs.append(new)

    node = BacktestNode()
    task = node.build_graph(run_configs=configs)
    task

    # Create a local dask client - not a requirement, but allows parallelising the runs
    client = Client(n_workers=2)
    client

    results = task.compute()
    results.plot_balances()


if __name__ == "__main__":
    main()
So you can just remove the nautilus_trader.examples import line and import your own strategy file instead
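The ModuleNotFoundError just means Python can't import examples.strategies from where the backtest runs. The path in ImportableStrategyConfig follows the usual "<module>:<class>" convention, which resolves roughly like the sketch below (load_class is a hypothetical helper for illustration, not the nautilus_trader implementation):

```python
import importlib

def load_class(path: str):
    # Resolve a "<module>:<class>" string the way a strategy path is looked up
    module_path, cls_name = path.split(":")
    # Raises ModuleNotFoundError if the module is not on sys.path
    module = importlib.import_module(module_path)
    return getattr(module, cls_name)

# Any importable module works, e.g. from the standard library:
print(load_class("collections:OrderedDict").__name__)  # OrderedDict
```

So for your own strategy, the module part of the path has to be importable from the directory you run the script in (or be on PYTHONPATH); simply having the .py file in a folder is not enough if that folder isn't a package on the import path.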
@TheRealBecks I just pushed an initial implementation of bar-only execution to develop.
So it's now creating the necessary TradeTicks under the hood, and passing them into the exchange's order book.
Try it again and see how it goes.
Closing this in favour of more specific issues which can be raised.
I found this thread's title and figured it would be a good place to start. I'm new to this framework so be kind. I'm having a tough time getting this code to run, following the instructions in the complete backtesting example documentation here. I'm running on Windows 10, python 3.8.
The steps to reproduce are:
import datetime
import os
import shutil
from decimal import Decimal

import fsspec
import pandas as pd

from nautilus_trader.core.datetime import dt_to_unix_nanos
from nautilus_trader.model.data.tick import QuoteTick
from nautilus_trader.model.objects import Price, Quantity
from nautilus_trader.backtest.data.providers import TestInstrumentProvider
from nautilus_trader.backtest.node import BacktestNode, BacktestVenueConfig, BacktestDataConfig, BacktestRunConfig, BacktestEngineConfig
from nautilus_trader.config.common import ImportableStrategyConfig
from nautilus_trader.persistence.catalog import ParquetDataCatalog
from nautilus_trader.persistence.external.core import process_files, write_objects
from nautilus_trader.persistence.external.readers import TextReader

DATA_DIR = "./data/"
fs = fsspec.filesystem('file')
raw_files = fs.glob(f"{DATA_DIR}/*")
assert raw_files, f"Unable to find any histdata files in directory {DATA_DIR}"
# raw_files

def parser(line):
    ts, bid, ask, idx = line.split(b",")
    dt = pd.Timestamp(datetime.datetime.strptime(ts.decode(), "%Y%m%d %H%M%S%f"), tz='UTC')
    yield QuoteTick(
        instrument_id=EURUSD.id,
        bid=Price.from_str(bid.decode()),
        ask=Price.from_str(ask.decode()),
        bid_size=Quantity.from_int(100_000),
        ask_size=Quantity.from_int(100_000),
        ts_event=dt_to_unix_nanos(dt),
        ts_init=dt_to_unix_nanos(dt),
    )

CATALOG_PATH = os.getcwd() + "/catalog"

# Clear if it already exists, then create fresh
if os.path.exists(CATALOG_PATH):
    shutil.rmtree(CATALOG_PATH)
os.mkdir(CATALOG_PATH)

EURUSD = TestInstrumentProvider.default_fx_ccy("EUR/USD")

catalog = ParquetDataCatalog(CATALOG_PATH)

process_files(
    glob_path=f"{DATA_DIR}/HISTDATA*.zip",
    reader=TextReader(line_parser=parser),
    catalog=catalog,
)

# Also manually write the EUR/USD instrument to the catalog
write_objects(catalog, [EURUSD])

import pandas as pd
from nautilus_trader.core.datetime import dt_to_unix_nanos

start = dt_to_unix_nanos(pd.Timestamp('2023-02-01', tz='UTC'))
end = dt_to_unix_nanos(pd.Timestamp('2023-02-28', tz='UTC'))

# catalog.quote_ticks(start=start, end=end)
catalog.trade_ticks(start=start, end=end)

instrument = catalog.instruments(as_nautilus=True)[0]

venues_config = [
    BacktestVenueConfig(
        name="SIM",
        oms_type="HEDGING",
        account_type="MARGIN",
        base_currency="USD",
        starting_balances=["1_000_000 USD"],
    )
]

data_config = [
    BacktestDataConfig(
        catalog_path=str(ParquetDataCatalog.from_env().path),
        data_cls=QuoteTick,
        instrument_id=instrument.id.value,
        start_time=start,
        end_time=end,
    )
]

strategies = [
    ImportableStrategyConfig(
        strategy_path="nautilus_trader.examples.strategies.ema_cross:EMACross",
        config_path="nautilus_trader.examples.strategies.ema_cross:EMACrossConfig",
        config=dict(
            instrument_id=instrument.id.value,
            bar_type="EUR/USD.SIM-15-MINUTE-BID-INTERNAL",
            fast_ema_period=10,
            slow_ema_period=20,
            trade_size=Decimal(1_000_000),
        ),
    ),
]

config = BacktestRunConfig(
    engine=BacktestEngineConfig(strategies=strategies),
    data=data_config,
    venues=venues_config,
)

node = BacktestNode(configs=[config])
results = node.run()
The only things I've changed were the mentions of AUDUSD to EURUSD, as well as trying to test catalog.trade_ticks(start=start, end=end)
instead of catalog.quote_ticks(start=start, end=end)
. This code results in the below traceback(s):
Traceback (most recent call last):
File "main.py", line 67, in <module>
catalog.trade_ticks(start=start, end=end)
File "C:\Users\chalu\AppData\Roaming\Python\Python38\site-packages\nautilus_trader\persistence\catalog\base.py", line 215, in trade_ticks
return self.query(
File "C:\Users\chalu\AppData\Roaming\Python\Python38\site-packages\nautilus_trader\persistence\catalog\base.py", line 147, in query
return self._query(
File "C:\Users\chalu\AppData\Roaming\Python\Python38\site-packages\nautilus_trader\persistence\catalog\parquet.py", line 114, in _query
raise FileNotFoundError(f"protocol={self.fs.protocol}, path={full_path}")
FileNotFoundError: protocol=file, path=I:\nasty\Python_Projects\Stock_Options_Trading\Backtesting\catalog\data\trade_tick.parquet
So there is no trade_tick.parquet
file in the catalog > data folder it creates, but there IS a currency_pair.parquet
folder with a couple of small files in it. What to do from here to start running the backtest on the downloaded EURUSD dataset?
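That behaviour is expected given the parser in your script: it yields QuoteTick objects, so process_files should only have written a quote_tick dataset (worth checking that one is actually there; if not, the HISTDATA*.zip glob may not have matched any files), plus the currency_pair dataset from write_objects(catalog, [EURUSD]). There is simply no trade tick data, hence the FileNotFoundError for trade_tick.parquet. The dataset names are just the snake-cased data class names, which can be sketched like this (dataset_name is a hypothetical helper for illustration, not the library's actual code):

```python
import re

def dataset_name(cls_name: str) -> str:
    # Snake-case a data class name into its parquet dataset name
    return re.sub(r"(?<!^)(?=[A-Z])", "_", cls_name).lower() + ".parquet"

print(dataset_name("QuoteTick"))     # quote_tick.parquet   <- what the parser produced
print(dataset_name("TradeTick"))     # trade_tick.parquet   <- never written here
print(dataset_name("CurrencyPair"))  # currency_pair.parquet <- from write_objects
```

Switching back to catalog.quote_ticks(start=start, end=end), and keeping data_cls=QuoteTick in the BacktestDataConfig as you already have it, should let the backtest run against the data that was actually written.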
I am new to your project and want to write a strategy and start backtesting. I have already seen the comment in the backtesting documentation that it isn't up to date at the moment. I really tried hard to import CSV data into the catalog and write it to a file, but I failed to do so. I am using the current version from PyPI.
My CSV looks like this (data from the 'AMD' stock); it has a header in line 1:
I had several tries:

1) Use CSVBarDataLoader: The data has been loaded into a Pandas dataframe, but I didn't find a function to load this df into the DataCatalog and save the files to disk, e.g. as a Parquet file. The function import_from_data_loader that is referenced in the documentation isn't implemented anymore.
2) Use CSVBarDataLoader with write_parquet: This time I wanted to use the function write_parquet, but then I have to provide a schema as argument, and I don't know where to get that from.
3) Use DataCatalog and the function process_files: I thought I had to use the CSVReader as argument for process_function, but I need to provide a block_parser argument for the CSVReader, and I don't know where to get that information from.

And one more question: I need to set an OS environment variable, but I found several different names in the documentation and your current code:

- NAUTILUS_BACKTEST_DIR --> in your documentation
- NAUTILUS_CATALOG --> the current one in the catalog.py file

Thanks for your great work, it looks promising! :)
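For the CSV question: a parser for a timestamp,open,high,low,close,volume line can start as plain Python, analogous to the QuoteTick parser earlier in this thread. The sketch below only covers the field and timestamp handling (note the strptime format uses second precision, no %f); building Bar objects from these fields via Price.from_str / Quantity and wiring the function into the CSVReader's block_parser is left open here, since that API has been changing:

```python
import datetime

def parse_line(line: bytes):
    # Split a "timestamp,open,high,low,close,volume" CSV line
    # (bytes, as the readers in this thread receive them) into typed fields.
    ts, open_, high, low, close, volume = line.split(b",")
    # "2018-12-24 04:01:00" has second precision, so no %f in the format
    dt = datetime.datetime.strptime(ts.decode(), "%Y-%m-%d %H:%M:%S")
    return dt, float(open_), float(high), float(low), float(close), int(volume)

print(parse_line(b"2018-12-24 04:01:00,17.01,17.01,16.9,17.0,1300"))
```

From these fields you would then yield Bar objects from the function you hand to the reader, the same way the earlier example yields QuoteTicks.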