ethereum / web3.py

A python interface for interacting with the Ethereum blockchain and ecosystem.
http://web3py.readthedocs.io
MIT License

`PersistentConnectionProvider` UX Improvements #3397

Open fselmo opened 1 month ago

fselmo commented 1 month ago

The newer PersistentConnectionProvider implementations, mostly the WebSocketProvider, seem to be getting a lot of use and a lot of good feedback is coming back from users, whether through issues or Discord. It's time to start thinking about some good UX abstractions that can help facilitate subscription management, re-connection logic, and anything else that might make sense to bake into the library.

Some ideas that have been brewing:


If you have UX improvements, pain points, or any comments on anything discussed in this issue, please feel free to join the discussion below!

DefiDebauchery commented 1 month ago

You hit on the two that I'm most interested in! As a user, I don't want to have to manage process_subscriptions().

Not specific to the Persistent Provider itself, but as far as overall UX - and to tack onto the mention of .subscribe(), do you think it would be worthwhile to add optional kwargs to the method instead of (or in addition to) relying on LogsSubscriptionArg?

    async def subscribe(
        self,
        subscription_type: SubscriptionType,  # Meh
        subscription_arg: Optional[
            Union[
                LogsSubscriptionArg,  # logs, optional filter params
                bool,  # newPendingTransactions, full_transactions
            ]
        ] = None,
        callback: Optional[Callable] = None,  # Sneaking that in here
        *,
        address: Optional[
            Union[
                Address,
                ChecksumAddress,
                ENS,
                Sequence[Union[Address, ChecksumAddress, ENS]]
            ],
        ] = None,
        topics: Optional[Sequence[Union[HexStr, Sequence[HexStr]]]] = None,
    ) -> HexStr:

This will certainly require some additional sanity-checking, and may even require separate methods for subscriptions between newHeads and logs (et al; which wouldn't be terrible, as I don't necessarily love subscription_type), but it may be more expressive to have a defined set of arguments, while still allowing people to use pre-packaged log structures.
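
The sanity-checking mentioned above can be sketched independently of web3.py. The helper below is hypothetical: the name `build_subscription_params` and its validation rules are assumptions for illustration, not part of the library. It shows one way `address` and `topics` keyword arguments might be folded into the filter object of a `logs` subscription while being rejected for other subscription types.

```python
from typing import Any, Dict, List, Optional, Sequence, Union


# Hypothetical helper, not a web3.py API: turns keyword arguments into the
# params list of an eth_subscribe-style call.
def build_subscription_params(
    subscription_type: str,
    subscription_arg: Optional[Union[Dict[str, Any], bool]] = None,
    *,
    address: Optional[Union[str, Sequence[str]]] = None,
    topics: Optional[Sequence[Any]] = None,
) -> List[Any]:
    has_kwargs = address is not None or topics is not None

    # Filter kwargs only make sense for log subscriptions.
    if has_kwargs and subscription_type != "logs":
        raise ValueError("address/topics only apply to 'logs' subscriptions")
    # Avoid two competing sources for the same filter.
    if has_kwargs and subscription_arg is not None:
        raise ValueError("pass either subscription_arg or address/topics")

    if has_kwargs:
        log_filter: Dict[str, Any] = {}
        if address is not None:
            log_filter["address"] = address
        if topics is not None:
            log_filter["topics"] = list(topics)
        return [subscription_type, log_filter]

    if subscription_arg is None:
        return [subscription_type]
    return [subscription_type, subscription_arg]
```

Pre-packaged structures still pass through unchanged as `subscription_arg`, so both calling styles could coexist under these assumptions.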

clover-es commented 1 month ago

As a web3.py user, I expected some class abstraction, such as a SubscriptionManager, that would allow me to:

This is a quick draft of the SubscriptionManager class I provided in Discord:

```python
import asyncio
from typing import Callable, Dict, Optional

from eth_typing import HexStr
from web3 import AsyncWeb3, WebSocketProvider
from web3.types import SubscriptionType
from websockets import ConnectionClosedError, ConnectionClosed


class SubscriptionHandler:
    def __init__(self, wss_url):
        self.wss_url = wss_url
        self.w3_socket: Optional[AsyncWeb3] = None
        # Dictionary containing callbacks for each subscription id
        # (per instance, so handlers do not share callbacks)
        self.callbacks: Dict[HexStr, Callable] = {}

    async def process_subscriptions(self) -> None:
        """
        Performs the websocket connection and processes the subscriptions and calls the callbacks
        :return: None
        """
        async for self.w3_socket in AsyncWeb3(WebSocketProvider(self.wss_url)):
            try:
                async for message in self.w3_socket.socket.process_subscriptions():
                    try:
                        self.callbacks[message["subscription"]](message["result"])
                    except KeyError as e:
                        # A failed dict lookup raises KeyError, not ValueError
                        try:
                            print(f"Callback for {message['subscription']} not found")
                        except KeyError as e:
                            print(f"Unexpected response from RPC: {e}")
            except (ConnectionClosedError, ConnectionClosed):
                # Connection dropped: let the outer loop reconnect
                continue
            except asyncio.CancelledError:
                print("Cancelling subscriptions")
                for sub_id in list(self.callbacks):
                    await self.w3_socket.eth.unsubscribe(sub_id)
                break

    async def subscribe(
        self, callback: Callable, event_type: SubscriptionType, **event_params
    ) -> HexStr:
        """
        Subscribes to the given event type with the given callback.
        Must be called while process_subscriptions() task is running
        :param callback: The function to call when the event is received
        :param event_type: The event type to subscribe to
        :param event_params: Additional parameters to pass to the subscription
        :return: The subscription ID
        """
        if self.is_connected():
            sub_id = await self.w3_socket.eth.subscribe(event_type, event_params)
            print(f"Subscribed to {sub_id}")
            self.callbacks[sub_id] = callback
            return sub_id
        else:
            raise RuntimeError(
                "Websocket connection not established, it's not possible to subscribe"
            )

    async def unsubscribe(self, sub_id: HexStr) -> None:
        """
        Unsubscribes from a subscription identified by sub_id.
        Must be called while process_subscriptions() task is running
        :param sub_id: The subscription ID to unsubscribe from
        :return: None
        """
        if self.is_connected():
            await self.w3_socket.eth.unsubscribe(sub_id)
            self.callbacks.pop(sub_id)
        else:
            raise RuntimeError(
                "Websocket connection not established, it's not possible to unsubscribe"
            )

    def is_connected(self) -> bool:
        return self.w3_socket is not None
```

And usage of the class itself:

```python
def callback_logs(message):
    print(f"New log received: {message}")


def callback_heads(message):
    print(f"New header received: {message}")


async def main():
    subs_handler = SubscriptionHandler("wss://eth.drpc.org")
    # Connects to the RPC wss
    sub_task = asyncio.create_task(subs_handler.process_subscriptions())
    # Waits for the connection to be established
    while not subs_handler.is_connected():
        await asyncio.sleep(1)
    # Subscribes to desired events
    await subs_handler.subscribe(
        callback_logs, "logs", address="0xb4e16d0168e52d35cacd2c6185b44281ec28c9dc"
    )
    new_heads_id = await subs_handler.subscribe(callback_heads, "newHeads")
    try:
        await asyncio.sleep(10)
        # Unsubscribe from new heads after 10 seconds (test unsubscribe)
        await subs_handler.unsubscribe(new_heads_id)
        while True:
            await asyncio.sleep(0)
    except asyncio.CancelledError:
        sub_task.cancel()
        await sub_task


if __name__ == "__main__":
    asyncio.run(main())
```
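
The callback routing in the draft above can be exercised without a node. This is a minimal sketch under stated assumptions: `fake_stream` and `dispatch` are hypothetical names, and the message shape (`subscription` and `result` keys) is taken from the draft rather than from any documented guarantee.

```python
import asyncio
from typing import Any, AsyncIterator, Callable, Dict, List


# Hypothetical stand-in for the provider's message stream; a real stream
# would come from the websocket connection.
async def fake_stream(
    messages: List[Dict[str, Any]],
) -> AsyncIterator[Dict[str, Any]]:
    for message in messages:
        await asyncio.sleep(0)  # yield control, as a socket read would
        yield message


async def dispatch(
    stream: AsyncIterator[Dict[str, Any]],
    callbacks: Dict[str, Callable[[Any], None]],
) -> List[str]:
    """Route each message to its callback; return ids that had none."""
    unhandled: List[str] = []
    async for message in stream:
        sub_id = message["subscription"]
        callback = callbacks.get(sub_id)
        if callback is None:
            unhandled.append(sub_id)
            continue
        callback(message["result"])
    return unhandled


received: List[Any] = []
messages = [
    {"subscription": "0x1", "result": {"number": 1}},
    {"subscription": "0x2", "result": {"number": 2}},
]
unhandled = asyncio.run(
    dispatch(fake_stream(messages), {"0x1": received.append})
)
```

Using `dict.get` instead of catching an exception keeps a missing callback separate from errors raised inside a callback.
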
jimtje commented 1 month ago

I think it would be great if the logic and the client could be separated. That would future-proof the library: if the underlying libraries change in ways the maintainers here can't control, the interface can still be maintained. It would also allow better integration with other async libraries (trio, curio, even gevent, though it's not technically async) and with different clients (httpx being the most prominent, but also trio-websocket or any AnyIO-based client). People have hacked together their own implementations of their stack (myself included), and the result is that I end up using multiple web3 clients. That is fine for personal use (even if the value transacted may be on the level of a small country's GDP), but it is not ideal for people who haven't been writing code against web3 nodes constantly, for whom the abstraction is not merely a set of shortcuts but maybe the primary way to learn how to interact with web3 programmatically.

This might be a slightly longer-term project that covers more than just PersistentConnectionProvider, although that is certainly part of it. Luckily, libraries like AnyIO would help quite a bit, and there isn't really a deadline per se. I don't know how many people use other clients and implementations for most of the stack, but I suspect that httpx at least has a fair number of users, which felt unlikely as recently as the first year of COVID. I think web3 adoption is only going to increase, and a library that reduces friction for both maintainers/contributors and users in the long run would help a lot of people.
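
One way to picture the separation of logic from client is a small transport interface that the request logic depends on, so the concrete client (websockets, httpx, an AnyIO-based client) can be swapped. The sketch below is illustrative only: `Transport`, `InMemoryTransport`, and `JsonRpcClient` are hypothetical names, not web3.py classes, and the in-memory transport simply echoes a canned result.

```python
import asyncio
import json
from typing import Any, Dict, List, Protocol


class Transport(Protocol):
    """Hypothetical interface: anything that can send and receive text."""

    async def send(self, payload: str) -> None: ...

    async def recv(self) -> str: ...


class InMemoryTransport:
    """Test double that answers every request with a canned result."""

    def __init__(self, result: Any) -> None:
        self._result = result
        self._responses: List[str] = []
        self.sent: List[Dict[str, Any]] = []

    async def send(self, payload: str) -> None:
        request = json.loads(payload)
        self.sent.append(request)
        response = {"jsonrpc": "2.0", "id": request["id"], "result": self._result}
        self._responses.append(json.dumps(response))

    async def recv(self) -> str:
        return self._responses.pop(0)


class JsonRpcClient:
    """Request logic that depends only on the Transport interface."""

    def __init__(self, transport: Transport) -> None:
        self._transport = transport
        self._next_id = 0

    async def request(self, method: str, params: List[Any]) -> Any:
        self._next_id += 1
        request = {
            "jsonrpc": "2.0",
            "id": self._next_id,
            "method": method,
            "params": params,
        }
        await self._transport.send(json.dumps(request))
        response = json.loads(await self._transport.recv())
        if response["id"] != self._next_id:
            raise RuntimeError("response id does not match request id")
        return response["result"]


transport = InMemoryTransport(result="0x10")
client = JsonRpcClient(transport)
block_number = asyncio.run(client.request("eth_blockNumber", []))
```

Because `JsonRpcClient` only knows about `send` and `recv`, a different backend would need only a new class satisfying the same two methods.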