encode / httpx

A next generation HTTP client for Python. 🦋
https://www.python-httpx.org/
BSD 3-Clause "New" or "Revised" License

Wrong response with pipelining #1167

Closed: qo4on closed this issue 4 years ago

qo4on commented 4 years ago

I'm getting unique file descriptors (fd) for each file with this code:

{'result': 0, 'fd': 1, 'fileid': 91794101}
{'result': 0, 'fd': 2, 'fileid': 91794132}
{'result': 0, 'fd': 3, 'fileid': 91794134}

When I do the same using httpx, I get the incorrect response 'fd': 1 for all the files:

import asyncio
import httpx

async_client = httpx.AsyncClient()
resp = await asyncio.gather(
    *[async_client.get(file['url'], params=file['params'], allow_redirects=True) for file in files],
    return_exceptions=True
)
{'result': 0, 'fd': 1, 'fileid': 91794101}
{'result': 0, 'fd': 1, 'fileid': 91794132}
{'result': 0, 'fd': 1, 'fileid': 91794134}

The same incorrect response usually happens when I send requests in threads. Can I tune httpx to get correct, unique fds?

tomchristie commented 4 years ago

Thanks for raising this. We'd need to be able to replicate the issue in order to dig into it.

tomchristie commented 4 years ago

From reading through this I think it's likely that your issue here is that the server is responding with sequentially updating IDs, but you're issuing multiple requests in parallel, and getting the same result for each.

You probably actually want to either send the requests sequentially...

for file in files:
    await client.get(...)
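
A fuller sketch of that sequential pattern (a sketch only, assuming the files entries carry the url and params keys from the snippet in the report):

import asyncio
import httpx

async def fetch_sequentially(files):
    # One request at a time over a shared client: each request is sent
    # only after the previous response has been fully received.
    async with httpx.AsyncClient() as client:
        responses = []
        for file in files:
            responses.append(await client.get(file['url'], params=file['params']))
        return responses

# e.g. responses = asyncio.run(fetch_sequentially(files))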

Or limit the connection pool so that it doesn't establish multiple connections to the same server...

client = httpx.AsyncClient(limits=httpx.Limits(max_connections=1))
...

Note that we don't use HTTP pipelining (sending multiple requests before the first has completed), since it has come to be widely seen as a design mistake in HTTP and is now recommended against.

But the second option will ensure that you end up with just a single connection established, with each request sent strictly one after another on it.
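
For illustration, a pool-limited variant that keeps the asyncio.gather pattern from the report (again a sketch, assuming the same files structure):

import asyncio
import httpx

async def fetch_over_one_connection(files):
    # gather() still schedules all requests concurrently, but a pool
    # capped at one connection sends them one at a time on one socket.
    limits = httpx.Limits(max_connections=1)
    async with httpx.AsyncClient(limits=limits) as client:
        return await asyncio.gather(
            *[client.get(file['url'], params=file['params']) for file in files],
            return_exceptions=True,
        )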

qo4on commented 4 years ago

@tomchristie Thank you for your answer. I managed to get the correct response only with dugong. I did not try aiohttp.

server is responding with sequentially updating IDs

You are right, this is how the pCloud API was designed. Over a single TCP connection, all async requests/packets must keep their order.

With await client.get(...) I get no speed benefit compared to regular requests. httpx.Limits(max_connections=1) returns errors.

I made a test pCloud account; to replicate, copy-paste this into Colab.

!pip install -q git+https://github.com/python-dugong/python-dugong.git > /dev/null
!pip install -q git+https://github.com/tomgross/pycloud.git > /dev/null
!pip install -q -U nest-asyncio uvloop > /dev/null

import nest_asyncio   # for IPython
nest_asyncio.apply()  # for IPython
import sys
import ssl
import time
import json
import asyncio
import atexit
from pprint import pprint
from pcloud import PyCloud
from natsort import natsorted
from urllib.parse import urlparse, urlencode
from dugong import HTTPConnection, AioFuture

def ts(start=0):
    return round(time.monotonic() - start, 3)

def prepare_files(items):
    # Keep only non-empty regular files and build the 'file_open'
    # request parameters for each of them.
    files = []

    for item in items:
        if all([
            isinstance(item, dict),
            'isfolder' in item,
            item['isfolder'] is False,
            int(item.get('size', 0)) > 0
        ]):
            files.append({
                'name': item['name'],
                'fileid': item['fileid'],
                'size': item['size'],
                'method': 'file_open',
                'params': {
                    'auth': pc.auth_token,
                    'path': f"/{item['name']}",
                    'flags': int('0x0040', 16)
                }
            })
    return files

class PC(PyCloud):
    endpoint = "https://eapi.pcloud.com/"

    def __init__(self, username, password):
        super().__init__(username, password)

if sys.platform == 'win32':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
else:
    import uvloop
    uvloop.install()

pc = PC('atisek@yandex.com', 'sd7UY3t5m')

loop = asyncio.get_event_loop()
atexit.register(loop.close)

ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
ssl_context.verify_mode = ssl.CERT_REQUIRED
ssl_context.check_hostname = True
ssl_context.load_default_certs()

items = pc.listfolder(folderid=0)['metadata']['contents']
items = natsorted(items, key=lambda dic: dic['name'])
path_list = prepare_files(items)

start = time.monotonic()
with HTTPConnection(urlparse(pc.endpoint).netloc, ssl_context=ssl_context) as conn:
    # This generator function returns a coroutine that sends
    # all the requests.
    def send_requests():
        for path in path_list:
            yield from conn.co_send_request('GET', f"/{path['method']}?" + urlencode(path['params']))

    # This generator function returns a coroutine that reads
    # all the responses
    def read_responses():
        bodies = []
        for path in path_list:
            resp = yield from conn.co_read_response()
            assert resp.status == 200
            buf = yield from conn.co_readall()
            bodies.append(buf)
        return bodies

    # Create the coroutines
    send_crt = send_requests()
    recv_crt = read_responses()

    # Register the coroutines with the event loop
    AioFuture(send_crt, loop=loop)
    recv_future = AioFuture(recv_crt, loop=loop)

    # Run the event loop until the receive coroutine is done (which
    # implies that all the requests must have been sent as well):
    loop.run_until_complete(recv_future)

    # Get the result returned by the coroutine
    bodies = recv_future.result()

for r in bodies:
    print(json.loads(r))

print(f'{ts(start)}\tDone!')
pc.logout()

{'result': 0, 'fd': 1, 'fileid': 128105066}
{'result': 0, 'fd': 2, 'fileid': 128105067}
...
{'result': 0, 'fd': 59, 'fileid': 128105283}
{'result': 0, 'fd': 60, 'fileid': 128105285}
0.725   Done!

!pip install -q -U httpx[http2] > /dev/null
import httpx

pc = PC('atisek@yandex.com', 'sd7UY3t5m')

# async_client = httpx.AsyncClient()    # Returns fd = 1 for each file.
async_client = httpx.AsyncClient(limits=httpx.Limits(max_connections=1))
loop = asyncio.get_event_loop()

items = pc.listfolder(folderid=0)['metadata']['contents']
items = natsorted(items, key=lambda dic: dic['name'])
path_list = prepare_files(items)

start = time.monotonic()

async def async_request(path_list):
    resp = await asyncio.gather(
        *[async_client.get(pc.endpoint + path['method'], params=path['params'], allow_redirects=True) for path in path_list],
        return_exceptions=True
    )

    await async_client.aclose()
    return resp

fds = loop.run_until_complete(async_request(path_list))
for r in fds:
    print(r.json())

print(f'{ts(start)}\tDone!')
pc.logout()

{'result': 0, 'fd': 28, 'fileid': 128105066}
{'result': 0, 'fd': 25, 'fileid': 128105067}
{'result': 0, 'fd': 19, 'fileid': 128105074}
{'result': 0, 'fd': 16, 'fileid': 128105075}
{'result': 0, 'fd': 20, 'fileid': 128105080}
{'result': 0, 'fd': 2, 'fileid': 128105081}
{'result': 0, 'fd': 5, 'fileid': 128105086}
---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
<ipython-input-4-2c84585fc0e4> in <module>()
     14 fds = loop.run_until_complete(async_request(path_list))
     15 for r in fds:
---> 16     print(r.json())
     17 
     18 print(f'{ts(start)}\tDone!')

AttributeError: 'PoolTimeout' object has no attribute 'json'

tomchristie commented 4 years ago

You'll probably also want to disable the pool timeout if you're issuing lots of requests, all over a single HTTP/1.1 connection.

limits=httpx.Limits(max_connections=1), timeout=httpx.Timeout(5.0, pool=None)
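
Put together, the client construction might look like this (a sketch; the 5.0 second timeout value is carried over from the snippet above):

import httpx

client = httpx.AsyncClient(
    limits=httpx.Limits(max_connections=1),   # a single connection to the host
    timeout=httpx.Timeout(5.0, pool=None),    # never time out waiting for the pool
)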

Though honestly, I'd just do this sequentially. You're not really getting loads of benefit from trying to do something in parallel when it's actually all over a single connection.

qo4on commented 4 years ago

In my view it's 3-5 times faster when it works. The problem with dugong is that it's unstable.

limits=httpx.Limits(max_connections=1), timeout=httpx.Timeout(5.0, pool=None)

This does not fix the error. It looks like I will not get stable results with pipelining.