python-hyper / hyper

HTTP/2 for Python.
http://hyper.rtfd.org/en/latest/
MIT License

TooManyStreamsError when issuing multiple requests #371

Closed. cheburakshu closed this issue 6 years ago.

cheburakshu commented 6 years ago

ENV:

Python 3.5.2 (default, Nov 23 2017, 16:37:01) 
[GCC 5.4.0 20160609] on linux
>>> hyper.__version__
'0.7.0'

If I create a connection and issue 101 requests without reading the responses, it aborts with TooManyStreamsError. But if I redo it and also read each response, the error does not occur.

Question:

  1. Is there a reason it works when the responses are read?

Standalone client: client.py

from hyper import HTTPConnection
from functools import lru_cache
import asyncio

class hyper(object):

    def __init__(self,*args,**kwargs):
        pass

    # Cache connections so repeated calls with the same host/port reuse one.
    @lru_cache(maxsize=2048)
    def connection(self,*args,**kwargs):
        host=kwargs.get('host')
        port=kwargs.get('port')
        conn = HTTPConnection('{}:{}'.format(host,port))
        print('new connection')
        return conn

    async def request(self,*args,**kwargs):
        connection = kwargs.get('connection')
        method = kwargs.get('method')
        path = kwargs.get('path')
        body = kwargs.get('body')
        connection.request(method,path)

    async def get_response(self,*args,**kwargs):
        connection = kwargs.get('connection')
        return connection.get_response().read()

def play():
    h2=hyper()
    tasks=[]
    loop=asyncio.get_event_loop()
    conn = h2.connection(**{'host':'google.com','port':'443'})
    for _ in range(0,101):
        # Issue requests only; responses are deliberately left unread.
        req = h2.request(**{'connection':conn,'method':'GET','path':'/'})
        #res = h2.get_response(**{'connection':conn})
        tasks.append(asyncio.ensure_future(req))
        #tasks.append(asyncio.ensure_future(res))
    results=loop.run_until_complete(asyncio.gather(*tasks,return_exceptions=False))
    print(results)

play()

Executing the above code produces the following error:

Error:

cheburakshu@instance-4:~$ python3 client.py 
new connection
Traceback (most recent call last):
  File "client.py", line 43, in <module>
    play()
  File "client.py", line 40, in play
    results=loop.run_until_complete(asyncio.gather(*tasks,return_exceptions=False))
  File "/usr/lib/python3.5/asyncio/base_events.py", line 387, in run_until_complete
    return future.result()
  File "/usr/lib/python3.5/asyncio/futures.py", line 274, in result
    raise self._exception
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "client.py", line 24, in request
    connection.request(method,path)
  File "/home/cheburakshu/.local/lib/python3.5/site-packages/hyper/common/connection.py", line 103, in request
    method=method, url=url, body=body, headers=headers
  File "/home/cheburakshu/.local/lib/python3.5/site-packages/hyper/http20/connection.py", line 281, in request
    self.endheaders(message_body=body, final=True, stream_id=stream_id)
  File "/home/cheburakshu/.local/lib/python3.5/site-packages/hyper/http20/connection.py", line 555, in endheaders
    stream.send_headers(headers_only)
  File "/home/cheburakshu/.local/lib/python3.5/site-packages/hyper/http20/stream.py", line 98, in send_headers
    conn.send_headers(self.stream_id, headers, end_stream)
  File "/home/cheburakshu/.local/lib/python3.5/site-packages/h2/connection.py", line 836, in send_headers
    (max_open_streams, self.open_outbound_streams)
h2.exceptions.TooManyStreamsError: Max outbound streams is 100, 100 open
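
(For context: the 100 in that message is h2's view of the peer's SETTINGS_MAX_CONCURRENT_STREAMS, and send_headers() raises TooManyStreamsError once that many outbound streams are open. A minimal sketch of where those numbers live on a bare h2 state machine, independent of hyper and with no socket involved:)

    import h2.connection

    # A bare h2 connection object; illustration only, nothing is sent.
    conn = h2.connection.H2Connection()
    conn.initiate_connection()

    # The cap h2 enforces (100 in h2's defaults, until the peer's
    # SETTINGS frame says otherwise) and the current open stream count.
    print(conn.remote_settings.max_concurrent_streams)
    print(conn.open_outbound_streams)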

But if the same is done with the responses read, it works even when the number of requests is raised to 1000!

The only change from the first script is enabling the two commented-out lines in play(), so a response-reading task is scheduled alongside each request:

    for _ in range(0,101):
        req = h2.request(**{'connection':conn,'method':'GET','path':'/'})
        res = h2.get_response(**{'connection':conn})
        tasks.append(asyncio.ensure_future(req))
        tasks.append(asyncio.ensure_future(res))
Lukasa commented 6 years ago

It's highly likely that the responses contain enough data that, if you aren't reading them, the streams don't complete. In general you need to read responses to ensure that the flow-control window moves appropriately.
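
(A minimal sketch of that advice using hyper's synchronous API; the host, window size, and total are placeholders, and it assumes request() returns the stream id and get_response() accepts it, as in hyper's documented API. The idea: issue at most a small window of requests, then drain their responses so the finished streams close and free slots under the concurrency cap.)

    from hyper import HTTPConnection

    WINDOW = 50        # stay safely under the 100-stream cap
    TOTAL = 1000

    conn = HTTPConnection('google.com:443')   # placeholder host
    done = 0
    while done < TOTAL:
        batch = min(WINDOW, TOTAL - done)
        # request() returns a stream id; keep it to match responses
        # to requests instead of relying on arrival order.
        stream_ids = [conn.request('GET', '/') for _ in range(batch)]
        for sid in stream_ids:
            # Reading the body to the end lets hyper close the stream,
            # freeing a slot against the peer's MAX_CONCURRENT_STREAMS.
            conn.get_response(sid).read()
        done += batch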

cheburakshu commented 6 years ago

It is because h2 requires the number of concurrently open outbound streams to stay at or below the advertised maximum, 100 here.

https://github.com/Lukasa/hyper/blob/development/hyper/http20/response.py#L143

The stream is closed after the response for the current request is read. Is there a reason for doing so? This ensures the number of active streams never exceeds the specified maximum of 100, but it incurs heavy overhead because additional requests create new streams instead of reusing existing ones.

        # We're at the end, close the connection.
        if response_complete:
            self.close()
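
(For what it's worth, HTTP/2 itself forbids reusing a stream id once the stream closes, per RFC 7540 section 5.1.1, so a fresh stream per request is inherent to the protocol; the practical mitigation is to bound how many requests are in flight at once. A sketch in the style of the async repro above, assuming the shared connection is safe to drive from executor threads, which hyper's internal locking suggests but I have not verified end to end; host and counts are placeholders:)

    import asyncio
    from hyper import HTTPConnection

    MAX_IN_FLIGHT = 90   # leave headroom under the 100-stream cap

    async def fetch(conn, sem, path):
        # Hold a slot from request until the body is fully read,
        # i.e. for the whole lifetime of the HTTP/2 stream.
        async with sem:
            loop = asyncio.get_event_loop()
            # hyper's calls block, so run them on the default executor.
            sid = await loop.run_in_executor(None, conn.request, 'GET', path)
            resp = await loop.run_in_executor(None, conn.get_response, sid)
            return await loop.run_in_executor(None, resp.read)

    async def main():
        conn = HTTPConnection('google.com:443')   # placeholder host
        sem = asyncio.Semaphore(MAX_IN_FLIGHT)
        bodies = await asyncio.gather(*(fetch(conn, sem, '/') for _ in range(1000)))
        print(len(bodies))

    asyncio.get_event_loop().run_until_complete(main())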

A new stream is created for each request; creation happens behind a lock, which adds per-request overhead:

https://github.com/Lukasa/hyper/blob/development/hyper/http20/connection.py#L600

    def _new_stream(self, stream_id=None, local_closed=False):
        """
        Returns a new stream object for this connection.
        """
        # Concurrency
        #
        # Hold _lock: ensure that threads accessing the connection see
        # self.next_stream_id in a consistent state
        #
        # No I/O occurs, the delay in waiting threads depends on their number.
        with self._lock:
            s = Stream(
                stream_id or self.next_stream_id,
                self.__wm_class(DEFAULT_WINDOW_SIZE),
                self._conn,
                self._send_outstanding_data,
                self._recv_cb,
                self._stream_close_cb,
            )
            s.local_closed = local_closed
            self.streams[s.stream_id] = s
            self.next_stream_id += 2

            return s
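
(That lock only guards the stream-id allocation and the Stream construction; the network I/O happens outside it, so in practice it is cheap next to the round trip. A sketch that amortizes it by driving one connection from a thread pool; the host and counts are placeholders, and the worker count keeps in-flight streams well under the cap:)

    from concurrent.futures import ThreadPoolExecutor
    from hyper import HTTPConnection

    conn = HTTPConnection('google.com:443')   # placeholder host

    def fetch(path):
        # Each call allocates a fresh stream under _lock, then does its
        # I/O outside the lock, so threads overlap on the slow part.
        sid = conn.request('GET', path)
        return conn.get_response(sid).read()

    # 20 workers means at most 20 streams in flight, well under 100.
    with ThreadPoolExecutor(max_workers=20) as pool:
        bodies = list(pool.map(fetch, ['/'] * 200))
    print(len(bodies))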