Closed Pirat83 closed 11 months ago
I have thought about this for five more minutes. The save_portfolio and get_portfolio methods that can be overridden are not a good design. The handler-based approach in pybroker is much more elegant. So I would suggest adding a set_portfolio_handler method that gives access to the portfolio after each trading iteration, like here with the position size handler: https://www.pybroker.com/en/latest/notebooks/4.%20Ranking%20and%20Position%20Sizing.html#Setting-Position-Sizes
Composition is much more elegant than inheritance.
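To make the handler idea concrete, here is a minimal sketch in plain Python. It does not use pybroker: `MiniStrategy`, its `set_portfolio_handler` method, and the dict-based portfolio are hypothetical names chosen for illustration only, and nothing here is an existing pybroker API.

```python
from typing import Callable, Optional

# Hypothetical handler signature: (bar date, portfolio snapshot) -> None.
PortfolioHandler = Callable[[str, dict], None]


class MiniStrategy:
    """Toy backtest loop that notifies a handler after each iteration."""

    def __init__(self, initial_cash: float) -> None:
        # A plain dict stands in for a real portfolio object.
        self._portfolio = {"cash": initial_cash, "bars_seen": 0}
        self._handler: Optional[PortfolioHandler] = None

    def set_portfolio_handler(self, handler: PortfolioHandler) -> None:
        # Composition: behaviour is injected instead of overriding a method.
        self._handler = handler

    def run(self, dates: list[str]) -> None:
        for date in dates:
            self._portfolio["bars_seen"] += 1
            if self._handler is not None:
                # Hand out a copy so the handler cannot mutate internal state.
                self._handler(date, dict(self._portfolio))


snapshots: list[tuple[str, dict]] = []
strategy = MiniStrategy(initial_cash=100_000.0)
strategy.set_portfolio_handler(lambda date, pf: snapshots.append((date, pf)))
strategy.run(["2023-09-01", "2023-09-04"])
```

The caller decides what to do with each snapshot (store it, compute metrics, compare against a benchmark) without subclassing the strategy.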
Thank you @Pirat83, I will add this functionality in the next release.
Hi @edtechre - Thank you very much.
I will open another ticket once I know what else is required for live trading. For now I just see a set_order_handler that is triggered when an Order is added to the Portfolio, and a Session on the Strategy level like the Session on ExecutionContext now. But my prototype is not mature enough yet.
Thank you @Pirat83, I will add this functionality in the next release.
Do you mean that the next version will add live trading functionality?
No, this part is much more complicated and I am working on that.
The next steps are incremental backtesting and reusing an existing Portfolio, which are both prerequisites for live trading.
Hi @Pirat83,
To clarify, are you already able to pickle a Portfolio object without any issues? And you just need a handle to access the portfolio instance after completing a backtest?
Hi @edtechre
Yes, I execute an incremental backtest every night. Then I store the portfolio in my DB and reuse it the next day.
The only thing users need to take care of is not to reuse the Portfolio in overlapping backtest intervals, because then trades are executed twice and the stats are messed up.
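The nightly store-and-reuse cycle, including a guard against overlapping intervals, could look roughly like the sketch below. It is an illustration under assumptions: the table name `trading_state`, the helper names, and the use of SQLite are all hypothetical, and a plain dict stands in for the pickled Portfolio.

```python
import pickle
import sqlite3
from datetime import date
from typing import Optional


def init_db(conn: sqlite3.Connection) -> None:
    # Hypothetical schema: one row per algo with the last traded date.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS trading_state ("
        "algo_id TEXT PRIMARY KEY, last_end TEXT, portfolio BLOB)"
    )


def save_state(conn: sqlite3.Connection, algo_id: str, end: date, portfolio: object) -> None:
    # Persist the pickled portfolio together with the end of the interval it covers.
    conn.execute(
        "INSERT OR REPLACE INTO trading_state VALUES (?, ?, ?)",
        (algo_id, end.isoformat(), pickle.dumps(portfolio)),
    )
    conn.commit()


def load_state(conn: sqlite3.Connection, algo_id: str, next_start: date) -> Optional[object]:
    row = conn.execute(
        "SELECT last_end, portfolio FROM trading_state WHERE algo_id = ?",
        (algo_id,),
    ).fetchone()
    if row is None:
        return None  # Nothing stored yet: start with a fresh portfolio.
    last_end = date.fromisoformat(row[0])
    if next_start <= last_end:
        # Overlapping interval: trades would be executed twice.
        raise ValueError(f"next_start {next_start} overlaps stored interval ending {last_end}")
    # Only unpickle data that you wrote yourself; pickle is not safe for untrusted input.
    return pickle.loads(row[1])


conn = sqlite3.connect(":memory:")
init_db(conn)
save_state(conn, "algo-1", date(2023, 9, 1), {"cash": 99_000.0, "orders": ["buy SPY"]})
restored = load_state(conn, "algo-1", next_start=date(2023, 9, 4))
```

Storing the interval end next to the blob is what makes the overlap check possible; without it, the only safe option is to delete the stored portfolio before re-running an interval.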
Hi @Pirat83,
So i would suggest to add an set_portfolio_handler method
To clarify, you also want a way to pass a Portfolio object to a Strategy?
Hi @edtechre - yes this would be of great benefit.
Hi @Pirat83,
I propose just adding an optional portfolio argument to the backtest and walkforward methods. There is no need for a handler or attribute since the Portfolio instance will have its state updated. You will then be able to serialize the updated Portfolio.
Hi @edtechre, thank you very much for your effort.
Adding an optional portfolio parameter to the backtest and walkforward methods would be enough for the first step.
The idea behind a portfolio_handler was to be able to grab the Portfolio and the metrics after each candle. So I could execute one backtest for the entire year and save the metrics and the Portfolio after each trading day (or trading iteration). This is useful when people want to evaluate the Strategy in different market conditions.
For example, take a Strategy that works well in trending markets. If we are in ranging market conditions, I could compare the Strategy metrics with a benchmark and then switch to another Strategy that is better suited for ranging markets.
In the end I can also use https://github.com/edtechre/pybroker/issues/51 and do incremental backtesting every trading iteration and then save the metrics and the Portfolio.
So if you want to add a method where people can grab the Portfolio after each trading iteration and store the Portfolio and the metrics in a database, this would be an easy way to achieve this. The approach described in https://github.com/edtechre/pybroker/issues/51 is more error prone, because you have to calculate the exact candle where the portfolio needs to continue trading (and then subtract the warmup period from it). It lacks usability and is error prone, but it does also work.
Thank you very much for this feature.
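The "exact candle minus warmup" calculation mentioned above can be sketched as follows. This is only an illustration: `resume_window` is a hypothetical helper, and it assumes evenly spaced bars with no gaps, which real market data does not satisfy across nights, weekends, and holidays.

```python
from datetime import datetime, timedelta


def resume_window(last_bar: datetime, bar: timedelta, warmup_bars: int) -> tuple[datetime, datetime]:
    """Return (data_start, trade_start) for the next incremental run.

    trade_start is the first bar after the last processed one. data_start
    reaches back warmup_bars further so indicators have enough history.
    Assumes evenly spaced bars without gaps (a simplification).
    """
    if warmup_bars < 0:
        raise ValueError("warmup_bars must be >= 0")
    trade_start = last_bar + bar
    data_start = trade_start - warmup_bars * bar
    return data_start, trade_start


data_start, trade_start = resume_window(
    last_bar=datetime(2023, 9, 1, 15, 59),
    bar=timedelta(minutes=1),
    warmup_bars=20,
)
```

Because of the gap problem, a generous buffer is often simpler than an exact count; the prototype later in this thread fetches `warmup * 2` days of history for that reason.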
I have added an optional portfolio argument to the backtest/walkforward methods in the dev branch. The change is targeted for v1.1.31.
The idea behind a portfolio_handler was to be able to grab the Portfolio and the metrics after each candle. So I could execute one backtest for the entire year and save the metrics and the Portfolio after each trading day (or trading iteration).
Instead of having a dedicated portfolio_handler, I would suggest two alternatives:
Closing this ticket for now. If neither of the above work for your use case, then please re-open.
Can't wait for it:-) Thank you very much.
Hi @Pirat83 / @edtechre, can you give some hints on how to add live trading capabilities? I don't mean adding native support for live trading, I just need to know a way to do so.
The idea is that once a live bar is closed, I run the strategy, and if it gives a buy/sell signal, I call brokers/exchanges and place orders, right?
What I understand is that I will basically write some execution functions for my strategy logic (e.g. compare some indicators and generate buy/sell signals), but how can I reuse the execution function logic to run live trading?
I am new and eager to learn this lib, any suggestions would be much appreciated!
@Pirat83 or anyone else. Has someone continued to develop any live trading code (even snippets/ideas) for pybroker?
Hi @SW4T400
This is what I have. It is my first step and has multiple issues, which I can describe later.
So just take it as an inspiration and prototype
I am currently working on a more robust version that should work better.
Do not use my code in production!!!
1) We have an Algo class that inherits from Strategy
It is a parser for my algo trading rules. It has a JSON field with the source code and can be parsed and persisted in a Postgres DB as a JSON field.
It has the method backtest_with_defaults, which returns a Pybroker TestResult:
def backtest_with_defaults(self, disable_parallel: bool = True) -> TestResult:
    # noinspection PyProtectedMember
    from pybroker.data import _parse_alpaca_timeframe as parse_alpaca_timeframe, TimeFrameUnit
    amount, unit = parse_alpaca_timeframe(self.time_frame)
    from pybroker import Day
    match unit:
        case TimeFrameUnit.Day:
            # Daily bars: only restrict the weekdays.
            return self.backtest(
                timeframe=self.time_frame,
                days=[Day.MON, Day.TUES, Day.WEDS, Day.THURS, Day.FRI],
                disable_parallel=disable_parallel
            )
        case TimeFrameUnit.Hour | TimeFrameUnit.Minute:
            # Intraday bars: additionally restrict to regular trading hours.
            return self.backtest(
                timeframe=self.time_frame,
                between_time=('9:30', '16:00'),
                days=[Day.MON, Day.TUES, Day.WEDS, Day.THURS, Day.FRI],
                disable_parallel=disable_parallel
            )
        case _:
            raise NotImplementedError(unit)
I disabled parallelism and caching because I am using Python's multithreading module later for execution and Redis queues for queuing backtests overnight over multiple nodes.
Caching is also not required for me, because I have a TimescaleDatasource that fetches adjusted prices from Alpaca, so I can fill missing data and resample it with SQL. I also assume that there are some bugs in Pybroker with invalidating the cached data when using it multiple times on multiple different intervals and multiple different time frames in parallel. But that's okay, since Pybroker was never built to execute trades, and caching is one of the hardest things in software architecture and computer science.
I stopped investigating the cache issues later, and Timescale DB has become my first-level cache.
2) We have an ExecutableStrategy class that inherits from Strategy
The ExecutableStrategy class retrieves these parameters:
- TimeScaleDBDataSource to fetch candles from the Postgres DB.
- start_date and end_date. These can be datetimes at minute resolution or at day resolution. It does not matter.
- Portfolio, which can be stored in, for example, the database as a binary field (not very reliable, but it works until the source code of the Portfolio class changes). _get_trading_state reads the binary from the database and sets it on the ExecutableStrategy as a field.
Overriding Pybroker's walkforward should not be needed anymore since this issue is fixed now. Great work :+1: When I implemented this I really got a headache, because overriding it is highly error prone.
I have also overridden _place_sell_orders and _place_buy_orders. They simply call the same method in super() and then my synchronize_portfolio() method.
The synchronize_portfolio method takes Pybroker's Orders from the TestResult and tries to execute them with Alpaca. The state is then saved in the database again, so that it can be restored from there when the next candle is executed.
This is why it is just a prototype, and you should not use my code with your money.
Orders can be partially filled, rejected, cancelled by the broker, or expire. They can also simply never get filled. In a Pandas DataFrame we always get our full quantity; in reality things get a little bit harder.
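The gap between simulated and real fills can be made explicit with a small reconciliation step. The sketch below is hypothetical throughout: the `Status` values, `BrokerReport`, and `reconcile` are illustrative names, not Alpaca's or pybroker's types.

```python
from dataclasses import dataclass
from enum import Enum


class Status(Enum):
    # Illustrative order states; a real broker API defines its own set.
    FILLED = "filled"
    PARTIALLY_FILLED = "partially_filled"
    REJECTED = "rejected"
    CANCELED = "canceled"
    EXPIRED = "expired"


# States after which the broker will not fill any more shares.
TERMINAL = {Status.FILLED, Status.REJECTED, Status.CANCELED, Status.EXPIRED}


@dataclass(frozen=True)
class BrokerReport:
    order_id: int
    status: Status
    filled_qty: float


def reconcile(expected_qty: float, report: BrokerReport) -> tuple[float, bool]:
    """Return (missing_qty, is_final).

    missing_qty is what the backtest assumed was traded but the broker has
    not filled; is_final tells whether that number can still change.
    """
    missing = max(expected_qty - report.filled_qty, 0.0)
    return missing, report.status in TERMINAL


partial = reconcile(100.0, BrokerReport(1, Status.PARTIALLY_FILLED, 40.0))
rejected = reconcile(100.0, BrokerReport(2, Status.REJECTED, 0.0))
filled = reconcile(100.0, BrokerReport(3, Status.FILLED, 100.0))
```

A final, non-zero `missing_qty` means the stored Portfolio no longer matches the real account and has to be corrected before the next iteration.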
3) We have the AlgoTrader class
The AlgoTrader class has 2 main purposes:
- The apscheduler trigger, which is more complicated than you might think if you want it to schedule a method only while the stock market is open and pause the CronTrigger otherwise. For example, Apscheduler creates a while True loop that consumes 100% of your CPU if the CronTrigger is too complicated (as it is when you want a scheduler that runs only when the stock exchanges are open). I tried to search for another scheduler, but this seems to be one of the most reliable and mature projects in the Python world. It is simply not good work. But it is how it is. :-(
- The AlgoTrader.trade() method that is called by the scheduler each interval. This method tries to estimate the start_date and the end_date of the Algo and then triggers a backtest. When Pybroker finishes the backtest it also executes _place_sell_orders and _place_buy_orders, and the synchronize_portfolio() method is triggered, which executes the trades (in theory, and most times in practice).
AlgoTrader.trade() runs in a separate Python process (not thread) to minimize concurrent reads / writes to caches or unexpected memory writes / reads in a shared heap. My next iteration will run completely in the UTC timezone, like Alpaca does. Using only the New York timezone might also be an easier option. But since Pybroker removes and adds the timezone multiple times in internal code and I needed to override methods, I had to deal with it. Also, those methods were never intended to be overridden. So it is okay from a software design perspective. But if you decide to do it, then you have to deal with the consequences :-)
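One way to keep the schedule itself simple is to trigger on a plain interval and guard the job with a market-hours check done in UTC. The following is a sketch under assumptions, not part of the prototype: `is_regular_session` is a hypothetical helper, the 9:30 to 16:00 New York window is hard-coded, and exchange holidays and early closes are ignored.

```python
from datetime import datetime, time, timezone
from zoneinfo import ZoneInfo

NEW_YORK = ZoneInfo("America/New_York")
SESSION_OPEN, SESSION_CLOSE = time(9, 30), time(16, 0)


def is_regular_session(now_utc: datetime) -> bool:
    """True if now_utc falls on a weekday between 9:30 and 16:00 New York time.

    Holidays and early closes are NOT handled (simplifying assumption).
    """
    if now_utc.tzinfo is None:
        raise ValueError("now_utc must be timezone-aware")
    local = now_utc.astimezone(NEW_YORK)
    # weekday(): Monday is 0, Friday is 4.
    return local.weekday() < 5 and SESSION_OPEN <= local.time() < SESSION_CLOSE


friday_open = is_regular_session(datetime(2023, 9, 1, 14, 0, tzinfo=timezone.utc))
saturday = is_regular_session(datetime(2023, 9, 2, 14, 0, tzinfo=timezone.utc))
```

Keeping all timestamps in UTC and converting only inside this check means daylight saving time changes are handled by the timezone database rather than by the cron expression.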
So that's what I have :-) Feel free to take it as a prototype BUT DO NOT USE IT IN PRODUCTION
import logging
import os
from datetime import datetime, timedelta, time
from typing import Dict, Union, Optional, Iterable, Tuple
import numpy as np
import pandas as pd
import pytz
from alpaca.data import TimeFrameUnit
from alpaca.trading import TradingClient
from pandas import Series, DataFrame
from pybroker import Strategy, Day, TestResult
from pybroker.cache import CacheDateFields
from pybroker.common import to_datetime, verify_date_range, to_seconds
from pybroker.context import ExecResult
from pybroker.data import DataSource
# noinspection PyProtectedMember
from pybroker.data import _parse_alpaca_timeframe as parse_alpaca_timeframe
from pybroker.portfolio import Portfolio
from pybroker.scope import StaticScope, PriceScope, PendingOrderScope
from valkyrie.algo.algo_parser import Algo
from valkyrie.postgres import get_cursor
logger = logging.getLogger(__name__)
def _get_trading_state(algo_id: str, version_id: str):
    with get_cursor() as cursor:
        # If we instantiate ExecutableStrategy, the latest Portfolio is loaded from the database.
        # Here arises a problem, when we trade overlapping time intervals during multiple instantiations / restarts,
        # The same Portfolio is traded multiple times, and therefore we have multiple orders and trades
        # this corrupts the Portfolio object.
        # So we should make sure that only incremental time intervals are traded or delete the portfolio.
        query = """
            SELECT portfolio, session FROM algo_trading WHERE algo_id = %(algo_id)s AND version_id = %(version_id)s
        """
        cursor.execute(query, vars={'algo_id': algo_id, 'version_id': version_id})
        binary: Dict = cursor.fetchone()
        import pickle
        portfolio = pickle.loads(binary['portfolio']) if binary is not None else None
        session = pickle.loads(binary['session']) if binary is not None else {}
        return portfolio, session
def print_data_frame(data: Union[Series, DataFrame]):
    # Print the full frame without truncating rows or columns.
    with pd.option_context('expand_frame_repr', False, 'display.max_rows', None, 'display.max_columns', None):
        print(data)
class AlgoTrader:
    def __init__(self, algo: Dict, time_frame: str, days: Iterable[Day], start_end: Tuple[time, time]):
        self.id = algo.get('algo_id')
        self.name = algo.get('algo_name')
        self.version = algo.get('version_id')
        self.code = algo.get('json')
        self.time_frame = time_frame
        self.days = days
        self.start_time, self.end_time = start_end

    def print(self, test_result: TestResult):
        import multiprocessing
        print(f'Current Process name: {multiprocessing.current_process().name}')
        import threading
        print(f'Current Thread name : {threading.current_thread().name}')
        print('--------------------------------------------------------------------------------------------')
        print(f'{self.id}: {self.name}')
        print('--------------------------------------------------------------------------------------------')
        print_data_frame(test_result.portfolio.tail(15))
        # print_data_frame(test_result.trades)
        # print_data_frame(test_result.positions)
        print('--------------------------------------------------------------------------------------------')

    def get_between_time(self):
        def localize_to_none_timezone(result: time):
            # Convert a tz-aware time of day to the local system timezone as 'HH:MM'.
            result = datetime.now(result.tzinfo).replace(
                hour=result.hour, minute=result.minute, second=0, microsecond=0
            )
            result = result.astimezone(None)
            result = result.time()
            result = result.strftime('%H:%M')
            return result

        return localize_to_none_timezone(self.start_time), localize_to_none_timezone(self.end_time)

    def get_start_date_end_date(self, watch: bool):
        end = datetime.now(pytz.timezone('US/Eastern'))
        end = end.replace(second=0, microsecond=0)
        end = end - timedelta(minutes=15)
        if watch:
            # start = datetime.fromtimestamp(1693519200, pytz.timezone('US/Eastern'))
            start = datetime.now(pytz.timezone('US/Eastern'))
            start = start.replace(hour=9, minute=0, second=0, microsecond=0)
        else:
            start = end - timedelta(minutes=1)
        portfolio, session = _get_trading_state(self.id, self.version)
        if portfolio and len(portfolio.bars) > 0:
            # Never start before the last bar that the stored portfolio has already seen.
            last_bar_date = max([b.date for b in portfolio.bars])
            last_bar_date = datetime.utcfromtimestamp(int(last_bar_date) / 1e9).replace(tzinfo=pytz.UTC)
            last_bar_date = last_bar_date.astimezone(pytz.timezone('US/Eastern'))
            if start < last_bar_date:
                start = last_bar_date
        return start, end

    def get_cron_trigger(self):
        if self.start_time.tzinfo != self.end_time.tzinfo:
            raise ValueError()
        timezone = set([t.tzinfo for t in [self.start_time, self.end_time]])
        if len(timezone) != 1:
            raise ValueError()
        timezone = timezone.pop()
        day_of_week = ','.join([str(d.value) for d in self.days])
        amount, unit = parse_alpaca_timeframe(self.time_frame)
        from apscheduler.triggers.cron import CronTrigger
        from apscheduler.triggers.combining import AndTrigger
        match unit:
            case TimeFrameUnit.Day:
                return CronTrigger(day_of_week=day_of_week, hour=16, timezone=pytz.timezone('US/Eastern'))
            case TimeFrameUnit.Hour:
                hour = f'{self.start_time.hour}-{self.end_time.hour}'
                return CronTrigger(day_of_week=day_of_week, hour=hour, minute='*', timezone=timezone)
            case TimeFrameUnit.Minute:
                hour = f'{self.start_time.hour + 1}-{self.end_time.hour}'
                match amount:
                    case 1:
                        start_minute = f'{self.start_time.minute + 15}-59'
                        end_minute = f'0-{self.end_time.minute + 15}/{amount}'
                    case 5:
                        start_minute = f'{self.start_time.minute + 15}-59/{amount}'
                        end_minute = f'0-{self.end_time.minute + 15}/{amount}'
                    case 15:
                        start_minute = f'{self.start_time.minute + 15}'
                        end_minute = f'0,{self.end_time.minute + 15}'
                    case _:
                        raise NotImplementedError()
                result = AndTrigger([
                    CronTrigger(
                        day_of_week=day_of_week,
                        hour=self.start_time.hour,
                        minute=start_minute,
                        timezone=timezone
                    ),
                    CronTrigger(day_of_week=day_of_week, hour=hour, minute=f'*/{amount}', timezone=timezone),
                    CronTrigger(
                        day_of_week=day_of_week,
                        hour=self.end_time.hour,
                        minute=end_minute,
                        timezone=timezone
                    )
                ])
                return result
            case _:
                raise NotImplementedError

    def trade(self):
        start_date, end_date = self.get_start_date_end_date(False)
        algo = Algo(self.code, self.version, self.time_frame, start_date)
        from valkyrie.algo.timescale import TimeScaleDBDataSource
        # Fetch extra history before start_date so the indicators can warm up.
        strategy = ExecutableStrategy(
            TimeScaleDBDataSource(), start_date - timedelta(days=algo.warmup * 2), end_date, algo
        )
        amount, unit = parse_alpaca_timeframe(algo.time_frame)
        between_time = self.get_between_time()
        match unit:
            case TimeFrameUnit.Day:
                test_result = strategy.backtest(timeframe=algo.time_frame, days=self.days)
            case TimeFrameUnit.Hour | TimeFrameUnit.Minute:
                test_result = strategy.backtest(
                    timeframe=algo.time_frame, between_time=between_time, days=self.days
                )
            case _:
                raise NotImplementedError()
        self.print(test_result)


class ExecutableStrategy(Strategy):
    def __init__(
            self, data_source: Union[DataSource, pd.DataFrame],
            start_date: Union[str, datetime], end_date: Union[str, datetime],
            algo: Algo
    ):
        from pybroker import StrategyConfig
        super().__init__(data_source, start_date, end_date, StrategyConfig(exit_on_last_bar=True))
        self._algo = algo
        # Restore the persisted state, or start with a fresh Portfolio and session.
        self.portfolio, self.session = _get_trading_state(self._algo.algo_id, self._algo.version_id)
        if self.portfolio is None:
            self.portfolio = Portfolio(
                self._config.initial_cash,
                self._config.fee_mode,
                self._config.fee_amount,
                self._fractional_shares_enabled(),
                self._config.max_long_positions,
                self._config.max_short_positions,
            )
        if self.session is None:
            self.session = {}
        self.client = TradingClient(
            os.getenv('ALPACA_KEY_ID'),
            os.getenv('ALPACA_SECRET'),
            paper=os.getenv('ALPACA_PAPER') == 'True'
        )

    def walkforward(
            self,
            windows: int,
            lookahead: int = 1,
            start_date: Optional[Union[str, datetime]] = None,
            end_date: Optional[Union[str, datetime]] = None,
            timeframe: str = "",
            between_time: Optional[tuple[str, str]] = None,
            days: Optional[Union[str, Day, Iterable[Union[str, Day]]]] = None,
            train_size: float = 0.5,
            shuffle: bool = False,
            calc_bootstrap: bool = False,
            disable_parallel: bool = False,
            warmup: Optional[int] = None,
    ) -> Optional[TestResult]:
        if warmup is not None and warmup < 1:
            raise ValueError("warmup must be > 0.")
        scope = StaticScope.instance()
        try:
            scope.freeze_data_cols()
            if not self._executions:
                raise ValueError("No executions were added.")
            start_dt = (
                self._start_date
                if start_date is None
                else to_datetime(start_date)
            )
            if start_dt < self._start_date or start_dt > self._end_date:
                raise ValueError(
                    f"start_date must be between {self._start_date} and "
                    f"{self._end_date}."
                )
            end_dt = (
                self._end_date if end_date is None else to_datetime(end_date)
            )
            if end_dt < self._start_date or end_dt > self._end_date:
                raise ValueError(
                    f"end_date must be between {self._start_date} and "
                    f"{self._end_date}."
                )
            if start_dt is not None and end_dt is not None:
                verify_date_range(start_dt, end_dt)
            self._logger.walkforward_start(start_dt, end_dt)
            df = self._fetch_data(timeframe)
            day_ids = self._to_day_ids(days)
            df = self._filter_dates(
                df=df,
                start_date=start_dt,
                end_date=end_dt,
                between_time=between_time,
                days=day_ids,
            )
            tf_seconds = to_seconds(timeframe)
            indicator_data = self._fetch_indicators(
                df=df,
                cache_date_fields=CacheDateFields(
                    start_date=start_dt,
                    end_date=end_dt,
                    tf_seconds=tf_seconds,
                    between_time=between_time,
                    days=day_ids,
                ),
                disable_parallel=disable_parallel,
            )
            train_only = (
                self._before_exec_fn is None
                and self._after_exec_fn is None
                and all(map(lambda e: e.fn is None, self._executions))
            )
            # here is the change :-)
            portfolio = self.portfolio
            # here was the change :-)
            self._run_walkforward(
                portfolio=portfolio,
                df=df,
                indicator_data=indicator_data,
                tf_seconds=tf_seconds,
                between_time=between_time,
                days=day_ids,
                windows=windows,
                lookahead=lookahead,
                train_size=train_size,
                shuffle=shuffle,
                train_only=train_only,
                warmup=warmup,
            )
            if train_only:
                self._logger.walkforward_completed()
                return None
            return self._to_test_result(
                start_dt, end_dt, portfolio, calc_bootstrap
            )
        finally:
            scope.unfreeze_data_cols()

    def _place_buy_orders(
            self, date: np.datetime64, price_scope: PriceScope, pending_order_scope: PendingOrderScope,
            buy_sched: dict[np.datetime64, list[ExecResult]], portfolio: Portfolio, enable_fractional_shares: bool
    ):
        super()._place_buy_orders(
            date, price_scope, pending_order_scope, buy_sched, portfolio, enable_fractional_shares
        )
        # After the simulated buys, mirror any new orders to the broker.
        self.synchronize_portfolio(portfolio, date)

    def _place_sell_orders(
            self, date: np.datetime64, price_scope: PriceScope, pending_order_scope: PendingOrderScope,
            sell_sched: dict[np.datetime64, list[ExecResult]], portfolio: Portfolio, enable_fractional_shares: bool
    ):
        super()._place_sell_orders(
            date, price_scope, pending_order_scope, sell_sched, portfolio, enable_fractional_shares
        )
        # After the simulated sells, mirror any new orders to the broker.
        self.synchronize_portfolio(portfolio, date)

    def synchronize_portfolio(self, portfolio: Portfolio, date: np.datetime64):
        from alpaca.common import APIError
        from alpaca.broker import MarketOrderRequest
        for o in portfolio.orders:
            # The session maps order ids to broker responses, so each order is submitted only once.
            if o.id not in self.session.keys():
                request = MarketOrderRequest(symbol=o.symbol, qty=o.shares, side=o.type, time_in_force='day')
                try:
                    response = self.client.submit_order(request)
                except APIError as e:
                    logger.error(
                        f'Unable to place order {request.symbol} {request.type} {request.side} {request.qty}: {e.response.text}'
                    )
                    self.session.update({o.id: e.response})
                else:
                    self.session.update({o.id: response})
                finally:
                    # Persist portfolio and session after every submission attempt.
                    with get_cursor() as cursor:
                        from valkyrie.algo.postgres import upsert_algo_trading
                        timestamp = datetime.utcfromtimestamp(int(date) / 1e9).replace(tzinfo=pytz.UTC)
                        upsert_algo_trading(
                            cursor, self._algo.algo_id, self._algo.version_id, timestamp, portfolio,
                            self.session
                        )
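The listing converts bar dates with `datetime.utcfromtimestamp(int(date) / 1e9)`, a function that is deprecated since Python 3.12. A possible timezone-aware replacement is sketched below; `ns_to_utc` is a hypothetical helper name, and it assumes the input is an integer count of nanoseconds since the Unix epoch, mirroring the `int(date) / 1e9` conversion above.

```python
from datetime import datetime, timezone


def ns_to_utc(ns_since_epoch: int) -> datetime:
    """Convert integer nanoseconds since the Unix epoch to an aware UTC datetime.

    Integer arithmetic avoids float rounding; sub-microsecond digits are dropped
    because datetime only stores microseconds.
    """
    seconds, nanos = divmod(int(ns_since_epoch), 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=nanos // 1000)


# 1693519200 is the epoch value that appears in a comment in the listing.
example = ns_to_utc(1693519200 * 1_000_000_000)
```

The result already carries `tzinfo=timezone.utc`, so no separate `.replace(tzinfo=...)` step is needed before converting to another timezone.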
So before you implement live trading in Pybroker yourself, you should take a look at:
First of all, I really like Pybroker. It has good software design and is quite well unit tested. The whole codebase looks very well maintained, and taking into account the amount of code, nearly 99.9% works how it should. It is also very easy to understand when you are debugging it (besides the timezone stuff, which is necessary in most cases).
Stocks and maybe Crypto, where I did not take a deeper look at the source code or the code is proprietary
So you should now have a lot of options. Have fun
@Pirat83, thanks for sharing your progress - I appreciate it a lot! I haven't read it all yet and am not yet in a place where I would even have "PRODUCTION" code. So I will respond and thank you later in more detail.
Thanks for now.
Hi @SW4T400,
Yeah, you are welcome. For all those who also haven't read all that yet, here is the short version:
Placing an order in the real markets is something completely different than placing it in backtesting software. Even placing an order on a paper account is different than on a real market. Pybroker is a piece of software that is very well designed, easy to extend, easy to use, and very well maintained. This is why I really like it.
You should probably pay money for a solution that works and has an active developer team with a sustainable business model rather than implement live trading on your own. This is my opinion, and it is probably what 99.999% of participants in algo trading should do when they are searching for a reliable trade execution framework, after they have found a reliable strategy and are able to trade it by hand.
Hi @edtechre,
I am currently adding live trading capabilities to pybroker. So far it works quite well. One thing that would make my life easier would be the capability to externalize the Portfolio. Currently it is created in the walkforward method, and there is no clean way to provide it from a SQL database or an object storage / file system, etc.
It would be a benefit if we could pickle the Portfolio and, on the next trading iteration, run a backtest / trade method with the previously unpickled Portfolio. In this way we would also gain incremental backtest capability, which is also one of my goals. So if someone runs a complicated backtest or runs many of them, this person could simply save the Portfolio and then reuse it on the next trading iteration without executing all the past trades again and again.
For now I have extended the Strategy class and overridden the methods where necessary. This works but is not a very clean way to do it. The benefit of this approach is that it will be easier to integrate code changes from upcoming pybroker releases.
So currently the Portfolio is created here: https://github.com/edtechre/pybroker/blob/2e770a55e45e044ed87aa4f7d5be659ce88c1582/src/pybroker/strategy.py#L1203. My suggestion would be to add an optional parameter to Strategy or StrategyConfig where we could simply pass an existing unpickled Portfolio to the Strategy. The Portfolio could then be a field of the Strategy class. From what I have seen so far this should work. Please correct me if I am wrong or missed something... Also, a method called get_portfolio that can be overridden could be a less clean option. But I personally prefer the first solution.
Also, this place https://github.com/edtechre/pybroker/blob/2e770a55e45e044ed87aa4f7d5be659ce88c1582/src/pybroker/strategy.py#L1228 would need an additional method like save_portfolio, which can do nothing until it is overridden in a subclass.
If you are interested, I can share my progress and together we can add live trading capabilities to pybroker.