ethereum / web3.py

A python interface for interacting with the Ethereum blockchain and ecosystem.
http://web3py.readthedocs.io
MIT License

Async iteration of AsyncWeb3 doesn't work as expected #3375

Closed ryanc-bs closed 5 months ago

ryanc-bs commented 5 months ago

What happened?

I am using the latest stable release - 6.18.0

I'm trying out the AsyncWeb3 class, following the usage described in the docs (https://web3py.readthedocs.io/en/stable/providers.html#usage). I would like to maintain an indefinite subscription that automatically reconnects when the connection drops. As I understand it, this should be possible with the second example, which uses the async for w3 in AsyncWeb3.persistent_websocket(... pattern. However, this does not work as expected for me.

Using that example, I am able to connect and retrieve some values. However, after a short while a ConnectionClosedError is raised and the connection is not automatically retried. Instead, the main asyncio task is cancelled, which means I cannot retry by catching websockets.ConnectionClosed.

Code that produced the error

import asyncio
import logging

from websockets.exceptions import ConnectionClosed
from web3 import AsyncWeb3
from web3.providers import WebsocketProviderV2

logging.basicConfig(level=logging.INFO)

async def main():
    async for w3 in AsyncWeb3.persistent_websocket(
        WebsocketProviderV2(...)
    ):
        try:
            subscription_id = await w3.eth.subscribe("newHeads")

            async for response in w3.ws.process_subscriptions():
                block_number = response["result"]["number"]
                logging.info("block number: %d", block_number)
        except ConnectionClosed:
            logging.info("connection closed, retrying")

if __name__ == "__main__":
    asyncio.run(main())

Full error output

File "/Users/ryancollingham/code/block-scholes/defiDataGrabber/main.py", line 24, in <module>
    asyncio.run(main())
  File "/Users/ryancollingham/.pyenv/versions/3.11.8/lib/python3.11/asyncio/runners.py", line 190, in run
    return runner.run(main)
           ^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/3.11.8/lib/python3.11/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/3.11.8/lib/python3.11/asyncio/base_events.py", line 654, in run_until_complete
    return future.result()
           ^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/code/block-scholes/defiDataGrabber/main.py", line 19, in main
    except ConnectionClosed:
           ^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/code/block-scholes/defiDataGrabber/repro.py", line 21, in main
    async for response in w3.ws.process_subscriptions():
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/manager.py", line 454, in __anext__
    return await self.manager._get_next_ws_message()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/manager.py", line 367, in _get_next_ws_message
    return await self._ws_message_stream().__anext__()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/manager.py", line 380, in _ws_message_stream
    response = await self._request_processor.pop_raw_response(subscription=True)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/providers/websocket/request_processor.py", line 230, in pop_raw_response
    raw_response = await self._subscription_response_queue.get()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/3.11.8/lib/python3.11/asyncio/queues.py", line 158, in get
    await getter
asyncio.exceptions.CancelledError
ERROR:asyncio:Task exception was never retrieved
future: <Task finished name='Task-5' coro=<WebsocketProviderV2._ws_message_listener() done, defined at /Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/providers/websocket/websocket_v2.py:212> exception=ConnectionClosedError(None, Close(code=<CloseCode.INTERNAL_ERROR: 1011>, reason='keepalive ping timeout'), None)>
Traceback (most recent call last):
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 1301, in close_connection
    await self.transfer_data_task
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 963, in transfer_data
    message = await self.read_message()
              ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 1033, in read_message
    frame = await self.read_data_frame(max_size=self.max_size)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 1108, in read_data_frame
    frame = await self.read_frame(max_size)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 1165, in read_frame
    frame = await Frame.read(
            ^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/framing.py", line 68, in read
    data = await reader(2)
           ^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/3.11.8/lib/python3.11/asyncio/streams.py", line 750, in readexactly
    await self._wait_for_data('readexactly')
  File "/Users/ryancollingham/.pyenv/versions/3.11.8/lib/python3.11/asyncio/streams.py", line 543, in _wait_for_data
    await self._waiter
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/providers/websocket/websocket_v2.py", line 236, in _ws_message_listener
    raise e
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/web3/providers/websocket/websocket_v2.py", line 223, in _ws_message_listener
    async for raw_message in self._ws:
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 498, in __aiter__
    yield await self.recv()
          ^^^^^^^^^^^^^^^^^
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 568, in recv
    await self.ensure_open()
  File "/Users/ryancollingham/.pyenv/versions/defiDataGrabber/lib/python3.11/site-packages/websockets/legacy/protocol.py", line 948, in ensure_open
    raise self.connection_closed_exc()
websockets.exceptions.ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received

Fill this section in if you know how this could or should be fixed

It seems like the error handling here is problematic: https://github.com/ethereum/web3.py/blob/08665fcd54e0e593ba6d19a884b4ba8c9b5753d0/web3/providers/persistent/websocket.py#L198-L209

Because the listener runs in a separate background task and cancels all tasks when it fails, the ConnectionClosed error never reaches the calling code, so it cannot be caught and handled by reconnecting.
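
To illustrate the mechanism described above, here is a minimal sketch using only asyncio. It does not use web3.py; the `listener` and `consumer` coroutines are hypothetical stand-ins, and `ConnectionError` stands in for the websocket close error. It assumes the listener cancels the consumer and then raises inside its own task, which is roughly what the traceback suggests.

```python
import asyncio


async def listener(consumer_task: asyncio.Task) -> None:
    # Hypothetical background listener: on failure it cancels the
    # consumer task, then raises inside its *own* task.
    await asyncio.sleep(0.01)
    consumer_task.cancel()
    raise ConnectionError("connection dropped")


async def consumer(queue: asyncio.Queue, events: list) -> None:
    try:
        await queue.get()  # waits for a message that never arrives
    except ConnectionError:
        events.append("caught ConnectionError")  # never reached
    except asyncio.CancelledError:
        events.append("saw CancelledError")
        raise  # re-raise so the task is properly marked as cancelled


async def demo() -> list:
    events: list = []
    queue: asyncio.Queue = asyncio.Queue()
    consumer_task = asyncio.create_task(consumer(queue, events))
    listener_task = asyncio.create_task(listener(consumer_task))
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass  # the consumer task was cancelled, not this coroutine
    # Retrieve the listener's exception explicitly; otherwise asyncio
    # logs "Task exception was never retrieved".
    results = await asyncio.gather(listener_task, return_exceptions=True)
    events.append(type(results[0]).__name__)
    return events


result = asyncio.run(demo())
print(result)
```

The consumer only ever observes a CancelledError. The original error lives in the listener task and is visible only to code that awaits that task directly.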

web3 Version

6.18.0

Python Version

3.11.8

Operating System

osx

Output from pip freeze

aiohttp==3.9.5
aiosignal==1.3.1
attrs==23.2.0
bitarray==2.9.2
black==24.4.2
certifi==2024.2.2
charset-normalizer==3.3.2
ckzg==1.0.1
click==8.1.7
cytoolz==0.12.3
eth-account==0.11.2
eth-hash==0.7.0
eth-keyfile==0.8.1
eth-keys==0.5.1
eth-rlp==1.0.1
eth-typing==4.2.2
eth-utils==4.1.0
eth_abi==5.1.0
frozenlist==1.4.1
greenlet==3.0.3
hexbytes==0.3.1
idna==3.7
iniconfig==2.0.0
jsonschema==4.22.0
jsonschema-specifications==2023.12.1
lru-dict==1.2.0
msgpack==1.0.8
multidict==6.0.5
mypy==1.10.0
mypy-extensions==1.0.0
neovim==0.3.1
packaging==24.0
parsimonious==0.10.0
pathspec==0.12.1
platformdirs==4.2.1
pluggy==1.5.0
protobuf==5.26.1
pycryptodome==3.20.0
pynvim==0.5.0
pytest==8.2.0
pyunormalize==15.1.0
referencing==0.35.1
regex==2024.4.28
requests==2.31.0
rlp==4.0.1
rpds-py==0.18.0
ruff==0.4.2
toolz==0.12.1
typing_extensions==4.11.0
urllib3==2.2.1
web3==6.18.0
websockets==12.0
yarl==1.9.4

fselmo commented 5 months ago

Thanks for reporting this @ryanc-bs and putting it back in the spotlight. This is a known bug. I believe that, for now, you can set the silence_listener_task_exceptions kwarg on the WebsocketProviderV2 and catch all Exception cases in your code block, since the listener task may raise an asyncio.exceptions.CancelledError.

I hope to get to this soon; it is next on my list.

ryanc-bs commented 5 months ago

Thanks, apologies if this has already been reported - I could not find anything similar in the open issues currently.

I tried setting the silence_listener_task_exceptions parameter to True. However, that does not work well for me, as it gets stuck in an error loop:

ERROR:web3.providers.WebsocketProviderV2:Exception caught in listener, error logging and keeping listener background task alive.
    error=ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received
ERROR:web3.providers.WebsocketProviderV2:Exception caught in listener, error logging and keeping listener background task alive.
    error=ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received
ERROR:web3.providers.WebsocketProviderV2:Exception caught in listener, error logging and keeping listener background task alive.
    error=ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received
ERROR:web3.providers.WebsocketProviderV2:Exception caught in listener, error logging and keeping listener background task alive.
    error=ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received
ERROR:web3.providers.WebsocketProviderV2:Exception caught in listener, error logging and keeping listener background task alive.
    error=ConnectionClosedError: sent 1011 (internal error) keepalive ping timeout; no close frame received
...

What does seem to work well is using the async context manager and wrapping it in a retry loop like:

async def main():
    while True:
        try:
            await subscribe()
        except ConnectionClosed:
            logging.info("connection closed, retrying")

async def subscribe():
    async with AsyncWeb3.persistent_websocket(
        WebsocketProviderV2(...)
    ) as w3:
        subscription_id = await w3.eth.subscribe("newHeads")

        async for response in w3.ws.process_subscriptions():
            block_number = response["result"]["number"]
            logging.info("block number: %d", block_number)
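
A variation on the retry loop above, as a rough sketch rather than a recommended implementation: it adds a bounded number of attempts and exponential backoff, neither of which is in the original workaround. The names `run_with_reconnect`, `FakeConnectionClosed` and `flaky_subscribe` are hypothetical; in real use the `subscribe` coroutine from the workaround and `websockets.exceptions.ConnectionClosed` would take their place.

```python
import asyncio


class FakeConnectionClosed(Exception):
    """Hypothetical stand-in for websockets.exceptions.ConnectionClosed."""


async def run_with_reconnect(subscribe, retry_on, max_attempts=5, base_delay=0.01):
    """Await subscribe() until it returns, retrying on retry_on errors."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await subscribe()
        except retry_on:
            if attempt == max_attempts:
                raise  # give up and surface the last error
            # Exponential backoff: base_delay, then 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = []  # records each attempt made by the fake subscription


async def flaky_subscribe():
    # Simulates two dropped connections followed by a clean run.
    calls.append(1)
    if len(calls) < 3:
        raise FakeConnectionClosed("simulated drop")
    return "subscribed"


outcome = asyncio.run(run_with_reconnect(flaky_subscribe, FakeConnectionClosed))
print(outcome, len(calls))
```

Because only the `retry_on` exception type is caught, a CancelledError still propagates and the loop can be cancelled normally.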

It's a little unclear to me why the error handling in this case differs from that of the async iterator. From a quick glance at the source code, I could not find where the relevant __aenter__ / __aiter__ methods are defined.

Also with respect to the asyncio.CancelledError, since that is inheriting from BaseException it cannot be caught by catching Exception instances. Also it is my understanding that this shouldn't be caught without re-raising as per the docs: https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError
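
A small check of both points, using only asyncio and assuming Python 3.8 or later (where CancelledError became a BaseException subclass). The `worker` and `demo` names are illustrative only.

```python
import asyncio

# On Python 3.8+, CancelledError derives from BaseException, not Exception.
is_base_exception = issubclass(asyncio.CancelledError, BaseException)
is_exception = issubclass(asyncio.CancelledError, Exception)


async def worker(log: list) -> None:
    try:
        await asyncio.sleep(10)
    except Exception:
        log.append("except Exception ran")  # not reached on cancellation
    except asyncio.CancelledError:
        log.append("cleanup on cancel")
        raise  # re-raise so the task is actually marked as cancelled


async def demo():
    log: list = []
    task = asyncio.create_task(worker(log))
    await asyncio.sleep(0)  # yield once so the worker starts sleeping
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass  # expected: the worker task was cancelled
    return log, task.cancelled()


log, cancelled = asyncio.run(demo())
print(is_base_exception, is_exception, log, cancelled)
```

The `except Exception` branch is skipped entirely, and re-raising in the CancelledError branch is what lets `task.cancelled()` report True.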

fselmo commented 5 months ago

Also with respect to the asyncio.CancelledError, since that is inheriting from BaseException it cannot be caught by catching Exception instances.

Yep, that checks out. You'd have to handle that separately.

Also it is my understanding that this shouldn't be caught without re-raising as per the docs: https://docs.python.org/3/library/asyncio-exceptions.html#asyncio.CancelledError

Correct. In any case where there isn't a bug, this should be handled appropriately. This is definitely a bug at the moment, but I'm glad you have a workaround for now.

Thanks, apologies if this has already been reported - I could not find anything similar in the open issues currently.

It was only reported in the Discord server, so this actually helps us keep track of it. Thank you! :)

I'll get to this asap 👍🏼