encode / uvicorn

An ASGI web server, for Python. 🦄
https://www.uvicorn.org/
BSD 3-Clause "New" or "Revised" License

ContextVars pollution when uvicorn installed without [standard] extensions #2167

Open Kludex opened 7 months ago

Kludex commented 7 months ago

Discussed in https://github.com/encode/uvicorn/discussions/2044

Originally posted by **vladyslav-burylov** July 12, 2023

Hi team, we have discovered a weird ContextVars behaviour when uvicorn is installed without [standard] extensions. Can you please take a look: should this be considered a bug, or can you help explain why it happens?

# Summary

- Since uvicorn creates an async task for each request, any context vars set while processing are expected to be isolated within the request scope
- However, this works only with the `uvicorn[standard]` edition

# Workarounds

- Use `uvicorn[standard]` instead of plain `uvicorn` (seems to be a good option)
- Use ASGI `scope` to store request-bound data (not always possible: for example, the opentelemetry cloud tracing middleware stores the current trace span inside a context var and this logic cannot be changed)

# Steps to reproduce

## Hardware/OS/tools

- MacBook Pro M1
- macOS Ventura 13.3.1
- Python 3.10.9 installed via pyenv
- uvicorn 0.22.0

## Prepare environment

```sh
mkdir test && cd test
pyenv install 3.10.9 # unless already installed
pyenv local 3.10.9
python --version # expected to print 3.10.9
python -m venv test
source ./test/bin/activate
pip install requests
pip install 'uvicorn==0.22.0'
```

## Prepare test scripts

- create the following files:

### client.py

```python
import time
import uuid
import requests

print("")
print("Starting...")


def main():
    # Everything is good when payload is small
    huge_payload = ""
    for x in range(10000):
        huge_payload += f"{str(uuid.uuid4())}\n"

    # Send request each 1 second reusing single connection (without connection reuse it works well)
    with requests.Session() as session:
        while True:
            try:
                response = session.post('http://127.0.0.1:8002/test', huge_payload)
                print(response.text)  # server will be responding with either 'OK' or 'FAIL'

                # In case if server reports 'FAIL' - we reproduced the issue and are OK to exit
                if response.text == "FAIL":
                    break
            except KeyboardInterrupt:
                break
            except BaseException as ex:
                print(str(ex))

            time.sleep(0.1)


main()
```

### server.py

```python
import uuid
import uvicorn
from contextvars import ContextVar

# This var expected to store some request-bound value. Here 'request_id' as example.
request_id: ContextVar[str] = ContextVar("request_id", default="")


# Minimalistic ASGI app which don't use any 3pt libraries
class MyApp:
    async def __call__(self, scope, receive, send) -> None:
        # https://asgi.readthedocs.io/en/latest/specs/lifespan.html
        if scope["type"] == "lifespan":
            while True:
                message = await receive()
                if message['type'] == 'lifespan.startup':
                    await send({'type': 'lifespan.startup.complete'})
                elif message['type'] == 'lifespan.shutdown':
                    await send({'type': 'lifespan.shutdown.complete'})
                    return

        print("")

        # request_id expected to be empty at this point: request has just started
        val = request_id.get()

        # Validate it and record unexpected behaviour if any
        failed = False
        if val:
            failed = True
            print("UNEXPECTED request_id value: ", val)

        # Generate new GUID and store it inside request_id for this request
        guid = str(uuid.uuid4())
        token = request_id.set(guid)

        # Primitive request processing: reply with OK/FAIL body
        # FAIL means that bug has been reproduced (request_id has been polluted by some of previous request)
        try:
            print("(1) ", guid)  # Debug logging
            await send({"type": "http.response.start", "status": 200, "headers": []})
            print("(2) ", guid)  # Debug logging
            await send({"type": "http.response.body", "body": b"FAIL" if failed else b"OK"})
            print("(3) ", guid)  # Debug logging

        # Request completed.
        # Reset request_id to an empty value
        finally:
            request_id.reset(token)
            print("(4) ", guid)  # Debug logging


# Simplistic server without any magic
server = uvicorn.Server(config=uvicorn.Config(
    MyApp(),
    host="127.0.0.1",
    port=8002,
    workers=1,
    access_log=False,
    proxy_headers=False,
    server_header=False
))
server.run()
```

### Executing test

1. Start server

```sh
# cd test
source ./test/bin/activate
python server.py
```

2. Start client

```sh
# cd test
source ./test/bin/activate
python client.py
```

### Expected behaviour

- No errors reported, both server and client running forever

### Actual behaviour

- The request_id context var value at the beginning of a request eventually gets polluted with the previous request's value
- server.py log

```txt
INFO: Started server process [32392]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://127.0.0.1:8002 (Press CTRL+C to quit)
(1) 57f92756-b50e-4627-824e-da5a93865316
(2) 57f92756-b50e-4627-824e-da5a93865316
(3) 57f92756-b50e-4627-824e-da5a93865316
(4) 57f92756-b50e-4627-824e-da5a93865316
UNEXPECTED request_id value: 57f92756-b50e-4627-824e-da5a93865316
(1) dbd3ce8a-65ec-4796-b829-6f8da4287c3e
(2) dbd3ce8a-65ec-4796-b829-6f8da4287c3e
(3) dbd3ce8a-65ec-4796-b829-6f8da4287c3e
(4) dbd3ce8a-65ec-4796-b829-6f8da4287c3e
```

- client.py log

```txt
Starting...
OK
FAIL
```

### Notes

- It seems to occur intermittently (unstable behaviour); the constants in the test script were adjusted to give a high probability of reproduction
- The test code uses the requests library to make sample requests, but other client libraries give the same result (I originally reproduced it with the much more sophisticated httpx.AsyncClient)
- Works well with `uvicorn[standard]`. Maybe it's happening because the standard edition utilises `uvloop`?
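For reference, the isolation the report expects can be seen in plain asyncio, independent of uvicorn. The following is a minimal sketch using only the standard library; `handle` and `observed` are hypothetical names chosen for illustration. Each task created with `asyncio.create_task` receives a copy of the context that is current at creation time, so a value set inside one task is not visible to a sibling task or to the parent.

```python
import asyncio
from contextvars import ContextVar

request_id = ContextVar("request_id", default="")
observed = []  # (task name, value of request_id when the task started)


async def handle(name):
    # Record what the task sees before it sets anything itself.
    observed.append((name, request_id.get()))
    # Set a value and deliberately do NOT reset it.
    request_id.set(name)
    await asyncio.sleep(0)


async def main():
    # Each task gets its own copy of main()'s context.
    await asyncio.create_task(handle("a"))
    await asyncio.create_task(handle("b"))
    # The parent context is not affected by what the tasks set.
    return request_id.get()


parent_value = asyncio.run(main())
```

Here task "b" starts with an empty `request_id` even though task "a" never reset its value, which is the behaviour the reproduction script checks for on every request.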


simon-sk commented 7 months ago

This seems to be the same issue as in #2152

Kludex commented 6 months ago

I've confirmed it also fails with Python 3.12, and it doesn't reproduce with Hypercorn.

It only reproduces with asyncio.

@graingert Do you know whose fault it is here?

xingdongzhe commented 3 weeks ago

@Kludex Hello, I cannot reproduce this on Windows 10 with Python 3.10.9, uvicorn==0.22.0 and requests==2.31.0. I installed them with `pip install uvicorn==0.22.0 requests==2.31.0`.

I can reproduce it on Linux (ubuntu 6.5.0-35-generic #35~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Tue May 7 09:00:52 UTC 2 x86_64 x86_64 x86_64 GNU/Linux), and `pip install uvicorn[standard]==0.22.0` fixes it.

Updated: 2024-06-09

env

  • OS: Ubuntu 22.04
  • Python: 3.7, 3.8, 3.10

I can reproduce this with all three Python versions above, and `pip install uvloop` fixes it. I also found that it reproduces on the second request when the POST body payload is longer than 65536 bytes. Here is a new client.py in which the payload length starts at 65531:

```python
import time
import requests

print("")
print("Starting...")


def main(payload_len):
    # Everything is good when payload is small
    huge_payload = "a" * payload_len

    # Send request each 1 second reusing single connection (without connection reuse it works well)
    with requests.Session() as session:
        # while True:
        for i in range(2):
            try:
                response = session.post('http://127.0.0.1:8002/test', huge_payload)
                print(f"payload_len:{payload_len}", response.text)  # server will be responding with either 'OK' or 'FAIL'

                # In case if server reports 'FAIL' - we reproduced the issue and are OK to exit
                if response.text == "FAIL":
                    break
            except KeyboardInterrupt:
                break

            except BaseException as ex:
                print(str(ex))

            time.sleep(0.1)


if __name__ == "__main__":
    payload_len = 65531
    for n in range(payload_len, payload_len + 10):
        main(n)
```

### Why 65536 (64 KB)?

I grepped for 65536 in the uvicorn code and found that when the body payload is larger than 65536 bytes, uvicorn calls `pause_reading` and `resume_reading`. Something breaks when `pause_reading` is called.

```python
# uvicorn/protocols/http/flow_control.py
# Line 15
HIGH_WATER_LIMIT = 65536

# uvicorn/protocols/http/h11_impl.py
# Line 280
def handle_events(self) -> None:
    # ... (excerpt; the preceding branches are omitted)
    elif event_type is h11.Data:
        if self.conn.our_state is h11.DONE:
            continue
        print(f"{id(self)=} {len(event.data)=}")
        self.cycle.body += event.data
        if len(self.cycle.body) > HIGH_WATER_LIMIT:
            self.flow.pause_reading()
        self.cycle.message_event.set()
```
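To make the threshold concrete, here is a minimal sketch of the pause/resume pattern described above. It is not uvicorn's code: `FakeTransport`, `FlowControlSketch` and `simulate` are hypothetical names, the fake transport only records calls, and the assumption that reading resumes once the application has consumed the body is mine.

```python
HIGH_WATER_LIMIT = 65536  # same value as quoted from flow_control.py


class FakeTransport:
    """Hypothetical stand-in for an asyncio transport; it only records calls."""

    def __init__(self):
        self.calls = []

    def pause_reading(self):
        self.calls.append("pause_reading")

    def resume_reading(self):
        self.calls.append("resume_reading")


class FlowControlSketch:
    """Forwards pause/resume to the transport at most once per state change."""

    def __init__(self, transport):
        self._transport = transport
        self.read_paused = False

    def pause_reading(self):
        if not self.read_paused:
            self.read_paused = True
            self._transport.pause_reading()

    def resume_reading(self):
        if self.read_paused:
            self.read_paused = False
            self._transport.resume_reading()


def simulate(payload_len):
    transport = FakeTransport()
    flow = FlowControlSketch(transport)

    # Data arrives from the socket and is buffered.
    body = b"a" * payload_len
    if len(body) > HIGH_WATER_LIMIT:
        flow.pause_reading()

    # Assumption: the application drains the buffer, then reading resumes.
    body = b""
    flow.resume_reading()
    return transport.calls


calls_at_limit = simulate(65536)
calls_over_limit = simulate(65537)
```

Under these assumptions a 65536-byte body never touches the transport, while a 65537-byte body triggers one `pause_reading` followed by one `resume_reading`, which matches the boundary the client script probes.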

### What happens in asyncio?

Calling `self.flow.pause_reading()` in turn calls `self._transport.pause_reading()`.

On Linux, asyncio uses `_UnixSelectorEventLoop` and `_SelectorSocketTransport`. When I make `_SelectorSocketTransport.pause_reading` and `_SelectorSocketTransport.resume_reading` do nothing, the problem is fixed. More precisely, it is enough to remove `self._loop._remove_reader(self._sock_fd)` and `self._add_reader(self._sock_fd, self._read_ready)` from those two methods:

```python
# asyncio/selector_events.py
# Line 802-816
def pause_reading(self):
    if self._closing or self._paused:
        return
    self._paused = True
    #self._loop._remove_reader(self._sock_fd)
    if self._loop.get_debug():
        logger.debug("%r pauses reading", self)

def resume_reading(self):
    if self._closing or not self._paused:
        return
    self._paused = False
    #self._add_reader(self._sock_fd, self._read_ready)
    if self._loop.get_debug():
        logger.debug("%r resumes reading", self)
```

I don't know why this fixes it. Can anyone explain?
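One possible explanation, offered as an assumption rather than something confirmed in this thread: asyncio runs a callback in a copy of the context that was current when the callback was registered (this is documented for `loop.call_soon`). If re-registering the socket reader happens inside a request task, the reader callback could carry that request's context into the next request. The sketch below reproduces only that mechanism with `loop.call_soon`; `first_request`, `on_ready` and `next_request` are hypothetical names, and nothing here touches uvicorn or the selector transport.

```python
import asyncio
from contextvars import ContextVar

request_id = ContextVar("request_id", default="")
seen_by_callback = []
seen_by_next_task = []


async def next_request():
    # A task copies the context that is current when it is created.
    seen_by_next_task.append(request_id.get())


def on_ready(loop, done):
    # Runs in the context snapshot taken when the callback was registered.
    seen_by_callback.append(request_id.get())
    task = loop.create_task(next_request())
    task.add_done_callback(lambda _: done.set_result(None))


async def first_request(loop, done):
    token = request_id.set("request-1")
    try:
        # Registering from inside the task snapshots the task's context,
        # including request_id == "request-1".
        loop.call_soon(on_ready, loop, done)
    finally:
        # Resetting here does not change the snapshot already taken.
        request_id.reset(token)


async def main():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    await loop.create_task(first_request(loop, done))
    await done


asyncio.run(main())
```

In this sketch the second task starts with `request_id` already set to the first task's value even though the first task reset it, which is the same symptom as the `UNEXPECTED request_id value` line in the server log. Whether the selector transport's reader registration is what triggers it in uvicorn remains to be verified.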

Update: 2024-06-11

env

  • OS: Ubuntu 22.04
  • Python: 3.8
  • aiohttp: 3.6.2
  • requests: 2.31.0

It does not reproduce when `requests.Session` is not used. With aiohttp it fails some of the time (50/1000).

```python
import time
import requests

print("")
print("Starting...")


def main(payload_len):
    # Everything is good when payload is small
    huge_payload = "a" * payload_len

    # Send two requests without a Session, so the connection is not reused
    for i in range(2):
        try:
            response = requests.post('http://127.0.0.1:8002/test', huge_payload)
            print(f"payload_len:{payload_len}",
                  response.text)  # server will be responding with either 'OK' or 'FAIL'

            # In case if server reports 'FAIL' - we reproduced the issue and are OK to exit
            if response.text == "FAIL":
                break
        except KeyboardInterrupt:
            break

        except BaseException as ex:
            print(str(ex))

        time.sleep(0)


if __name__ == "__main__":
    payload_len = 65537
    for n in range(payload_len, payload_len + 1000):
        main(n)
```