Closed rsmb7z closed 1 year ago
Hi @rsmb7z
I just need a little more context on this one, under what scenario did duplicate bars occur?
When you say sent to Nautilus core, did you mean sent to the DataEngine
? if so, I'm guessing these are externally aggregated bars?
Hi @cjdsellers
I just need a little more context on this one, under what scenario did duplicate bars occur?
Live DataClient: So this requirement makes more sense for Swing Trading or Higher timeframes than HFT, while the system is as good as for all of these trading styles. Here is a scenario:
ts_event
) identify in IB for the Bar is bar's starting time.Backtesting: Although data is assumed to be unique when loading to Backtest DataEngine but if for some reason you send duplicates, data will continue to process the bars and send to strategy/indicators (applies to ticks and bars both).
When you say sent to Nautilus core, did you mean sent to the
DataEngine
? if so, I'm guessing these are externally aggregated bars?
Yes, by sent to core I meant DataEngine and is about externally aggregated. For tick data I don't see the problem as of now but for Bars, before the on_bar in Strategy is called where possibly the user can control these checks but that is pretty much useless because before this Strategy.handle_bar will send the Bar to Indicators, apart from writing this common logic in each strategy, it will make strategy trade logic complicated.
So following could be possible solution:
I hope this explains. Let me know if you need more info.
Regarding my comment on #955
I've come up with this initial implementation, now on develop
branch 860993c694372edf6512037b27b9c1b91d36207e.
There's now an additional option on the DataEngineConfig
:
handle_revised_bars: bool = True
which interacts with this logic in the engine, similar to your PR:
cdef void _handle_bar(self, Bar bar) except *:
cdef BarType bar_type = bar.bar_type
cdef Bar last_bar = None
if not self._handle_revised_bars:
last_bar = self._cache.bar(bar_type)
if last_bar is not None and (bar.ts_event < last_bar.ts_event or bar.ts_init <= last_bar.ts_init):
return
self._cache.add_bar(bar)
self._msgbus.publish_c(topic=f"data.bars.{bar_type}", msg=bar)
I think we don't need to be creating any special topics, or creating new hash maps when we can leverage the efficient cache which is already there.
I didn't add the is_revision
flag to the bar just yet. We could potentially just check if the bar at the 'first' index is later than the revised bar, and then append
if that's the case.
This also raises the question of whether we should publish revised out of sequence bars by default, maybe this handled_revised_bars
option should be False
by default?
Hi @cjdsellers
I think we don't need to be creating any special topics, or creating new hash maps when we can leverage the efficient cache which is already there.
This is awsome. I just tested and gives same results (assuming other part of of code from PR yet). As for the last bar in Cache because its appendleft so we can trust its index 0 :confused:
Backtesting: Although data is assumed to be unique when loading to Backtest DataEngine but if for some reason you send duplicates, data will continue to process the bars and send to strategy/indicators (applies to ticks and bars both).
Only thing is this section of code is to deal with duplicates, and make sure Bars published in timestamp sequence. So handle_revised_bars
can be renamed to more suitable.
Thanks for the feedback, and I'm glad its working for you.
So I think the focus now is on handling the correct sequence for all data being processed by the engine. We could make a simple check that if the data being processed has a timestamp prior to the last timestamp, and earlier than the oldest timestamp, then we could add the data to the cached list, otherwise discard.
It probably needs to be opt in though, to avoid the performance overhead for those who know their data is already correctly sequenced. I'll have a think.
So I think the focus now is on handling the correct sequence for all data being processed by the engine. We could make a simple check that if the data being processed has a timestamp prior to the last timestamp, and earlier than the oldest timestamp, then we could add the data to the cached list, otherwise discard.
It probably needs to be opt in though, to avoid the performance overhead for those who know their data is already correctly sequenced. I'll have a think.
Commit 860993c
perfectly does this the job and allows user to configure this check as well. Only thing needs to be done is to:
handle_revised_bars
to suitable wording validate_bar_sequence
or something similarApart from this remaining part is as follows.
is_revised
flag:Carrying is_revised
flag. This will be tagged by DataClient and required at:
As a second thought, this flag doesn't necessary needs to be attached to Bar and can be passed as argument to above flow. I will leave this to you, whatever is best.
Configurable by user handle_revised_bars
is_revised
So I went with a config param name of validate_data_sequence
so this could apply to all types of data eventually, and I don't think we need config granularity per data type?
The logic is flipped so False
by default to retain current behavior, and so that backtests don't suffer the performance overhead when the data stream is already sorted monotonically by the backtest engine.
As for the Bar.is_revised
attribute, we're still considering the merits and setup for that.
I've added the optional Bar.is_revision
attribute (False
by default) and have come up with the following logic:
cdef:
Bar cached_bar
Bar last_bar
list bars
int i
if self._validate_data_sequence:
last_bar = self._cache.bar(bar_type)
if bar.is_revision:
bars = self._cache._bars.get(bar_type) # noqa
if bars: # If not bars then just fall through and add to cache
for i, cached_bar in enumerate(bars):
if bar.ts_event == cached_bar.ts_event:
# Replace bar at index, previously cached bar will fall out of scope
bars[i] = bar
break
else:
self._cache.add_bar(bar)
elif last_bar is not None and (bar.ts_event < last_bar.ts_event or bar.ts_init <= last_bar.ts_init):
return
if not bar.is_revision:
self._cache.add_bar(bar)
It's not the prettiest code but I think it meets the requirements, and keeps all of this out of the critical path which most users will be configured to use.
You could really help me out with the workload if you could add some unit tests in test_data_engine.py
just after test_process_bar_when_revised_with_older_timestamp_does_not_cache_or_publish
to cover all the logic :pray:
Hi @cjdsellers Thanks and I will be happy to add these unit tests.
Small fix in DataEngine logic though, as follows please. The updates required in last_bar only, leaving upto you if we can achieve without enumarte over all cached bars.
if self._validate_data_sequence:
last_bar = self._cache.bar(bar_type)
if last_bar is not None:
if bar.ts_event < last_bar.ts_event or bar.ts_init <= last_bar.ts_init:
return
elif bar.is_revision:
if bar.ts_event == last_bar.ts_event:
# Replace the last_bar, previously cached bar will fall out of scope
# CAN WE ACHIEVE THIS WITHOUT ENUMERATE OVER ALL CACHED BARS?
else:
return # REVISIONS WILL ALWAYS BE OF last_bar
if not bar.is_revision:
self._cache.add_bar(bar)
Out of interest, why don't we just hold the partially built bar back until its closed - then send to the DataEngine?
Actually we are doing the same way, raw Bar is not converted to NT or processed further when handle_revised_bars=False, which is default/current behavior. However if this is set to true, will do the subject requirement.
I have included below logic from DataClient side, just for overview and to show the full flow. Will submit in seperate PR later.
"""
handle_revised_bars : bool, default False
If partial updates for bars should be handled by the client.
If False, will send the completed Bar only i.e. end of minute for 1M and end of hour for 1H.
Bar.is_revision=False
If True, will start sending the Bar, whenever it changes during timeframe, from open to close.
Bar.is_revision=False (opening Bar because that's new)
Bar.is_revision=True (from 2nd change onward until closed)
"""
self._handle_revised_bars = False # TODO: Add this flag in DataClientConfig,
def _on_historical_bar_update(
self,
bar_data_list: BarDataList,
has_new_bar: bool,
) -> None:
if bar_data_list.initialized:
if self._handle_revised_bars:
bars = [bar_data_list[-1]]
is_revised = not has_new_bar
elif not self._handle_revised_bars and has_new_bar:
bars = [bar_data_list[-2]]
is_revised = False
else:
return
else:
bars = bar_data_list
is_revised = False
bar_data_list.initialized = True
for bar in bars:
data = parse_bar_data(
bar=bar,
is_revised=is_revised,
bar_type=bar_data_list.bar_type,
instrument=bar_data_list.instrument,
ts_init=self._clock.timestamp_ns(),
)
self._handle_data(data)
As you notice we are accessing the Bar using position last, 2nd last because that will be always in sequence and we don't have to iterrate over and over again, this is the case with IB. For any other broker in different way, can easily handle accordingly in its adapter.
Understood, in that case I've come up with the following simpler logic (still some nesting but so be it for now):
if self._validate_data_sequence:
last_bar = self._cache.bar(bar_type)
if last_bar is not None:
if bar.ts_event < last_bar.ts_event or bar.ts_init <= last_bar.ts_init:
self._log.warning(
f"Bar {bar} was prior to last bar `ts_event` {last_bar.ts_event}.",
)
return # `bar` is out of sequence
if bar.is_revision:
if bar.ts_event == last_bar.ts_event:
# Replace `last_bar`, previously cached bar will fall out of scope
self._cache._bars.get(bar_type)[0] = bar # noqa
else:
self._log.warning(
f"Bar revision {bar} was not at last bar `ts_event` {last_bar.ts_event}.",
)
return # Revision SHOULD be at `last_bar.ts_event`
if not bar.is_revision:
self._cache.add_bar(bar)
Thanks, the tests will be much appreciated, and less to test now!
Thanks this is perfect now. I will add the tests and then we can close this.
Next stage is to handle the revised Bar in Actor/Strategy. Two potential options:
is_revision
check in Indicators. I think this will cost overhead (as each indicator will do the check), when not using revision i.e backtestingis_revision
check in handle_bar
of Actor/Strategy and then pass to on_bar
(default without revision) and on_bar_revision
new method for Strategy and handle_bar_revision
for Indicator. This shall least affect any performance.Let me know which one will be suitable and I will prepare the draft PR.
I'm not sure on a first pass we should wire this revision concept that far into the system?
The Bar
object already has the .is_revision
attribute now, so could just be passed to on_bar
and handled appropriately.
I don't think indicators should have been updated with partial bars too?
Does IB emit partial bars periodically, and you are passing these back to the engine for market updates, rather than say subscribing to quote or trade ticks?
I'm not sure on a first pass we should wire this revision concept that far into the system?
Bars with is_revision
shall not be appended at any stage, hence the reason of making Bar revision aware. We have already fixed this in Cache case. Apart from Strategy base class and Indicator base class (below potential change) I don't see any other change in the system.
The
Bar
object already has the.is_revision
attribute now, so could just be passed toon_bar
and handled appropriately.
In the Strategy base class before on_bar
is called, it will make call to _handle_indicators_for_bar
. We don't want to send bar to indicator which will treat it as new Bar and append. This shall allow both backward and forward compatibility. Previously both Bar and Indicator were unware if bar_is_new or bar_is_revision, now Bar is aware but Indicators are not.
cdef void _handle_indicators_for_bar(self, list indicators, Bar bar) except *:
cdef Indicator indicator
for indicator in indicators:
if not bar.is_revision: #<< THIS IS REQUIRED MINIMUM TO AVOID FALSE RESULTS
indicator.handle_bar(bar)
else:
indicator.handle_bar_revision(bar) # IN ADDITION TO NOT IMPLEMENTED METHOD IN INDICATOR BASE CLASS
There could be alternate method, but above is if we want to keep minimal and is sufficient to explain the flow.
I don't think indicators should have been updated with partial bars too?
Well, in the IB TWS charts as the Bar progresses (partial bar updates), indicator values are updated as well, so ideally it shall. However this is not show-stopper for me at this stage, but with above implementation at minimum custom indicators can have functionality in both cases. Leaving the decision for built-in indicators upto you.
Does IB emit partial bars periodically, and you are passing these back to the engine for market updates, rather than say subscribing to quote or trade ticks?
Yes it does and DataEngine will receive partial bars periodically when handle_revised_bars=True
in DataClient config. Yes in this cause subscribing to External Bars and quotes/ticks subscription is irrelevant here. Following from logs for example, as you can see the as soon time window starts, new bar open and revision comes, revision is emitted by IB only when last bar is changed or ts_event is different hence there is no fix frequency.
2023-01-17T06:00:01.394220300Z [INF] TESTER-001.EMACross-000: Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08310,1.08310,1.08310,0.0,1673935200000000000)
2023-01-17T06:00:01.394501400Z [INF] TESTER-001.EMACross-000: Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08310,1.08310,1.08310,0.0,1673935200000000000)
2023-01-17T06:00:06.358310700Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08310,1.08315,0.0,1673935200000000000)
2023-01-17T06:00:06.358592100Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08310,1.08315,0.0,1673935200000000000)
2023-01-17T06:00:11.487777600Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08310,1.08310,0.0,1673935200000000000)
2023-01-17T06:00:11.488059100Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08310,1.08310,0.0,1673935200000000000)
2023-01-17T06:00:16.317133000Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08300,1.08305,0.0,1673935200000000000)
2023-01-17T06:00:16.317621800Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08300,1.08305,0.0,1673935200000000000)
2023-01-17T06:00:21.356098200Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08300,1.08300,0.0,1673935200000000000)
2023-01-17T06:00:21.356375700Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08300,1.08300,0.0,1673935200000000000)
2023-01-17T06:00:37.034052100Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08300,1.08305,0.0,1673935200000000000)
2023-01-17T06:00:37.036067600Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08300,1.08305,0.0,1673935200000000000)
2023-01-17T06:00:41.562956100Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08295,1.08295,0.0,1673935200000000000)
2023-01-17T06:00:41.565754900Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08295,1.08295,0.0,1673935200000000000)
2023-01-17T06:00:52.052333700Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08290,1.08290,0.0,1673935200000000000)
2023-01-17T06:00:52.052611700Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08290,1.08290,0.0,1673935200000000000)
2023-01-17T06:00:56.727402100Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08285,1.08285,0.0,1673935200000000000)
2023-01-17T06:00:56.727686700Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08285,1.08285,0.0,1673935200000000000)
2023-01-17T06:01:01.406661500Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08280,1.08280,0.0,1673935200000000000)
2023-01-17T06:01:01.408493000Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08310,1.08315,1.08280,1.08280,0.0,1673935200000000000)
2023-01-17T06:01:02.877368700Z [INF] TESTER-001.EMACross-000: Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08280,1.08285,1.08280,1.08285,0.0,1673935260000000000)
2023-01-17T06:01:06.309649800Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08280,1.08290,0.0,1673935200000000000)
2023-01-17T06:01:06.309931900Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08280,1.08290,1.08280,1.08290,0.0,1673935260000000000)
2023-01-17T06:01:22.052865300Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08280,1.08295,1.08280,1.08290,0.0,1673935260000000000)
2023-01-17T06:02:01.663696500Z [INF] TESTER-001.EMACross-000: Bar(EUR/USD.IDEALPRO-1-MINUTE-MID-EXTERNAL,1.08290,1.08295,1.08290,1.08295,0.0,1673935320000000000)
2023-01-17T06:02:06.298247300Z [INF] TESTER-001.EMACross-000: is_revision Bar(EUR/USD.IDEALPRO-1-HOUR-MID-EXTERNAL,1.08310,1.08315,1.08280,1.08295,0.0,1673935200000000000)
OK understood.
I think if you're handling bars in this way, then everything is already in place - the cache is handled correctly as above. The bar will arrive at the strategy on_bar
and you will have to decide how you want to handle bar revisions.
Currently for the Binance adapters, partial bars are not sent back into the system other than to hydrate bar aggregators with the currently running bar state (because its always possible to subscribe to quote and trade ticks directly for these adapters). I think I'd like to maintain this general system setup for now, and not introduce the complexity of making partial bar handling further 'built-in'.
Please continue to propose any tweaks or improvements upstream of handle_revised_bar
for strategies and indicators and we can continue to discuss.
Feature Request
I noticed that if duplicates Bars (with same ts_event) are sent to Nautilus core it will simply process that as new Bar. If we can have some sort of flag to update or ignore, so in the adapter can be set accordingly to situation. But even without flag default behavior should be either update or ignore instead of adding as new bar.