Closed frederikaalund closed 8 years ago
Hi, I had a look at this and this one looks tricky. RxPY looks to be disposing correctly, but there seems to be some impedance mismatch between asyncio task created by ensure_future
and the aiohttp session, thus cancelling the task will not cancel the request and vice versa. I tried a different approach by creating the aiohttp session manually and attaching it to the lifetime of the subscription with the using()
operator:
#!/usr/bin/env python
import asyncio
import aiohttp
from rx import Observable
from rx.concurrency import AsyncIOScheduler
loop = asyncio.get_event_loop()
loop.set_debug(True)
scheduler = AsyncIOScheduler(loop)
class Resource(object):
def __init__(self):
loop = asyncio.get_event_loop()
self.session = aiohttp.ClientSession(loop=loop)
def dispose(self):
self.session.close()
def get_observable(_, __):
def resource_factory():
return Resource()
def observable_factory(resource):
future = asyncio.ensure_future(resource.session.get('http://lipsum.com'))
return Observable.from_future(future)
return Observable.using(resource_factory, observable_factory)
def result_mapper(observable, result, i):
result.close()
return result
sub = Observable.interval(2000, scheduler=scheduler) \
.flat_map(get_observable, result_mapper) \
.subscribe(lambda x: print('on_next:', x),
lambda e: print('on_error:', e),
lambda: print('on_completed'))
try:
loop.run_forever()
except KeyboardInterrupt:
sub.dispose()
loop.close()
Does this work better for you?
Thanks for the detailed response! Unfortunately, the error still occurs with the program that you provided. I appreciate the effort, though. I also learnt about the using
operator. That might come in handy!
[...] but there seems to be some impedance mismatch between asyncio task created by ensure_future and the aiohttp session, thus cancelling the task will not cancel the request and vice versa.
Seemingly so. I'll see about creating an issue there.
FWIW I ended up ditching flat_map
ing the get request in favor of sending the request as part of the subscription. Perhaps this is also more idiomatic since the get_observable
isn't a pure function (performing the get request is really a side effect). I don't know.
Here is what I use so far:
#!/usr/bin/env python
import asyncio
import aiohttp
from rx import Observable
from rx.concurrency import AsyncIOScheduler
loop = asyncio.get_event_loop()
loop.set_debug(True)
scheduler = AsyncIOScheduler(loop)
session = aiohttp.ClientSession(loop=loop)
requests = []
max_simultaneous_requests = 10
async def get():
async with session.get('http://lipsum.com') as response:
return response
def try_get(x):
if max_simultaneous_requests <= len(requests):
return
def done(request):
try:
result = request.result()
print(result)
except Exception as error:
print('error!', error)
requests.remove(request)
request = asyncio.ensure_future(get())
request.add_done_callback(done)
requests.append(request)
sub = Observable.interval(1000, scheduler=scheduler) \
.subscribe(try_get,
lambda e: print('on_error:', e),
lambda: print('on_completed'))
try:
loop.run_forever()
except KeyboardInterrupt:
for request in requests:
request.cancel()
close = asyncio.ensure_future(session.close())
loop.run_until_complete(close)
loop.close()
It works without errors. It is a little tedious to manually track the requests but I can't come up with something nicer.
Thanks again for your efforts!
Side note: I found out that you're supposed to do await result.release()
instead of result.close()
. However, this did not fix the problem.
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.
The error occurs in the following program.
Every second, a simple get request is sent (default is to lipsum.com). This works as expected. Should you signal the program via a keyboard interrupt, it terminates without error.
However, an error occurs if you follow these steps:
When the process exits, it gives an error:
I thought that when a future is wrapped in an observable, said observable cancels the future when the corresponding subscription is disposed. Apparently, a future is leaked somehow and never cancelled.
Do note that I've been unable to simplify this further. The error only happens with asynchronous Aiohttp requests. I can't reproduce the error using plain asyncio (and no RxPY). Only when using RxPY+Aiohttp does it happen. Very strange.
Python 3.5.1. RxPY commit e9f5cecc52b9d236cbe57bd40f724c8faa2882dc. Aiohttp 0.22.5.