ethereum / web3.py

A python interface for interacting with the Ethereum blockchain and ecosystem.
http://web3py.readthedocs.io
MIT License

Fix websocket provider receives nothing forever when the peer server is closed or gone #3424

Open kaiix opened 1 week ago

kaiix commented 1 week ago

This should fix the issue where the websocket provider receives nothing forever when the peer server is closed or gone.

When receiving websocket messages using the iterator pattern, the websockets library suppresses ConnectionClosedOK errors, so _provider_specific_message_listener stops iterating without an error, and _message_listener continues its while loop without knowing that the connection is closed and never receives any messages.

The fix uses the recv() function so that the websocket can raise the ConnectionClosedOK exception.
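
As a minimal standalone sketch (plain `websockets` library usage with a placeholder `uri`, not web3.py code), the difference between the two receiving patterns looks like this:

    import websockets
    from websockets.exceptions import ConnectionClosedOK

    async def listen_with_iterator(uri: str) -> None:
        async with websockets.connect(uri) as ws:
            # The iterator exits silently when the server performs a normal
            # close, so the caller cannot tell that the connection is gone.
            async for message in ws:
                print(message)

    async def listen_with_recv(uri: str) -> None:
        async with websockets.connect(uri) as ws:
            try:
                while True:
                    # recv() raises ConnectionClosedOK on a normal close,
                    # giving the caller a chance to react.
                    message = await ws.recv()
                    print(message)
            except ConnectionClosedOK:
                print("server closed the connection")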

(After merging this PR, I'll port the fix to the main branch)

fselmo commented 2 days ago

Thanks @kaiix, I do like this change and I think it makes a lot of sense. Since ConnectionClosedOK isn't used as an error exception, but rather as a signal that the connection was closed on purpose and closed successfully, do you think this should be handled separately from silence_listener_task_exceptions?

In my mind, this could go one of two ways. We can handle ConnectionClosedOK exceptions as a break in the listener task and shut it down, or we can let the user handle that. I think it may be quite noisy to always propagate that exception and leave it to the user to handle. One idea is that we could use a separate configuration for this, defaulting to handling it silently, but if a user wants full control they can choose their own logic for when a connection is successfully closed on purpose by the server.

Thoughts on that?

cc: @kclowes, @pacrob, @reedsa

fselmo commented 2 days ago

> One idea is that we could use a separate configuration for this, defaulting to handling it silently, but if a user wants full control they can choose their own logic for when a connection is successfully closed on purpose by the server.

I imagine this would exist in some shape or form like this:

websocket_v2.py (untouched from your code)

    async def _provider_specific_message_listener(self) -> None:
        while True:
            raw_message = await self._ws.recv()
            response = json.loads(raw_message)
            subscription = response.get("method") == "eth_subscription"
            await self._request_processor.cache_raw_response(
                response, subscription=subscription
            )

persistent.py

    async def _message_listener(self) -> None:
        self.logger.info(
            f"{self.__class__.__qualname__} listener background task started. Storing "
            "all messages in appropriate request processor queues / caches to be "
            "processed."
        )
        while True:
            # the use of sleep(0) seems to be the most efficient way to yield control
            # back to the event loop to share the loop with other tasks.
            await asyncio.sleep(0)
            try:
                await self._provider_specific_message_listener()
            except ConnectionClosedOK as e:
                self.logger.info("Connection closed successfully, disconnecting...")
                if self.raise_on_successful_close:
                    raise e
                else:
                    await self.disconnect()
                    break
            except Exception as e:
                if not self.silence_listener_task_exceptions:
                    raise e
                else:
                    self._error_log_listener_task_exception(e)
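
For illustration only, a caller that opts into the raised exception could then handle the clean close itself. A rough sketch, assuming `w3` is an already-connected AsyncWeb3 instance backed by the v2 websocket provider and that the hypothetical raise_on_successful_close flag above is enabled:

    from websockets.exceptions import ConnectionClosedOK

    async def consume_subscriptions(w3) -> None:
        try:
            async for response in w3.ws.process_subscriptions():
                ...  # handle each subscription response
        except ConnectionClosedOK:
            # with the hypothetical flag enabled, the clean close reaches the
            # caller instead of being handled inside the listener task
            ...
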
kaiix commented 1 day ago

@fselmo I understand what you mean. I think just re-raising ConnectionClosedOK from the message listener task is enough, and we don't need to introduce any other options to control the behavior.

The modified code looks like this:

persistent.py

    async def _message_listener(self) -> None:
        self.logger.info(
            f"{self.__class__.__qualname__} listener background task started. Storing "
            "all messages in appropriate request processor queues / caches to be "
            "processed."
        )
        while True:
            # the use of sleep(0) seems to be the most efficient way to yield control
            # back to the event loop to share the loop with other tasks.
            await asyncio.sleep(0)
            try:
                await self._provider_specific_message_listener()
            except ConnectionClosedOK:
                raise
            except Exception as e:
                if not self.silence_listener_task_exceptions:
                    raise e
                else:
                    self._error_log_listener_task_exception(e)

When the message listener encounters ConnectionClosedOK, the listener task ends with that exception, and _get_next_ws_message (which ws.recv() and ws.process_subscriptions() both use) re-raises it in _handle_listener_task_exceptions.

If the user uses the async iterator to process websocket subscriptions (which is what the docs recommend), like this:

    async for response in w3.ws.process_subscriptions():
        ...

the re-raised ConnectionClosedOK will cause a StopAsyncIteration in _AsyncPersistentMessageStream, which stops the for loop since there are no more responses. That seems very reasonable.
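
To illustrate the mechanism in general terms, here is a generic sketch (not web3.py's actual _AsyncPersistentMessageStream) of an async-iterator wrapper that turns the clean-close exception into StopAsyncIteration, so an async for loop over it simply ends:

    from websockets.exceptions import ConnectionClosedOK

    class MessageStream:
        # Generic sketch: wraps an async `recv` callable in the async
        # iterator protocol.
        def __init__(self, recv):
            self._recv = recv

        def __aiter__(self) -> "MessageStream":
            return self

        async def __anext__(self):
            try:
                return await self._recv()
            except ConnectionClosedOK:
                # a successful close ends the iteration rather than erroring
                raise StopAsyncIteration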

Only when the user uses recv() directly (which is not the recommended usage) will they need to handle the ConnectionClosedOK exception manually, like this:

    while True:
        try:
            response = await w3.ws.recv()
        except ConnectionClosed:  # ConnectionClosed is the base class of ConnectionClosedOK
            break

What do you think?

kaiix commented 1 day ago

I first noticed this issue when listening for new blocks using a public node (which turned out to be unstable), and I found that at some point the CPU usage was at 100% while the provider received nothing.

Just some additional information to help developers with the same problem find this issue when searching for high CPU usage.

fselmo commented 1 day ago

@kaiix I think that makes good sense. Have you had a chance to test this out? Do you mind updating this PR to reflect these changes?