encode / httpcore

A minimal HTTP client. ⚙️
https://www.encode.io/httpcore/
BSD 3-Clause "New" or "Revised" License

httpx 0.21.0 showing pool-connection traceback when 0.20.0 does not. #510

Status: Closed (jeremyschulman closed 2 years ago)

jeremyschulman commented 2 years ago

I have observed an issue in version 0.21.0 that is not found in 0.20.0.

This appears to be related to: https://github.com/encode/httpx/issues/1461

It happens when I attempt to cancel tasks that have open connections:

    for task in asyncio.all_tasks():
        task.cancel()
    try:
        await asyncio.sleep(1)
    except:
        pass

Traceback

# ===> 
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_client.py", line 1982, in __aexit__
    await self._transport.__aexit__(exc_type, exc_value, traceback)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_transports/default.py", line 270, in __aexit__
    await self._pool.__aexit__(exc_type, exc_value, traceback)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 326, in __aexit__
    await self.aclose()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 312, in aclose
    raise RuntimeError(
RuntimeError: The connection pool was closed while 1 HTTP requests/responses were still in-flight.

The above code works fine in version 0.20.0.

Any thoughts/ideas I could try? Thank you!

Cheers, -- Jeremy

jeremyschulman commented 2 years ago

Experimenting with the dependencies, I can actually use httpx==0.22.0, but I had to drop back to httpcore==0.14.5. When I use httpcore==0.14.6 I see the issue.

tomchristie commented 2 years ago

Hiya Jeremy,

Could you give a more complete code example that replicates the issue?

We've got some new error checking there which is (properly) triggering in the newer version.

It looks like you might be doing something like cancelling tasks in a way that closes the connection pool before cancelling requests handled by the pool. But it's a bit hard to say without a concrete example.

jeremyschulman commented 2 years ago

@tomchristie - You can find the complete code repository here: https://github.com/jeremyschulman/demo-beginner-concurrency

And the specific code I was working on is here: https://github.com/jeremyschulman/demo-beginner-concurrency/blob/main/demo_beginner_asyncio/find_macaddr.py#L94

In the current code I removed the loop that cancels the tasks, but it was previously located here: https://github.com/jeremyschulman/demo-beginner-concurrency/blob/main/demo_beginner_asyncio/find_macaddr.py#L114

Hope this helps. I can run any code and provide output if you need. Really appreciate you looking into this! Thank you!

jeremyschulman commented 2 years ago

I should also note, in the previous version, I was using asyncio.create_task() in the code here: https://github.com/jeremyschulman/demo-beginner-concurrency/blob/main/demo_beginner_asyncio/find_macaddr.py#L100

jeremyschulman commented 2 years ago

Ok, thought it would be more helpful to create a branch with the code that throws the issue: https://github.com/jeremyschulman/demo-beginner-concurrency/blob/devtest-httpx-510/demo_beginner_asyncio/find_macaddr.py#L94

With httpx==0.22.0, httpcore==0.14.5 I observe this output:

(venv) [jschulman@nms01 BeginnerAsyncIO]$  demo find-host -m 9c93.4eb6.930c
Locating switch-port for host with MAC-Address 9c:93:4e:b6:93:0c
Locating host ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸━━━━ 262/295 0:00:01
Found on device lx6w13.nyc1, interface Ethernet29
Traceback (most recent call last):
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/bin/demo", line 5, in <module>
    main()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/demo-beginner-asyncio/demo_beginner_asyncio/cli.py", line 91, in main
    cli()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/click/core.py", line 1128, in __call__
    return self.main(*args, **kwargs)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/click/core.py", line 1053, in main
    rv = self.invoke(ctx)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/click/core.py", line 1659, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/click/core.py", line 1395, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/click/core.py", line 754, in invoke
    return __callback(*args, **kwargs)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/click/decorators.py", line 26, in new_func
    return f(get_current_context(), *args, **kwargs)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/demo-beginner-asyncio/demo_beginner_asyncio/cli.py", line 76, in cli_find_macaddr
    asyncio.run(find_macaddr.main(inventory=inventory, macaddr=macaddr))
  File "/usr/local/lib/python3.8/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/usr/local/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

jeremyschulman commented 2 years ago

With httpcore==0.14.7 I observe this output:

(venv) [jschulman@nms01 BeginnerAsyncIO]$  demo find-host -m 9c93.4eb6.930c
Locating switch-port for host with MAC-Address 9c:93:4e:b6:93:0c
Locating host ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╸━━━━━ 256/295 0:00:01
Found on device lx6w13.nyc1, interface Ethernet29
unhandled exception during asyncio.run() shutdown
task: <Task finished name='Task-130' coro=<device_find_host_macaddr() done, defined at /opt/jeremy/Demos/BeginnerAsyncIO/demo-beginner-asyncio/demo_beginner_asyncio/find_macaddr.py:68> exception=RuntimeError('The connection pool was closed while 1 HTTP requests/responses were still in-flight.')>
Traceback (most recent call last):
  File "/opt/jeremy/Demos/BeginnerAsyncIO/demo-beginner-asyncio/demo_beginner_asyncio/find_macaddr.py", line 73, in device_find_host_macaddr
    res = await dev.cli(command=f"show mac address-table address {macaddr}")
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/aioeapi/device.py", line 183, in cli
    res = await self.jsonrpc_exec(jsonrpc)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/aioeapi/device.py", line 226, in jsonrpc_exec
    res = await self.post("/command-api", json=jsonrpc)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_client.py", line 1820, in post
    return await self.request(
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_client.py", line 1506, in request
    return await self.send(request, auth=auth, follow_redirects=follow_redirects)
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_client.py", line 1601, in send
    await response.aread()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_models.py", line 1664, in aread
    self._content = b"".join([part async for part in self.aiter_bytes()])
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_models.py", line 1664, in <listcomp>
    self._content = b"".join([part async for part in self.aiter_bytes()])
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_models.py", line 1680, in aiter_bytes
    async for raw_bytes in self.aiter_raw():
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_models.py", line 1734, in aiter_raw
    async for raw_stream_bytes in self.stream:
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_client.py", line 145, in __aiter__
    async for chunk in self._stream:
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpx/_transports/default.py", line 239, in __aiter__
    async for part in self._httpcore_stream:
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_async/connection_pool.py", line 346, in __aiter__
    async for part in self._stream:
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_async/http11.py", line 294, in __aiter__
    await self.aclose()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_async/http11.py", line 301, in aclose
    await self._connection._response_closed()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_async/http11.py", line 193, in _response_closed
    async with self._state_lock:
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/httpcore/_synchronization.py", line 15, in __aenter__
    await self._lock.acquire()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/anyio/_core/_synchronization.py", line 138, in acquire
    await cancel_shielded_checkpoint()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/anyio/lowlevel.py", line 57, in cancel_shielded_checkpoint
    await get_asynclib().cancel_shielded_checkpoint()
  File "/opt/jeremy/Demos/BeginnerAsyncIO/venv/lib/python3.8/site-packages/anyio/_backends/_asyncio.py", line 464, in cancel_shielded_checkpoint
    await sleep(0)
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 644, in sleep
    await __sleep0()
  File "/usr/local/lib/python3.8/asyncio/tasks.py", line 638, in __sleep0
    yield
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

jeremyschulman commented 2 years ago

I should also note that the Device class, referenced here: https://github.com/jeremyschulman/demo-beginner-concurrency/blob/devtest-httpx-510/demo_beginner_asyncio/find_macaddr.py#L12

is a subclass of httpx.AsyncClient: https://github.com/jeremyschulman/aio-eapi/blob/main/aioeapi/device.py#L50

jeremyschulman commented 2 years ago

I believe the RuntimeError was introduced in this commit, part of 0.14.6: https://github.com/encode/httpcore/commit/7ddb4cabecb551ce6a79f7db5aa540f19530cb2b#diff-2c49ff24c06fb1f90e1cba7d18757d80f3197a92106e04ba77179497461d69aaR302

I am curious why this condition needs to raise a RuntimeError rather than just log a message.

jeremyschulman commented 2 years ago

If you do feel strongly that this condition must be an exception, I would submit that it could be one defined in httpcore._exceptions rather than a generic RuntimeError. This would enable a program to clearly discern this condition from other generic RuntimeError conditions. Thank you.

See my workaround here: https://github.com/jeremyschulman/demo-beginner-concurrency/blob/main/demo_beginner_asyncio/arista_eos.py#L173
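
As an illustration of the suggestion above, here is a minimal sketch of what a dedicated exception could look like. The class name `PoolClosedWithInFlightRequests` and the helpers `close_pool` and `shutdown` are hypothetical and are not part of httpcore. Subclassing RuntimeError is one possible choice that would keep existing `except RuntimeError` handlers working.

```python
class PoolClosedWithInFlightRequests(RuntimeError):
    """Hypothetical exception type; httpcore does not define this name."""

    def __init__(self, count):
        self.count = count
        super().__init__(
            f"The connection pool was closed while {count} HTTP "
            "requests/responses were still in-flight."
        )


def close_pool(in_flight_count):
    """Hypothetical close routine that refuses to close with requests in-flight."""
    if in_flight_count:
        raise PoolClosedWithInFlightRequests(in_flight_count)


def shutdown(in_flight_count):
    """Caller that treats discarded requests as expected, and nothing else."""
    try:
        close_pool(in_flight_count)
    except PoolClosedWithInFlightRequests as exc:
        # Only this specific condition is swallowed; any other
        # RuntimeError would still propagate to the caller.
        return f"ignored {exc.count} discarded request(s)"
    return "clean shutdown"
```

With a specific type, a program could suppress only this condition instead of matching on the message text of a generic RuntimeError.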

tomchristie commented 2 years ago

The issue here isn't that we shouldn't be raising an exception, but that Python's asyncio currently makes it too easy to accidentally write incorrectly structured code.

I've written a bit more about that here, which you might(?) find helpful.

There's too much code in the examples you've linked to for me to dig through, but I'd assume that you're seeing an issue because you've some tasks sending requests, and also a task that the client was created in, and when you're cancelling everything all at once...

    for task in asyncio.all_tasks():
        task.cancel()

It ends up occurring in an arbitrary order, and the client can end up being closed before all the request/response tasks are closed.

Having task groups to properly scope task lifespans (see my link above) would almost certainly fix things. In the meantime you just need to be more careful about the order that you create and cancel tasks.

It's feasible that I'm wrong about that, and that there is an issue here, but I very much don't think that's the case, and I'd need to see a really simple demo example to convince me of it.
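
The ordering described above can be sketched without httpx. In this minimal example, `FakePool` is a hypothetical stand-in for a connection pool (it is not httpcore's class) that raises if it is closed while requests are in-flight. The point is only the order of operations: cancel the request tasks, wait for them to unwind, and only then leave the `async with` block that closes the pool.

```python
import asyncio


class FakePool:
    """Hypothetical stand-in for a connection pool; not httpcore's real class."""

    def __init__(self):
        self.in_flight = 0

    async def request(self, delay):
        self.in_flight += 1
        try:
            await asyncio.sleep(delay)  # pretend to wait for a response
        finally:
            self.in_flight -= 1  # also runs when the task is cancelled

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        if self.in_flight:
            raise RuntimeError(
                f"pool closed with {self.in_flight} request(s) in-flight"
            )


async def main():
    async with FakePool() as pool:
        workers = [asyncio.create_task(pool.request(10)) for _ in range(3)]
        await asyncio.sleep(0)  # give the workers a chance to start

        # Cancel only the request tasks (not every task in the loop) ...
        for task in workers:
            task.cancel()
        # ... and wait for them to finish unwinding before the pool closes.
        results = await asyncio.gather(*workers, return_exceptions=True)
    return results, pool.in_flight


results, in_flight = asyncio.run(main())
```

In this sketch, leaving the `async with` block while the workers were still running would raise the RuntimeError from `FakePool.__aexit__`; awaiting the cancelled tasks first avoids that.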

jeremyschulman commented 2 years ago

Hi @tomchristie - Thank you for looking into my issue and your reply. I can confirm that my program does have HTTP requests in-flight at the time of the program shutdown. While this is consistent with your analysis, I am struggling to cancel the outstanding tasks as you suggest.

I realize your time is not free for "AMA". You made a comment about "oddly (broken) code". In the network automation space, there are times where we will make requests to devices and then want to "discard" the requests that are in-flight because we found the answer we were looking for on another device. For the example I have it is "find a MAC address for a host, like a VOIP phone". I would very much like to understand the correct approach to write this code since it is such a common pattern.

I've re-crafted the code into what I believe is "not broken", but I am still getting the RuntimeError. I am hoping you can point me to the place where I could properly do the cancel. The code that searches the network creates the list of tasks, each of which runs a function called _device_find_host_macaddr. This function opens an httpx.AsyncClient in a context manager (which performs the aclose()).

Again, I really appreciate any guidance you can provide towards where I do the cancel as I am at a loss. I did read the resources you provided, but could not fathom a solution from them.

Thank you again! -- Jeremy

async def _search_network(
    inventory: List[str], macaddr: MacAddress, progressbar: Progress
) -> Optional[FindHostSearchResults]:
    """
    This function searches the network of the given inventory for the end-host
    with the given MAC address.  If the end-host is found then the results are
    returned in a "search results" dataclass. If not found, then return None.

    Parameters
    ----------
    inventory: List[str]
        The list of network devices to check.

    macaddr: MacAddress
        The end-host MAC address to locate

    progressbar: Progress
        A progress-bar CLI widget to indicate progress to the User.

    Returns
    -------
    Optional[FindHostSearchResults] - as described.
    """

    check_device_tasks = {
        asyncio.create_task(_device_find_host_macaddr(device=device, macaddr=macaddr))
        for device in inventory
    }

    found = None
    search_completed = asyncio.Event()

    pb_task = progressbar.add_task(
        description="Locating host", total=len(check_device_tasks)
    )

    def _on_done(done_task: asyncio.Task):
        nonlocal found
        progressbar.update(pb_task, advance=1)
        check_device_tasks.remove(done_task)

        with contextlib.suppress(asyncio.CancelledError):
            if _result := done_task.result():  # found macaddr on device edge-port
                found = _result
                search_completed.set()
            elif not check_device_tasks:  # done with all device-checks
                search_completed.set()

    for todo in check_device_tasks:
        todo.add_done_callback(_on_done)

    # wait for the search to be over, and if found, then cancel any of the
    # remaining check tasks.

    await search_completed.wait()

    for remainder in check_device_tasks:
        remainder.cancel()

    return found

And the called function that creates the httpx.AsyncClient:

async def _device_find_host_macaddr(
    device: str, macaddr: MacAddress
) -> Optional[FindHostSearchResults]:
    """
    This function examines a specific network device for the given end-host MAC
    address.  If the MAC address is found on a network "edge-port" then return
    the search results.  Otherwise, return None.

    Parameters
    ----------
    device: str
        The network device hostname

    macaddr: MacAddress
        The end-host MAC address

    Returns
    -------
    Optional[FindHostSearchResults] - as described.
    """

    # Device is a subclass of httpx.AsyncClient
    async with Device(host=device) as dev:

        # if the MAC address is not on this device, then return None.
        if not (interface := await dev.find_macaddr(macaddr)):
            return None

        # if the MAC address is found, but not on an edge-port, then return
        # None.
        if not await dev.is_edge_port(interface=interface):
            return None

        # If here, then the MAC address was found on this device on an edge-port.
        return FindHostSearchResults(device=device, interface=interface)
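
For reference, the "first answer wins, discard the rest" pattern discussed in this thread can be sketched with plain asyncio, assuming Python 3.8 as in the tracebacks above. The names `first_truthy` and `check` are hypothetical, and `check` only simulates a per-device lookup with a sleep. This is not presented as the fix for the reported error; it only shows one way to await the cancelled tasks so their cleanup (such as an `async with` exit) finishes before the caller returns.

```python
import asyncio
from typing import Awaitable, Iterable, Optional, TypeVar

T = TypeVar("T")


async def first_truthy(coros: Iterable[Awaitable[Optional[T]]]) -> Optional[T]:
    """Return the first truthy result, then cancel and await the other tasks."""
    pending = {asyncio.ensure_future(coro) for coro in coros}
    found = None
    try:
        while pending and found is None:
            done, pending = await asyncio.wait(
                pending, return_when=asyncio.FIRST_COMPLETED
            )
            for task in done:
                # Failed or cancelled checks are simply skipped in this sketch.
                if task.cancelled() or task.exception() is not None:
                    continue
                if task.result():
                    found = task.result()
                    break
    finally:
        for task in pending:
            task.cancel()
        # Wait for the cancelled tasks to unwind before returning.
        await asyncio.gather(*pending, return_exceptions=True)
    return found


async def check(device: str, delay: float, hit: bool) -> Optional[str]:
    """Hypothetical stand-in for a per-device lookup."""
    await asyncio.sleep(delay)
    return device if hit else None


async def main() -> Optional[str]:
    return await first_truthy(
        [check("sw1", 0.01, False), check("sw2", 0.02, True), check("sw3", 5, False)]
    )


result = asyncio.run(main())
```

Here the slow `sw3` check is cancelled as soon as `sw2` reports a hit, and `first_truthy` does not return until that cancellation has completed.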