theelous3 / asks

Async requests-like httplib for python.
MIT License
508 stars 63 forks source link

Possible bug in Session._checked_out_sockets #55

Closed dd-dent closed 6 years ago

dd-dent commented 6 years ago

Working on an async API downloader for work, my current crack at it uses asks and trio.

Randomly, the following statement:

response = await self._session.get(params=payload, timeout=300)

could inspire this:

...(Loads of random stuff...)
File "~\code\shopstyle-plugin\src\shopstyle_plugin\downloader.py", line 110, in _safe_fetch
    response = await self._session.get(params=payload, timeout=3000)
File "~\code\shopstyle-plugin\venv\lib\site-packages\asks\sessions.py", line 171, in request
    await self._replace_connection(sock)
File "~\code\shopstyle-plugin\venv\lib\site-packages\asks\sessions.py", line 255, in _replace_connection
    self._checked_out_sockets.remove(sock)
ValueError: deque.remove(x): x not in deque

I only went as far out to see what deque was problematic, which is the field mentioned in the title, which in turn is an instance of SocketQ(deque).
So... am I looking at a bug or misuse here?

carlbordum commented 6 years ago

Hi dd-dent!

I think we need a bit more context here. What does _session_get do?

theelous3 commented 6 years ago

Hi. Can you post a full traceback? Junk included :)

Also, what version of asks?

dd-dent commented 6 years ago

Forgot to mention, using pycharm on win 10 with python 3.6.4 @Zaab1t said session is an instance of asks.sessions.Session passed to an instance of my own downloader class. the session.get call is wrapped in a function meant to handle various exceptions and provide some basic retry logic based on this.
I query the shopstyle api (specifics here).
I make lots of get requests for every run of the script (more than 36k the most so far), and I just became aware that my internet connection is incredibly unreliable.

here's a sample query(note I omitted the pid key which is my API key for obvious reasons):
http://api.shopstyle.co.uk/api/v2/products?fl=r196&cat=womens-sneakers&fts=Canvas&pid=*OMITTED*&limit=50&sort=Popular

@theelous3 I'm using asks 1.3.9 and trio 0.3.0
Also, in case you missed it, here's the whole traceback, complete with junk, sans some paths :)

Traceback (most recent call last):
  File "~/code/PycharmProjects/shopstyle-plugin/src/shopstyle_plugin/plugin.py", line 68, in <module>
    main()
  File "~\code\shopstyle-plugin\venv\lib\site-packages\click\core.py", line 722, in __call__
    return self.main(*args, **kwargs)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\click\core.py", line 697, in main
    rv = self.invoke(ctx)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\click\core.py", line 895, in invoke
    return ctx.invoke(self.callback, **ctx.params)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\click\core.py", line 535, in invoke
    return callback(*args, **kwargs)
  File "~/code/PycharmProjects/shopstyle-plugin/src/shopstyle_plugin/plugin.py", line 49, in main
    trio.run(start_run, data, parser, session)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 1225, in run
    return result.unwrap()
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_result.py", line 119, in unwrap
    raise self.error
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 1334, in run_impl
    msg = task.coro.send(next_send)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 923, in init
    self.entry_queue.spawn()
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_util.py", line 109, in __aexit__
    await self._agen.asend(None)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 274, in asend
    return await self._do_it(self._it.send, value)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 290, in _do_it
    return await ANextIter(self._it, start_fn, *args)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 202, in send
    return self._invoke(self._it.send, value)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 209, in _invoke
    result = fn(*args)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 318, in open_nursery
    await nursery._nested_child_finished(nested_child_exc)
  File "...\Python36\Lib\contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 203, in open_cancel_scope
    yield scope
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_multierror.py", line 144, in __exit__
    raise filtered_exc
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 203, in open_cancel_scope
    yield scope
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 318, in open_nursery
    await nursery._nested_child_finished(nested_child_exc)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 427, in _nested_child_finished
    raise MultiError(self._pending_excs)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 1334, in run_impl
    msg = task.coro.send(next_send)
  File "~/code/PycharmProjects/shopstyle-plugin/src/shopstyle_plugin/plugin.py", line 58, in start_run
    nursery.start_soon(downloader.run, row, idx)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_util.py", line 109, in __aexit__
    await self._agen.asend(None)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 274, in asend
    return await self._do_it(self._it.send, value)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 290, in _do_it
    return await ANextIter(self._it, start_fn, *args)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 202, in send
    return self._invoke(self._it.send, value)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 209, in _invoke
    result = fn(*args)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 318, in open_nursery
    await nursery._nested_child_finished(nested_child_exc)
  File "...\Python36\Lib\contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 203, in open_cancel_scope
    yield scope
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_multierror.py", line 144, in __exit__
    raise filtered_exc
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 203, in open_cancel_scope
    yield scope
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 318, in open_nursery
    await nursery._nested_child_finished(nested_child_exc)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 427, in _nested_child_finished
    raise MultiError(self._pending_excs)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 1334, in run_impl
    msg = task.coro.send(next_send)
  File "~\code\shopstyle-plugin\src\shopstyle_plugin\downloader.py", line 60, in run
    await self._price_break(row, nursery)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_util.py", line 109, in __aexit__
    await self._agen.asend(None)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 274, in asend
    return await self._do_it(self._it.send, value)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 290, in _do_it
    return await ANextIter(self._it, start_fn, *args)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 202, in send
    return self._invoke(self._it.send, value)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\async_generator\_impl.py", line 209, in _invoke
    result = fn(*args)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 318, in open_nursery
    await nursery._nested_child_finished(nested_child_exc)
  File "...\Python36\Lib\contextlib.py", line 99, in __exit__
    self.gen.throw(type, value, traceback)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 203, in open_cancel_scope
    yield scope
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_multierror.py", line 144, in __exit__
    raise filtered_exc
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 203, in open_cancel_scope
    yield scope
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 318, in open_nursery
    await nursery._nested_child_finished(nested_child_exc)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 427, in _nested_child_finished
    raise MultiError(self._pending_excs)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\trio\_core\_run.py", line 1334, in run_impl
    msg = task.coro.send(next_send)
  File "~\code\shopstyle-plugin\src\shopstyle_plugin\downloader.py", line 87, in _extract_rows
    response = await self._safe_fetch(payload, row['idx'])
  File "~\code\shopstyle-plugin\src\shopstyle_plugin\downloader.py", line 110, in _safe_fetch
    response = await self._session.get(params=payload, timeout=3000)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\asks\sessions.py", line 171, in request
    await self._replace_connection(sock)
  File "~\code\shopstyle-plugin\venv\lib\site-packages\asks\sessions.py", line 255, in _replace_connection
    self._checked_out_sockets.remove(sock)
ValueError: deque.remove(x): x not in deque

I appreciate attention, let me know if anything else is needed and I'll try to reply ASAP (a bit busy for the next few hours)

theelous3 commented 6 years ago

Thanks! I was in work earlier so couldn't take a proper look. Will poke around it this evening and / or tomorrow evening.

theelous3 commented 6 years ago

By the way, are you using any stream sockets? Like r = await session.get(url, stream=True)

Side note: am working on retries and such in a local branch at the moment, so you won't need to implement it yourself soon :)

dd-dent commented 6 years ago

Nope, just the regular response objects from a shared session instance.

I'll note that this particular exception was thrown under rather extreme circumstances:
Very high connection limit (from 500 to around 2000), hundreds to thousands of pending requests and\or corutines, the API server severing the connection causing (mostly caught) exceptions to be thrown like it's the end of the world along with a suddenly sketchy internet service (random disconnects, dropped packets etc...) that the ISP can't seem to fix...

Needless to say, that's not normal circumstances, and I did refactor a bit since then, with connections now limited to 20 and a trio.CapacityLimiter further throttling things down so I'll have no more than several hundred requests pending\running at any given moment.
In essence, I'm using the session connection limit to throttle access to the server and the trio.CapacityLimiter as buffer for all the rest, so I don't blow up anything else, like might have happened here or run out of memory.

If you want I'll try writing a script which would reproduce similar conditions when I have the time.

I understand that it might be difficult to solve\pinpoint this issue, along with the possibility that it might've been caused due to simple misuse by me (a bit new to python aio after all...) but this lib really helped me, so I'd like to help back if I can :)

theelous3 commented 6 years ago

Your first paragraph made me feel dirty in a very good way.

I would love a similar environment. I'm trying to recreate one at the moment, but running a few workers locally with httpbin isn't throwing any issues. Not dirty enough!

If you can whip something up that throws the same error that'd be great. I'll keep trying in the mean time.

dd-dent commented 6 years ago

I'm at work for the next few hours, but free for the weekend.

I'm planning on writing a script to more or less recreate the same load conditions and flow.

Until I do, I can only guess at the cause as there are many possible culprits at play here.
I'll say that the issue disappeared when I nailed down the server limits and ran it from a remote ubuntu VPS with stable connectivity.

To summarize, I think possible causes would appear to be either platform (win vs linux), bad connectivity, and\or misuse by me, which would hopefully could be identified once I get down to writing that script...

Will update in a few hours\tomorrow.

theelous3 commented 6 years ago

I have a sneaking suspicion in the back of my mind that if a connection sits in the pool for a while unused, and is dc'd by the server, it is not handled correctly. I also have a sneaking suspicion that I fixed that a while ago. Maybe I imagined it. Time to dig!

At the very least it makes sense given that I can make 100k requests locally with no issues, where nothing sits still, but irl it's not as smooth.

dd-dent commented 6 years ago

So...
I whipped up a little load tester for asks, modeled after my own scenario. asks-test.zip
The script itself is called load_tester.py, and is reasonably commented as to explain things.
I used a nodejs package called json-server which let me set up a reasonable fake for my purposes.
A couple of quick notes:
There's no exception handling in the script, and the fake server doesn't do throttling.
You should use the following lines in load_tester.py modify the test:

job_count = 20 # total jobs to run
job_limit = 10 # trio.Capacitylimiter for jobs
connection_limit = 20 # asks.Session connections kwarg

If anything is unclear, or if I did something horribly wrong please tell me.

theelous3 commented 6 years ago

Hey, I was away for a few days somewhere sunny. Nice, I'll take a look over this now :)

dd-dent commented 6 years ago

It's been raining like crazy here the last few days, so at least someone, somewhere had fun...

Haven't had much time to mess with it myself...
Set the connection limit high enough and you'll run out of file descriptors, set the job limit high enough (with enough jobs to use it) and you'll see the mem usage soar.
I haven't tried taking down the server mid run, or throwing various codes (like 400, 500 etc).

Also forgot to mention that the server can be run with the -d option to add delay in ms...

theelous3 commented 6 years ago

Check it peeps:

https://github.com/theelous3/asks/tree/miracle2k-timeout-leak

Specifically:

https://github.com/theelous3/asks/commit/1fc1310042335f4d32b05bd919d91afeaeeb092c

Ignore the pants on head addition of the pytest-cache.

Seem to have fixed the leaky sockets by removing retry logic from the silly place it was (internal of the request) and moving it to the session level, and handling errors there too, a la the thinking of @miracle2k . Working well for random test cases on my end. Thoughts?

miracle2k commented 6 years ago

My app is still exhibiting this behaviour. I think it might be related to timeouts:

import trio
import asks
import multio

multio.init('trio')

async def main():
    session = asks.Session()
    for x in range(1000):
        try:
            await session.get('http://slowwly.robertomurray.co.uk/delay/3000/url/http://www.google.co.uk', timeout=1)
        except Exception as e:
            print("failed, now sockets: %s, %s" % (session._conn_pool, session._checked_out_sockets))
            pass

trio.run(main)

Output:

failed, now sockets: deque([]), deque([<trio.SocketStream object at 0x107a79048>])
failed, now sockets: deque([]), deque([<trio.SocketStream object at 0x107a79048>, <trio.SocketStream object at 0x107a795c0>])
failed, now sockets: deque([]), deque([<trio.SocketStream object at 0x107a79048>, <trio.SocketStream object at 0x107a795c0>, <trio.SocketStream object at 0x107a9f710>])
failed, now sockets: deque([]), deque([<trio.SocketStream object at 0x107a79048>, <trio.SocketStream object at 0x107a795c0>, <trio.SocketStream object at 0x107a9f710>, <trio.SocketStream object at 0x1075c6b70>])
failed, now sockets: deque([]), deque([<trio.SocketStream object at 0x107a79048>, <trio.SocketStream object at 0x107a795c0>, <trio.SocketStream object at 0x107a9f710>, <trio.SocketStream object at 0x1075c6b70>, <trio.SocketStream object at 0x107a81b70>])

I do not think it's enough to just handle the RequestTimeout exception either. As discussed in another ticket, if I wrap the whole thing in a trio.move_on_after, a CanceledError would be raised. I still think the most reliable way to deal with this might be be some kind of finally-based system. No matter the exception, once the request is done, the socket should be returned.

theelous3 commented 6 years ago

What's the exception there? The ValueError: deque.remove(x): x not in deque thing?

Away from system atm. Can't test. Will though.

As an aside, that's delay endpoint is fun.

theelous3 commented 6 years ago

Wait, I just read what you said properly. Forget what was written here a moment ago. Am digging.

miracle2k commented 6 years ago

Sorry for not being clear. It is about the sockets leaking, so this might not actually be the right ticket! But it's related to the https://github.com/theelous3/asks/tree/miracle2k-timeout-leak branch.

theelous3 commented 6 years ago

Yep no I gottcha @miracle2k. Working on it at the moment. You'll be pleased to know there is probably a finally :)

theelous3 commented 6 years ago

@miracle2k can you post example of your move_on_after wrap?

God there should be an IM feature here.

miracle2k commented 6 years ago

Thanks for fixing it! Here is the still occurring problem with move_on_after:

import trio
import asks
import multio

multio.init('trio')

async def main():
    session = asks.Session()
    for x in range(1000):
        with trio.move_on_after(1):
            await session.get('http://slowwly.robertomurray.co.uk/delay/3000/url/http://www.google.co.uk', timeout=1)
        print("now sockets: %s, %s" % (session._conn_pool, session._checked_out_sockets))

trio.run(main)

I know that finally is tough to add (I forget the details, but I ran in to an issue), but maybe a catch-all Exception works? (and then re-raise?)

miracle2k commented 6 years ago

Note though that trio's Cancelled actually subclasses BaseException, so catch Exception would not catch it, so maybe we'd have to catch BaseException.

theelous3 commented 6 years ago

So what we can't do, is anything at all involving actual io-ish async once a Cancelled has been raised (like closing a socket). As soon as we do the coro hits a brick wall and nothing further happens there, and the next event outside of the cancelled coro takes place - in our example, the next request.

I refactored a little bit, and added a _murdered flag that is set synchronously when a BaseException is raised, informing the session that a request has been killed.

If something has been killed we clean it up before we try to do anything later on. To be honest, I feel a little weird putting that in the request method, but it's a fast and simple check and it keeps it out of the user's hair. A little ugly, but no performance cost. You can also call this method directly with Session.cleanup.

https://github.com/theelous3/asks/commit/10e0d38abf769d1512afb0ebbcc4323f05425991 https://github.com/theelous3/asks/commit/812c03a76ff7f935a8f765fd88d4b1a00e3f8947 (whoops got ahead of myself https://github.com/theelous3/asks/commit/a43f8460cf93d1f3d2d512220834d6f5d35282f7)

Shortly once we add nice timeouts around the connection process, there should be little to no reason to use anything that may trigger this flag anyway (at least solely for the request), which will be nice.

Seems fine to me overall. I'm open to refactors on this, and also open to leave this open a little while considering how good the bug-hunting's been by @miracle2k, I don't want to hastily close this again.

miracle2k commented 6 years ago

I'll give it a try.

Interesting about not being able to catch and re-raise the Canceled exception. I would have assumed that works, because I think it works in asyncio. Maybe trio is different here (maybe @njsmith can tell us).

njsmith commented 6 years ago

I'm not quite following the discussion here, but I can say:

theelous3 commented 6 years ago

Hi hi. Yes, we're using SocketStreams. Note in the following example aclose is aliased to close

Catching all BaseException in the cancelled task and trying to clean up after them, we use the following:

            except BaseException as e:
                print(type(e))
                print(type(sock))
                print('Closing socket')
                await sock.close()
                print('Socket closed')
                sock._active = False
                print('Replacing connection')
                await self._replace_connection(sock)
                print('Connection replaced')
                raise e

Run using the example posted a few comments ago, employing trio.move_on_after, we get the following output:

>>> trio.run(main)
<class 'trio.Cancelled'>
<class 'trio.SocketStream'>
Closing socket
now sockets: deque([]), deque([<trio.SocketStream object at 0x7fdd6d994780>])
<class 'trio.Cancelled'>
<class 'trio.SocketStream'>
Closing socket
now sockets: deque([]), deque([<trio.SocketStream object at 0x7fdd6d994780>, <trio.SocketStream object at 0x7fdd6d994898>])
<class 'trio.Cancelled'>
<class 'trio.SocketStream'>
Closing socket
now sockets: deque([]), deque([<trio.SocketStream object at 0x7fdd6d994780>, <trio.SocketStream object at 0x7fdd6d994898>, <trio.SocketStream object at 0x7fdd6ec6deb8>])
<class 'trio.Cancelled'>
<class 'trio.SocketStream'>
Closing socket
now sockets: deque([]), deque([<trio.SocketStream object at 0x7fdd6d994780>, <trio.SocketStream object at 0x7fdd6d994898>, <trio.SocketStream object at 0x7fdd6ec6deb8>, <trio.SocketStream object at 0x7fdd6d929f98>])
<class 'trio.Cancelled'>
<class 'trio.SocketStream'>
Closing socket
now sockets: deque([]), deque([<trio.SocketStream object at 0x7fdd6d994780>, <trio.SocketStream object at 0x7fdd6d994898>, <trio.SocketStream object at 0x7fdd6ec6deb8>, <trio.SocketStream object at 0x7fdd6d929f98>, <trio.SocketStream object at 0x7fdd6d9b94a8>])

All other exceptions that trigger tidying up the socket use the same code and work as expected. For example, here's the output of some timeouts:

>>> trio.run(main)
<class 'asks.errors.RequestTimeout'>
<class 'trio.SocketStream'>
Closing socket
Socket closed
Replacing connection
Connection replaced
failed, now sockets: deque([]), deque([])
<class 'asks.errors.RequestTimeout'>
<class 'trio.SocketStream'>
Closing socket
Socket closed
Replacing connection
Connection replaced
failed, now sockets: deque([]), deque([])
<class 'asks.errors.RequestTimeout'>
<class 'trio.SocketStream'>
Closing socket
Socket closed
Replacing connection
Connection replaced
failed, now sockets: deque([]), deque([])
<class 'asks.errors.RequestTimeout'>
<class 'trio.SocketStream'>
Closing socket
Socket closed
Replacing connection
Connection replaced
failed, now sockets: deque([]), deque([])
<class 'asks.errors.RequestTimeout'>
<class 'trio.SocketStream'>
Closing socket
Socket closed
Replacing connection
Connection replaced
failed, now sockets: deque([]), deque([])

Here's a branch that just catches BaseException and tries to tidy up, for illustration and experimentation purposes: https://github.com/theelous3/asks/tree/trio_cancelled

njsmith commented 6 years ago

Ah, I see.

So what's happening in the example is that the await in await sock.close() indicates that this is a checkpoint, i.e. a place where trio can raise Cancelled. And cancellation in trio is "level-triggered", so it does in fact raise Cancelled. (But even if it weren't level-triggered, you could still get a Cancelled exception here, if it happened that some external timeout expired just as you were about to reach this code, so you'd still need to be prepared to handle that – the level-triggered thing just makes it much easier it hit.)

Now aclose is special: it guarantees that even if it is cancelled, it does still close the underlying socket. But then after that it raises Cancelled, so the rest of your bookkeeping logic doesn't run.

One approach would be to put the await sock.close() at the end of the bookkeeping, so that the other code runs regardless of whether it raises an exception. But in this case I guess that might be tricky because await self._replace_connection(sock) also has an await, so might be a checkpoint too...

Can you elaborate a little on what this bookkeeping is trying to do? I would have expected that once a socket was closed you would just throw it away. What does _replace_connection do? Why are you keeping a deque of sockets that are in use – wouldn't it be simpler if the pool stopped tracking them entirely while they were in use, and then started tracking them again only once they became reusable (if they do)?

miracle2k commented 6 years ago

Ok, to summarize, here is what the problem is:

with trio.move_on_after(1):
    # Inside the asks library:
    socket_stream = await open_trio_socket_stream()
    try:
        await long_running_socket_operation(socket_stream)
    except trio.Canceled:
        # we want to close the socket now and re-raise the exception to continue cancellation
        await socket_stream.aclose()
        some_other_cleanup_steps()
        raise

What happens here is that because the task is already cancelled, socket_stream.aclose raises a Canceled Exception, too (but still closes the socket, if I understand correctly what Nathaniel said).

And again, if I understand what @njsmith said correctly, this is ok and to be expected, and we should deal with it? So, either we do:

# Catch Canceled directly
except trio.Canceled:
    # Do all the other cleanup steps first
    some_other_cleanup_steps()

    # Let aclose re-raise cancelled
    await socket_stream.aclose()

    # Just to be safe, re-raise ourselves in case aclose did not
    raise

Or we do:

# Catch all base exceptions
except BaseException:
    # Call aclose and catch any cancelled exception it may raise
    try:
        await socket_stream.aclose()
    except CanceledError:
        pass

    # Do anything else for cleanup
    some_other_cleanup_steps()

    # Re-raise
    raise

The problem of course is that asks should not have a dependency on trio directly, so maybe be even more aggressive:

except BaseException:
    try:
        await socket_stream.aclose()
    # Instead of only catching trio.CanceledError, catch all.
    except BaseException:
        pass

    some_other_cleanup_steps()

    raise

While none of them seem particularly pretty, I do think the solution we have now, where in the case of outside cancellation such as here the the socket gets added to a temporary murdered list, and I as the library user have to call a special cleanup method, is quiet unergonomic. After the amount of work that went into making timeouts a core part of trio, it seems a bit unfortunate to discourage the use of them here.

theelous3 commented 6 years ago

@njsmith I see. I guessed this was roughly what was taking place. Of note, however, when I tried handling it like:


    except BaseException as e:
        await self._handle_exception(e, sock)

    # where _handle_exception loooked like
    async def _handle_exception(e, sock):
        print('this will print')
        await sock.aclose()
        print('this will not print')
        raise e

this will print would print, and likewise for the other. I expected that if the first await encountered after the Cancelled was taken as a point for trio to step in, I would never see the first print. This isn't the case. How come we can get one await deep in the _handle_exception case, but not when closing sock?

Note: I just made a branch removing the _checked_out_sockets thing. I hadn't realised, but it's an artifact from when we used to track and limit connections by checked out vs in pool. This logic is no longer required. This also means we can simply await sock.close in a BaseException case, and do no further cleanup.

njsmith commented 6 years ago

How come we can get one await deep in the _handle_exception case, but not when closing sock?

That's discussed some here: https://trio.readthedocs.io/en/latest/reference-core.html#checkpoints

Basically, it's not literally the await that makes something a checkpoint. Rather, trio makes it so that all of its built in async funxtions are checkpoints. So if you see await <...something in trio...> that's a checkpoint, and if you see await some_third_party_func() then, well... Why is there an await there, unless it ultimately ends up calling some async function that's built into trio? So seeing an await doesn't mean something is necessarily a checkpoint, but as a caller you should probably be prepared to get a Cancelled exception, just in case.

Also btw, note that writing except trio.Cancelled is probably not a good idea – for example, if there are nurseries involved, then after being cancelled you might get a MultiError([Cancelled(), Cancelled()]), and your except block won't run.

Code like this is fine though:

except:
    some_synchronous_cleanup()
    await obj.aclose()
    raise

(And look how short and simple it is :-).)

theelous3 commented 6 years ago

Oh, neato.

https://github.com/theelous3/asks/commit/d82c328e3f69da0721ea9081db96ffaa21116dc2 (not merged atm)

Now we just

            except BaseException as e:
                await sock.close()
                raise e

As MultiError is BaseException too.

njsmith commented 6 years ago

Nice!