Closed fxgram-xai closed 3 years ago
Hello Dasch!
I've been having an issue with this as well. I've solved it for my use case by modifying the above in two ways:
1.) Upon receiving the first heartbeat signal, I set "last_id" to "lastTransactionID". This ensures that if the very first transaction is missed due to a disconnect, it will be captured when old transactions are replayed after reconnection.
2.) I've removed the `if msg_type == "transaction.Transaction":` line inside the replay loop. This ensures that the system will have an opportunity with each heartbeat to replay any transactions it may have missed due to a disconnection.
Here's the full code for the above:
oandav20store.py

```python
def _t_streaming_events(self, q):
    '''Callback method for streaming events'''
    last_id = None
    reconnections = 0
    first_heartbeat_has_not_happened_yet = True
    while True:
        try:
            response = self.oapi_stream.transaction.stream(
                self.p.account
            )
            # process response
            for msg_type, msg in response.parts():
                if first_heartbeat_has_not_happened_yet:
                    if msg_type == 'transaction.TransactionHeartbeat':
                        last_id = msg.lastTransactionID
                        first_heartbeat_has_not_happened_yet = False
                # if a connection occurred
                if reconnections > 0:
                    if reconnections > 0 and last_id:
                        # get all transactions between the last seen and first from
                        # reconnected stream
                        old_transactions = self.get_transactions_since(
                            last_id)
                        for t in old_transactions:
                            #if msg_type == "transaction.Transaction":
                            if t.id > last_id:
                                self._transaction(t.dict())
                                last_id = t.id
                    reconnections = 0
                if msg_type == "transaction.Transaction":
                    if not last_id or msg.id > last_id:
                        self._transaction(msg.dict())
                        last_id = msg.id
        except (v20.V20ConnectionError, v20.V20Timeout) as e:
            self.put_notification(str(e))
            if (self.p.reconnections == 0 or self.p.reconnections > 0
                    and reconnections > self.p.reconnections):
                # unable to reconnect after x times
                self.put_notification(
                    "Giving up reconnecting streaming events")
                return
            reconnections += 1
            if self.p.reconntimeout is not None:
                _time.sleep(self.p.reconntimeout)
            self.put_notification("Trying to reconnect streaming events ({} of {})".format(
                reconnections,
                self.p.reconnections))
            continue
        except Exception as e:
            self.put_notification(
                self._create_error_notif(
                    e,
                    response))
```
If you find this solution to pass your better informed tests, I would appreciate it if you would integrate this so that my Dockerfile doesn't have to get too convoluted.
Thanks so much for your excellent contribution to the Backtrader Community!
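For anyone who wants to check the two changes without a live stream, here is a minimal simulation sketch. Everything in it is hypothetical: `Msg`, `consume`, the integer ids, and the per-session "missed" list (standing in for what `get_transactions_since()` would return at reconnect) are my own stand-ins, not part of btoandav20 or the v20 library.

```python
from dataclasses import dataclass

HEARTBEAT = "transaction.TransactionHeartbeat"
TRANSACTION = "transaction.Transaction"


@dataclass
class Msg:
    """Hypothetical stand-in for a stream message (ids simplified to ints)."""
    id: int = 0
    lastTransactionID: int = 0


def consume(sessions, seed_from_heartbeat=True):
    """Process connections separated by disconnects.

    sessions: list of (missed, messages) pairs, one per connection.
      missed   - transaction ids a "since" lookup would return at reconnect
      messages - (msg_type, Msg) tuples received on that connection
    Returns the transaction ids in the order they were processed.
    """
    processed = []
    last_id = None
    reconnected = False
    for missed, messages in sessions:
        for msg_type, msg in messages:
            # Change 1: seed last_id from the first heartbeat
            if seed_from_heartbeat and last_id is None and msg_type == HEARTBEAT:
                last_id = msg.lastTransactionID
            # Change 2: replay on the first message after a reconnect,
            # whatever its type (heartbeat or transaction)
            if reconnected:
                if last_id is not None:
                    for tid in missed:
                        if tid > last_id:
                            processed.append(tid)
                            last_id = tid
                reconnected = False
            if msg_type == TRANSACTION and (last_id is None or msg.id > last_id):
                processed.append(msg.id)
                last_id = msg.id
        reconnected = True  # the connection dropped after this session
    return processed


# Transaction 11 happens while disconnected, before any transaction was seen.
sessions = [
    ([], [(HEARTBEAT, Msg(lastTransactionID=10))]),
    ([11], [(HEARTBEAT, Msg(lastTransactionID=11)), (TRANSACTION, Msg(id=12))]),
]
with_seed = consume(sessions)
without_seed = consume(sessions, seed_from_heartbeat=False)
print(with_seed, without_seed)
```

In this toy scenario the seeded run picks up transaction 11 during replay, while the unseeded run only sees 12, which is the "missed first transaction" case described in point 1.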
> 2.) I've removed the `if msg_type == "transaction.Transaction":` line. This ensures that the system will have an opportunity with each heartbeat to replay any transactions it may have missed due to a disconnection.

Why would you do that? What different msg_types do you receive there? The code is there to ensure that only transactions get processed.
Could you provide all the msg_types you receive there?
I'd be happy to,
In my forward testing, I only encountered messages of type Heartbeat and Transaction.
It appears to me that this particular msg_type refers to the msg_type of the message from the stream (the heartbeat), not to the msg_type of each transaction in the old_transactions array (produced by self.get_transactions_since()). I considered doing something along the lines of `for msg_type, msg in response.parts():` for the replayed transactions, but after examining the get_transactions_since() function I felt it was a safe assumption that it only returns messages of type Transaction, and indeed I did not encounter any deviation from that assumption in my forward testing.
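To make the point about the loop variable concrete, here is a small sketch with made-up data (the dictionaries and the `replay` helper are hypothetical, not taken from the store). It shows that inside the inner loop `msg_type` still belongs to the stream message, so a Transaction-only check there skips every replayed transaction when the triggering message is a heartbeat.

```python
# Hypothetical data: the first message after reconnecting is a heartbeat.
stream_parts = [("transaction.TransactionHeartbeat", {"lastTransactionID": 11})]
old_transactions = [{"id": 10}, {"id": 11}]  # what a "since" lookup might return


def replay(check_msg_type):
    """Collect replayed ids, optionally gated by the stream message's type."""
    replayed = []
    for msg_type, msg in stream_parts:
        for t in old_transactions:
            # msg_type is the type of the *stream* message, not of t
            if check_msg_type and msg_type != "transaction.Transaction":
                continue
            replayed.append(t["id"])
    return replayed


with_check = replay(True)      # the heartbeat blocks every replayed transaction
without_check = replay(False)  # all old transactions are replayed
print(with_check, without_check)
```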
Here's get_transactions_since():
```python
def get_transactions_since(self, id):
    '''Returns all transactions since id'''
    try:
        response = self.oapi.transaction.since(
            self.p.account,
            id=id)
        transactions = response.get('transactions', 200)
    except (v20.V20ConnectionError, v20.V20Timeout) as e:
        self.put_notification(str(e))
    except Exception as e:
        self.put_notification(
            self._create_error_notif(
                e,
                response))

    try:
        return transactions
    except NameError:
        return None
```
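As a side note, the function above returns None when the request fails, and the replay loop shown earlier iterates over the result directly, so a None there would raise a TypeError. One possible alternative shape, sketched here with hypothetical `fetch` and `notify` callables standing in for the API call and put_notification (this is not the library's code), is to bind the result up front and return an empty list on failure:

```python
def transactions_since(fetch, notify, since_id):
    """Return transactions newer than since_id, or [] if the request fails.

    fetch and notify are hypothetical callables used only for this sketch.
    """
    transactions = []  # bound up front, so no NameError handling is needed
    try:
        response = fetch(since_id)
        transactions = response["transactions"]
    except (ConnectionError, TimeoutError) as e:
        notify(str(e))
    return transactions


def failing_fetch(since_id):
    """Simulates a dropped connection."""
    raise ConnectionError("stream down")


notes = []
ok = transactions_since(lambda i: {"transactions": [i + 1, i + 2]}, notes.append, 10)
failed = transactions_since(failing_fetch, notes.append, 10)
print(ok, failed, notes)
```

With an empty list as the failure value, a caller's `for t in old_transactions:` loop simply does nothing instead of failing.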
I have committed a small update. Could you check whether it works for you?
Sure thing, I'll post my results in a few minutes.
It's working wonderfully!
Several disconnections and it successfully replayed the missing transactions in the correct order, even when a disconnection occurred during the first trade. I would consider this issue resolved for my purposes.
Elegant code as well, keep up the great work Dasch!
@happydasch Why did you close this issue? This issue does not appear to be improving.
https://github.com/happydasch/btoandav20/commit/b525a0cc6d19e6e79e00d7f39948c6015d54dc67
There is no additional info and I cannot reproduce the issue. The current state is, as reported above:
> It's working wonderfully!
Thank you for your reply. Does it really reflect brightcircledevops's source?
I could not find it in the source diff.
#if msg_type == "transaction.Transaction":
As @brightcircledevops wrote, there were no other msg_types than "transaction.Transaction".
If you have some other msg_types being passed there, please report them. The issue @brightcircledevops noted was about the missing first transaction id, which is now being taken into account.
I cannot reproduce the issue myself, so I can only look into the code, logs, and information posted here and try to fix it for you. As the only information available is that it's working, I closed the issue. Please feel free to open a new issue with more details and some logs, and maybe some reproducible code, to see what the issue is.
I think the code below is not necessary (it is marked "HERE" in the code below), because if the msg_type received from the stream is "transaction.TransactionHeartbeat", this fetching code doesn't work.
See: https://community.backtrader.com/topic/2967/order-synchronization/33
oandav20store.py