Closed MrNaif2018 closed 2 years ago
I have a feeling that we can patch aiorpcx to wrap the task in it's TimeoutAfter handler in this coroutine:
async def _ensure_cancellation(coro):
watcher = asyncio.Event()
task = asyncio.ensure_future(coro)
task.add_done_callback(lambda _: watcher.set())
try:
await watcher.wait()
except asyncio.CancelledError:
task.cancel()
await watcher.wait()
raise
return task.result()
Ref dask/distributed#6235
OldTaskGroup might be impacting too, as it is the one which runs FxThread.run
I have patched update_safe
this way:
if not hasattr(self, "done"):
with mock.patch.object(self, "get_rates",side_effect=TaskTimeout(5)):
self.done = True
self.quotes = await self.get_rates(ccy)
else:
self.quotes = await self.get_rates(ccy)
Is it possible that this exception is raised? Maybe the issue is exactly because CancelledError bases off BaseException now, so it is not caught anywhere?
Ok, here are the full logs:
Traceback (most recent call last):
File "/home/electrum/.local/lib/python3.9/site-packages/electrum/exchange_rate.py", line 98, in update_safe
self.quotes = await self.get_rates(ccy)
File "/home/electrum/.local/lib/python3.9/site-packages/electrum/exchange_rate.py", line 337, in get_rates
json = await self.get_json('api.coingecko.com', '/api/v3/exchange_rates')
File "/home/electrum/.local/lib/python3.9/site-packages/electrum/exchange_rate.py", line 76, in get_json
async with session.get(url) as response:
File "/home/electrum/.local/lib/python3.9/site-packages/aiohttp/client.py", line 1138, in __aenter__
self._resp = await self._coro
File "/home/electrum/.local/lib/python3.9/site-packages/aiohttp/client.py", line 634, in _request
break
File "/home/electrum/.local/lib/python3.9/site-packages/aiohttp/helpers.py", line 721, in __exit__
raise asyncio.TimeoutError from None
asyncio.exceptions.TimeoutError
It then goes into async with timeout_after() block and gets hang forever
Should be fixed by #7955
There are more relevant details in https://github.com/spesmilo/electrum/pull/7955 , but I would like to collect everything in this thread instead.
@accumulator in https://github.com/spesmilo/electrum/pull/7955#issuecomment-1225841951 :
There is one more scenario I ran in to a while back. In certain conditions (I believe a TimeoutError) the FX updates stop completely. It is particularly noticable on Android, as android freezes the app when put in the background, leading to a timeout later.
@accumulator in https://github.com/spesmilo/electrum/pull/7955#issuecomment-1227016933 :
So, I've reproduced the issue by blocking the exchange service using ip(6)tables.
Flow is as follows:
- app startup, exchange_rate._trigger is triggered from code, leading to exit from timeout block and request to exchange rate service
- exchange rate fetch times out after ~45 seconds, 'failed fx quotes: TimeoutError()'
- loop restarts, entering
async with timeout_after(150)
again, waiting for trigger- timeout never happens
It looks like the
aiorpcx.curio.timeout_after
is not reliably timing out (not sure why, might be tied to a request object somewhere?). The patch I added usesasyncio.wait_for
instead, and is reliably timing out.
@MrNaif2018 in https://github.com/spesmilo/electrum/pull/7955#issuecomment-1227060429 :
we need to wait to get asyncio.exceptions.TimeoutError, then we need to wait for timeout_after to timeout
sudo iptables -I OUTPUT -d api.coingecko.com -j DROP
Change timeout to something like 10 seconds and see it get stuck
Thanks for the reproduction steps. Using those, I have tried to investigate the root cause.
With this patch:
diff --git a/electrum/exchange_rate.py b/electrum/exchange_rate.py
index e2c4add95e..e9b573bc4c 100644
--- a/electrum/exchange_rate.py
+++ b/electrum/exchange_rate.py
@@ -429,7 +429,7 @@ class Biscoint(ExchangeBase):
class Walltime(ExchangeBase):
async def get_rates(self, ccy):
- json = await self.get_json('s3.amazonaws.com',
+ json = await self.get_json('s3.amazonaws.com',
'/data-production-walltime-info/production/dynamic/walltime-info.json')
return {'BRL': to_decimal(json['BRL_XBT']['last_inexact'])}
@@ -541,19 +541,33 @@ class FxThread(ThreadJob, EventListener):
return fmt_str.format(rounded_amount)
async def run(self):
- while True:
- # approx. every 2.5 minutes, refresh spot price
- try:
- async with timeout_after(150):
- await self._trigger.wait()
- self._trigger.clear()
- # we were manually triggered, so get historical rates
- if self.is_enabled() and self.show_history():
- self.exchange.get_historical_rates(self.ccy, self.cache_dir)
- except TaskTimeout:
- pass
- if self.is_enabled():
- await self.exchange.update_safe(self.ccy)
+ self.logger.info(f"heyheyhey. run() entered")
+ try:
+ while True:
+ task = asyncio.current_task()
+ self.logger.info(f"heyheyhey. run(). cp1. new iter starts. {task=}. {getattr(task, '_externally_cancelled', None)=}")
+ # approx. every 2.5 minutes, refresh spot price
+ try:
+ async with timeout_after(10):
+ self.logger.info(f"heyheyhey. run(). cp2. timeout_after entered.")
+ await self._trigger.wait() # <<<
+ self._trigger.clear()
+ self.logger.info(f"heyheyhey. run(). cp3")
+ # we were manually triggered, so get historical rates
+ if self.is_enabled() and self.show_history():
+ self.exchange.get_historical_rates(self.ccy, self.cache_dir)
+ except TaskTimeout:
+ self.logger.info(f"heyheyhey. run(). cp4. got TaskTimeout")
+ pass
+ self.logger.info(f"heyheyhey. run(). cp5")
+ if self.is_enabled():
+ self.logger.info(f"heyheyhey. run(). cp6. calling update_safe")
+ await self.exchange.update_safe(self.ccy)
+ except BaseException as e:
+ self.logger.exception(f"heyheyhey. outer exception: {e!r}")
+ raise
+ finally:
+ self.logger.info(f"heyheyhey. run() finally exiting.")
def is_enabled(self):
return bool(self.config.get('use_exchange_rate', DEFAULT_ENABLED))
warning: in the working copy of 'electrum/exchange_rate.py', LF will be replaced by CRLF the next time Git touches it
diff --git a/electrum/util.py b/electrum/util.py
index 94cc06f78b..b09fa76d42 100644
--- a/electrum/util.py
+++ b/electrum/util.py
@@ -1283,7 +1283,7 @@ def make_aiohttp_session(proxy: Optional[dict], headers=None, timeout=None):
if timeout is None:
# The default timeout is high intentionally.
# DNS on some systems can be really slow, see e.g. #5337
- timeout = aiohttp.ClientTimeout(total=45)
+ timeout = aiohttp.ClientTimeout(total=5)
elif isinstance(timeout, (int, float)):
timeout = aiohttp.ClientTimeout(total=timeout)
ssl_context = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, cafile=ca_path)
@@ -1365,6 +1365,7 @@ def _aiorpcx_monkeypatched_set_new_deadline(task, deadline):
task._orig_cancel()
task._timed_out = None if getattr(task, "_externally_cancelled", False) else deadline
def mycancel(*args, **kwargs):
+ _logger.info(f"mycancel() entered. {task=}. {args=}. {kwargs=}", stack_info=True)
task._orig_cancel(*args, **kwargs)
task._externally_cancelled = True
task._timed_out = None
I get this log:
I | exchange_rate.FxThread | heyheyhey. run() entered
I | exchange_rate.FxThread | heyheyhey. run(). cp1. new iter starts. task=<Task pending name='Task-11' coro=<FxThread.run() running at /home/user/wspace/electrum/electrum/exchange_rate.py:548> cb=[TaskGroup._on_done()]>. getattr(task, '_externally_cancelled', None)=None
I | exchange_rate.FxThread | heyheyhey. run(). cp2. timeout_after entered.
I | exchange_rate.FxThread | heyheyhey. run(). cp3
I | exchange_rate.FxThread | heyheyhey. run(). cp5
I | exchange_rate.FxThread | heyheyhey. run(). cp6. calling update_safe
I | exchange_rate.CoinGecko | getting fx quotes for EUR
I | util | mycancel() entered. task=<Task pending name='Task-11' coro=<FxThread.run() running at /home/user/wspace/electrum/electrum/exchange_rate.py:565> wait_for=<Future pending cb=[BaseSelectorEventLoop._sock_write_done(9, handle=<Handle BaseS...9.120', 443))>)(), Task.task_wakeup()]> cb=[TaskGroup._on_done()]>. args=(). kwargs={}
Stack (most recent call last):
File "/usr/lib/python3.10/threading.py", line 966, in _bootstrap
self._bootstrap_inner()
File "/usr/lib/python3.10/threading.py", line 1009, in _bootstrap_inner
self.run()
File "/home/user/wspace/electrum/electrum/util.py", line 1159, in run_with_except_hook
run_original(*args2, **kwargs2)
File "/usr/lib/python3.10/threading.py", line 946, in run
self._target(*self._args, **self._kwargs)
File "/home/user/wspace/electrum/electrum/util.py", line 1498, in run_event_loop
loop.run_until_complete(stopping_fut)
File "/usr/lib/python3.10/asyncio/base_events.py", line 633, in run_until_complete
self.run_forever()
File "/usr/lib/python3.10/asyncio/base_events.py", line 600, in run_forever
self._run_once()
File "/usr/lib/python3.10/asyncio/base_events.py", line 1896, in _run_once
handle._run()
File "/usr/lib/python3.10/asyncio/events.py", line 80, in _run
self._context.run(self._callback, *self._args)
File "/home/user/wspace/electrum/packages/aiohttp/helpers.py", line 667, in __call__
cb(*args, **kwargs)
File "/home/user/wspace/electrum/packages/aiohttp/helpers.py", line 727, in timeout
task.cancel()
File "/home/user/wspace/electrum/electrum/util.py", line 1368, in mycancel
_logger.info(f"mycancel() entered. {task=}. {args=}. {kwargs=}", stack_info=True)
I | exchange_rate.CoinGecko | failed fx quotes: TimeoutError()
I | exchange_rate.FxThread | heyheyhey. run(). cp1. new iter starts. task=<Task pending name='Task-11' coro=<FxThread.run() running at /home/user/wspace/electrum/electrum/exchange_rate.py:548> cb=[TaskGroup._on_done()]>. getattr(task, '_externally_cancelled', None)=True
I | exchange_rate.FxThread | heyheyhey. run(). cp2. timeout_after entered.
E | exchange_rate.FxThread | heyheyhey. outer exception: CancelledError()
Traceback (most recent call last):
File "/home/user/wspace/electrum/electrum/exchange_rate.py", line 553, in run
await self._trigger.wait() # <<<
File "/usr/lib/python3.10/asyncio/locks.py", line 214, in wait
await fut
asyncio.exceptions.CancelledError
I | exchange_rate.FxThread | heyheyhey. run() finally exiting.
So the issue is with the aiorpcx monkey patch in util.py
, as it relies on side-effecting the asyncio.Task
, and it patches Task.cancel
. However, aiohttp
also uses Task.cancel
for its own timeouting of the http request, with the same Task
object, and this confuses timeout_after
. Ultimately FxThread.run
exits.
I am not yet sure how to properly fix.
@SomberNight Maybe this one would help? https://github.com/spesmilo/electrum/issues/7952#issuecomment-1225560100
Opening an issue to document our discussion on IRC
After running in a long-running process for quite some time (usually it happens every day and fixes itself after restart), exchange rates no longer fetch.
Last log lines displayed are fetching exchange rates for USD, and then
exchange_rate.CoinGecko | failed fx quotes: TimeoutError
After that, no fetching exchange rates messages ever appear, set_currency doesn't help either, which means that main process got stopped by some exception possibly.
I think it might be because of cf3f92531ec99befd2ab1df628649498931ca78a, 2c57c78ebe77287ba9698225bf8e20880b8fee62 or aiorpcx upgrade
It is something in between a3e1d2e25e78231b30ff1103345095827a909d88 and 73ba00d7dd87af5e63ab2a30617fae1b8d36d0c4 or python version update from 3.7 to 3.9 in our case