aio-libs / aiohttp

Asynchronous HTTP client/server framework for asyncio and Python
https://docs.aiohttp.org

RuntimeError('Session is closed') #787

Closed fafhrd91 closed 8 years ago

fafhrd91 commented 8 years ago

conditions: under high load on a VM

I see two major problems: the client should not close an unowned connector, and the traceback is not helpful at all.

This line closes the session: https://github.com/KeepSafe/aiohttp/blob/master/aiohttp/client.py#L580

glaslos commented 8 years ago

Same here:

Traceback (most recent call last):
  ...
    connector=self.stats_conn
  File "/usr/local/lib/python3.4/dist-packages/aiohttp/client.py", line 578, in __iter__
    return (yield from self._coro)
  File "/usr/local/lib/python3.4/dist-packages/aiohttp/client.py", line 150, in _request
    raise RuntimeError('Session is closed')
RuntimeError: Session is closed
asvetlov commented 8 years ago

First, I don't recommend using bare functions like aiohttp.get() or aiohttp.post().

Second, the error happens only when an internal exception occurs.

As a workaround, I'll suppress the RuntimeError as shown in the comments, so the original error is raised instead.

glaslos commented 8 years ago

I am actually using:

r = yield from aiohttp.request('post', url, data=json.dumps(data), connector=self.stats_conn)
asvetlov commented 8 years ago

Use ClientSession as in the very first example (http://aiohttp.readthedocs.org/en/stable/client.html#make-a-request) and you'll never run into this problem.
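For reference, the documented pattern looks roughly like this. This is a sketch assuming a modern aiohttp (3.x); the throwaway local server, handler, and port are placeholders I've added so the snippet runs without network access:

```python
import asyncio

from aiohttp import ClientSession, web

async def handler(request):
    # Placeholder endpoint standing in for a real target URL.
    return web.Response(text='ok')

async def main():
    # Throwaway local server so the sketch is self-contained.
    app = web.Application()
    app.router.add_get('/', handler)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, '127.0.0.1', 8089)
    await site.start()
    try:
        # One session for the whole program; it owns its connector
        # and closes it when the ``async with`` block exits.
        async with ClientSession() as session:
            async with session.get('http://127.0.0.1:8089/') as resp:
                body = await resp.text()
                return resp.status, body
    finally:
        await runner.cleanup()

if __name__ == '__main__':
    print(asyncio.run(main()))  # (200, 'ok')
```

The key point is that the session, not each request, owns the connector, so requests never outlive it.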

glaslos commented 8 years ago

Sharing a session between workers worked for me. Not sure why I was running into problems sharing a connection.

alesdakshanin commented 8 years ago

@asvetlov I'm still having this issue. Please note that I'm using ClientSession to make requests as you mentioned above.

I am writing a kind of stress-testing tool that makes as many HTTP requests as possible. I'm trying to limit the number of concurrent connections by using a TCPConnector with the limit parameter set, but my tasks fail with the following error:

Task exception was never retrieved
future: <Task finished coro=<fetch() done, defined at ./test.py:11> exception=RuntimeError('Session is closed',)>
Traceback (most recent call last):
  File "/usr/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "./test.py", line 13, in fetch
    async with session.get(url, allow_redirects=False) as response:
  File "/home/ales/.virtualenvs/aio-loadtest/lib/python3.5/site-packages/aiohttp/client.py", line 537, in __aenter__
    self._resp = yield from self._coro
  File "/home/ales/.virtualenvs/aio-loadtest/lib/python3.5/site-packages/aiohttp/client.py", line 151, in _request
    raise RuntimeError('Session is closed')
RuntimeError: Session is closed
Here's the **application code**:

```python
import asyncio
from collections import namedtuple

from aiohttp import ClientSession, Timeout, TCPConnector

Config = namedtuple('Config', ['url', 'concurrency', 'duration', 'timeout'])

async def fetch(url, *, connector, loop):
    async with ClientSession(loop=loop, connector=connector) as session:
        async with session.get(url, allow_redirects=False) as response:
            await response.read()

async def run_test(config, *, loop):
    tasks_pack_size = max(10 * config.concurrency, 2000)
    connector = TCPConnector(limit=config.concurrency)
    tasks = []

    try:
        with Timeout(config.duration, loop=loop):
            seconds = 0
            while True:
                not_completed_tasks = [task for task in tasks if not task.done()]

                if len(not_completed_tasks) < tasks_pack_size:
                    new_tasks = [loop.create_task(fetch(config.url, connector=connector, loop=loop))
                                 for _ in range(tasks_pack_size)]
                    tasks = not_completed_tasks + new_tasks

                seconds += 1
                await asyncio.sleep(1)
    except asyncio.TimeoutError:
        print('Stopping')
    finally:
        print('Finish')

def run(url, concurrency, duration, timeout):
    config = Config(url=url, concurrency=concurrency, duration=duration, timeout=timeout)

    loop = asyncio.get_event_loop()
    coro = run_test(config, loop=loop)
    future = asyncio.ensure_future(coro, loop=loop)

    print('Running test on {url}'.format(url=config.url))

    try:
        loop.run_until_complete(future)
    except KeyboardInterrupt:
        loop.stop()
        print('Stopped')

if __name__ == '__main__':
    run(
        url='http://127.0.0.1:8888/asdf',
        concurrency=25,
        duration=30,
        timeout=10
    )
```
And my **environment**:

```
$ pip freeze
aiohttp==0.22.2
chardet==2.3.0
multidict==1.2.1
wheel==0.26.0

$ python --version
Python 3.5.2
```
asvetlov commented 8 years ago

Session owns connector.

Try something like this:

import asyncio
from collections import namedtuple

from aiohttp import ClientSession, Timeout, TCPConnector

Config = namedtuple('Config', ['url', 'concurrency', 'duration', 'timeout'])

async def fetch(url, *, session):
    async with session.get(url, allow_redirects=False) as response:
        await response.read()

async def run_test(config, *, loop):
    tasks_pack_size = max(10 * config.concurrency, 2000)
    tasks = []
    connector = TCPConnector(limit=config.concurrency)
    async with ClientSession(loop=loop, connector=connector) as session:
        try:
            with Timeout(config.duration, loop=loop):
                seconds = 0
                while True:
                    not_completed_tasks = [task for task in tasks if not task.done()]

                    if len(not_completed_tasks) < tasks_pack_size:
                        new_tasks = [loop.create_task(fetch(config.url, session=session))
                                      for _ in range(tasks_pack_size)]
                        tasks = not_completed_tasks + new_tasks

                    seconds += 1
                    await asyncio.sleep(1)

        except asyncio.TimeoutError:
            print('Stopping')

        finally:
            print('Finish')

def run(url, concurrency, duration, timeout):
    config = Config(url=url, concurrency=concurrency, duration=duration, timeout=timeout)

    loop = asyncio.get_event_loop()

    print('Running test on {url}'.format(url=config.url))

    try:
        loop.run_until_complete(run_test(config, loop=loop))
    finally:
        loop.stop()
    print('Stopped')

if __name__ == '__main__':
    run(
        url='http://127.0.0.1:8888/asdf',
        concurrency=25,
        duration=30,
        timeout=10
    )

P.S. Using asyncio.Queue instead of tasks list will make the code more elegant I believe.
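That queue-based shape could look roughly like the sketch below. The stand-in fetch (an `asyncio.sleep`) and all names are mine, just to keep it self-contained without a live server; in the real tool each worker would hold the shared session and call `fetch(url, session=session)`:

```python
import asyncio

async def worker(queue, results):
    # Each worker pulls URLs until it sees the ``None`` sentinel.
    while True:
        url = await queue.get()
        if url is None:
            queue.task_done()
            break
        # Stand-in for ``await fetch(url, session=session)``.
        await asyncio.sleep(0)
        results.append(url)
        queue.task_done()

async def run_test(urls, concurrency):
    # ``concurrency`` workers drain one queue, so at most that many
    # requests are ever in flight -- no done/not-done bookkeeping.
    queue = asyncio.Queue()
    results = []
    workers = [asyncio.ensure_future(worker(queue, results))
               for _ in range(concurrency)]
    for url in urls:
        await queue.put(url)
    for _ in workers:
        await queue.put(None)  # one sentinel per worker
    await queue.join()
    await asyncio.gather(*workers)
    return results

if __name__ == '__main__':
    urls = ['http://127.0.0.1:8888/asdf'] * 100
    print(len(asyncio.run(run_test(urls, 25))))  # 100
```

The sentinel-per-worker pattern replaces the periodic "filter out completed tasks" loop entirely.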

supersaeyan commented 7 years ago

I am trying to make something very similar, @AlesDokshanin, but I need to recreate the session with every request because I am using the SOCKS connectors provided by aiosocks. I am also building a stress tester, but with a geo-demographic feature to simulate and analyze global traffic using public proxies. Since I am supporting the HTTP, SOCKS5, and SOCKS4 protocols, I had to use aiosocks, and I have to pass the proxy connector at ClientSession creation itself. How do I create multiple sessions asynchronously?

import asyncio
from aiohttp import ClientSession
import time

async def fetch(i, url, session, proxy):
    async with session.get(url, proxy=proxy, timeout=60) as resp:
        print(i, resp.status)
        data = await resp.json()
        print(i, data['origin'])
        return data

async def bound_fetch(i, sem, url, session, proxy):
    # Getter function with semaphore.
    async with sem:
        await fetch(i, url, session, proxy)

async def run(r):
    url = "https://httpbin.org/get?show_env"
    tasks = []
    sem = asyncio.Semaphore(1000)
    proxy = "http://83.239.58.162:8080" # one proxy for example, will use a random proxy for every request

    async with ClientSession() as session:
        for i in range(r):
            # print(i)
            task = asyncio.ensure_future(bound_fetch(i, sem, url, session, proxy))
            tasks.append(task)

        responses = asyncio.gather(*tasks)
        await responses

t1 = time.time()
number = 10000
loop = asyncio.get_event_loop()

future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
print("TIME:",time.time() - t1)

@asvetlov This works for HTTP proxies because HTTP proxies are supported by aiohttp and can be passed directly to session.request.

import asyncio
from aiohttp import ClientSession
import aiosocks
from aiosocks.connector import (
    proxy_connector, HttpProxyAddr
)
import time

async def fetch(i, url, conn):
    async with ClientSession(connector=conn) as session:
        async with session.get(url, timeout=60) as resp:
            print(i, resp.status)
            data = await resp.json()
            print(i, data['origin'])
            return data

async def bound_fetch(i, sem, url, conn):
    # Getter function with semaphore.
    async with sem:
        await fetch(i, url, conn)

async def run(r):
    url = "https://httpbin.org/get?show_env"
    tasks = []
    sem = asyncio.Semaphore(1000)

    # proxy = "http://83.239.58.162:8080"
    proxy = "127.0.0.1:1080"
    socks5_addr = proxy.split(':')[0]
    socks5_port = proxy.split(':')[1]
    conn = proxy_connector(aiosocks.Socks5Addr(socks5_addr, int(socks5_port)), remote_resolve=True, verify_ssl=False)

    for i in range(r):
        # print(i)
        task = asyncio.ensure_future(bound_fetch(i, sem, url, conn))
        tasks.append(task)

    responses = asyncio.gather(*tasks)
    await responses

t1 = time.time()
number = 10000
loop = asyncio.get_event_loop()

future = asyncio.ensure_future(run(number))
loop.run_until_complete(future)
print("TIME:",time.time() - t1)

This is what I am trying to do with SOCKS5 and SOCKS4, but sessions need to be created asynchronously for every proxy.

fafhrd91 commented 7 years ago

In aiohttp 2.0, the session accepts a connector_owner parameter; if it is False, the session does not close the connector. But from the example I don't see a reason why you need separate sessions. Also, in 2.0 you don't need to use a Semaphore: the connector's limit parameter works like a semaphore.
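To illustrate connector sharing, a minimal sketch (assuming aiohttp 3.x; the limit value is arbitrary):

```python
import asyncio

from aiohttp import ClientSession, TCPConnector

async def main():
    # ``limit`` caps concurrent connections, playing the role the
    # Semaphore played in the earlier examples.
    conn = TCPConnector(limit=25)

    # connector_owner=False: closing this session leaves the
    # connector open for reuse by other sessions.
    async with ClientSession(connector=conn, connector_owner=False) as session:
        pass  # requests through this session would go here
    assert not conn.closed  # still usable after the session closed

    # A second session can reuse the very same connector.
    async with ClientSession(connector=conn, connector_owner=False) as session:
        pass

    # The caller owns the connector, so it must close it explicitly.
    await conn.close()
    assert conn.closed

asyncio.run(main())
```

This is the shape that would let each proxy get its own short-lived session while keeping one long-lived, limit-enforcing connector.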

asfaltboy commented 5 years ago

Thanks @fafhrd91. Just to clarify: do the old examples we find online, which suggest using a semaphore to limit the number of open connections, cause some sort of race-condition issue with the new implementation?

fafhrd91 commented 5 years ago

@asfaltboy sorry, I cannot help. I don't work with Python anymore.

lock[bot] commented 4 years ago

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. If you feel there are important points made in this discussion, please include those excerpts in that new issue.