mosquito / aio-pika

AMQP 0.9 client designed for asyncio and humans.
https://aio-pika.readthedocs.org/
Apache License 2.0

QueueIterator raises StopAsyncIteration when iterator/channel is closed. #615

Open Darsstar opened 5 months ago

Darsstar commented 5 months ago

See #358

Currently QueueIterator never raises StopAsyncIteration: not when the channel is closed, and not after QueueIterator's close method is called. This means that an async for message in queue.iterator(): loop will keep running "forever" even if no new message will ever arrive ("forever" because the asyncio task can still be cancelled, etc.).

This PR fixes that in a backwards-compatible way. Some tests were refactored to rely on this implicitly, and new tests that explicitly verify QueueIterator.__anext__() raises the appropriate exceptions have been added as well.
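For context, an async for loop ends when the iterator's __anext__ raises StopAsyncIteration. A minimal, self-contained stand-in (toy code, not aio-pika's actual QueueIterator) illustrating the behavior this PR introduces — the loop terminates once the iterator is closed and drained:

```python
import asyncio


class ClosableIterator:
    """Toy stand-in for QueueIterator: yields queued items, ends on close."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = False

    def close(self):
        self._closed = True
        self._queue.put_nowait(None)  # sentinel to wake a pending __anext__

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self._closed and self._queue.empty():
            raise StopAsyncIteration
        item = await self._queue.get()
        if item is None:  # sentinel: iterator was closed
            raise StopAsyncIteration
        return item


async def main():
    it = ClosableIterator()
    it._queue.put_nowait("msg-1")
    it.close()
    received = []
    async for msg in it:  # exits cleanly instead of hanging forever
        received.append(msg)
    return received


print(asyncio.run(main()))  # → ['msg-1']
```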

coveralls commented 5 months ago

Coverage Status

coverage: 92.045% (+3.9%) from 88.125% when pulling 9abe45fb9d804f188fdb6aba0a44368c7a922522 on Darsstar:QueueIterator-raises-StopAsyncIterator into 848c0250045811c7598123b46a1d3dda2745c905 on mosquito:master.

Darsstar commented 5 months ago

@mosquito ping

mosquito commented 3 months ago

@Darsstar this request contains a lot of changes that could potentially break backward compatibility. I'm still thinking about how to test it so I can decide whether it needs a separate major release, or can make do with a minor one.

Darsstar commented 3 months ago

this request contains a lot of changes that could potentially break backward compatibility.

I assume you are referring to:

9.4.0 already dropped Python 3.7. The changes taking advantage of 3.8, all contained in a single commit, were made in a way that should be backward compatible.

It probably should be a new major version due to the protected properties.

Although, I think those could be rewritten as @property with setters which notify an asyncio.Condition ... Then I think it boils down to whether you support people inheriting from the abstract base classes without inheriting from the concrete classes aio-pika provides. If you do, the major version should be bumped.
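The property-plus-Condition idea could look roughly like this (a sketch with hypothetical names, not aio-pika's actual internals; since asyncio.Condition must hold its lock when notifying, the "setter" is shown as an async method rather than a plain @x.setter):

```python
import asyncio


class Channel:
    """Sketch: protected flag exposed read-only, with a notifying async setter."""

    def __init__(self):
        self._closed = False
        self._closed_condition = asyncio.Condition()

    @property
    def closed(self) -> bool:
        # read-only view of the protected attribute
        return self._closed

    async def set_closed(self) -> None:
        async with self._closed_condition:
            self._closed = True
            self._closed_condition.notify_all()  # wake anyone awaiting the flag

    async def wait_closed(self) -> None:
        async with self._closed_condition:
            await self._closed_condition.wait_for(lambda: self._closed)


async def main():
    ch = Channel()
    waiter = asyncio.create_task(ch.wait_closed())
    await asyncio.sleep(0)   # let the waiter reach its wait_for()
    await ch.set_closed()    # flips the flag and wakes the waiter
    await waiter
    return ch.closed


print(asyncio.run(main()))  # → True
```

Because wait_for re-checks the predicate before blocking, a waiter that starts after the flag is already set returns immediately.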

Darsstar commented 3 months ago

I rebased, which added the 3.12 tests, and now tests/test_amqp_robust_proxy.py::test_channel_reconnect_stairway[[0] 0.1-128] (consistently) fails, only on 3.12. Time for me to debug the issue...

Darsstar commented 3 months ago

Small update: I am currently under the impression this PR isn't at fault.

I branched from upstream/master and altered the reconnect_stairway test to start at 60 and go all the way up to 4096, instead of every power of 2 in the range [64, 4096]. The tests fail on all Python versions, so it seems to be some sort of race condition that depends on how tasks end up being scheduled...? The larger the stair value, the less likely it is to fail.

I don't have a strong lead, but I started looking at aiomisc and aiormq as well. Hopefully I'll stumble on the cause in the next two weeks or so...

Darsstar commented 3 months ago

The tests are passing again :)

Public backward incompatible API changes:

Internal backward incompatible API changes:

Those are all I found going over all the changes again.

My preference would be leaving it as is and reflecting that in the version number. Say the word and I will turn Connection._closed and Channel._closed into properties and undo the QueueIterator.close() change.

LockedThread commented 2 months ago

Hey @Darsstar, I am coming from https://github.com/mosquito/aio-pika/issues/623 and I have a question about how we should be handling the TimeoutError being raised, from the application's perspective. Recently this exception has been thrown periodically in my production code and I am unsure how I should handle it... I thought RobustConnection would handle this.

LockedThread commented 2 months ago

I just re-read the issue and this PR may also be fixing a memory leak I have been facing for a while. I am going to pull this version down and see if it fixes it. Will report back.

LockedThread commented 2 months ago

This PR also seems to fix my memory leak. @mosquito Do we have an ETA on when this could be merged? I am going to start using @Darsstar's version now.

Darsstar commented 2 months ago

Hey @Darsstar, I am coming from #623 and I have a question about how we should be handling the TimeoutError being raised, from the application's perspective. Recently this exception has been thrown periodically in my production code and I am unsure how I should handle it... I thought RobustConnection would handle this.

This branch should only throw TimeoutError if you pass a non-None value as the timeout keyword argument to Queue.iterator(). If you do, presumably you did so for a reason.

iterator = queue.iterator(timeout=60)
while True:
    try:
        message = await iterator.__anext__()
    except StopAsyncIteration:
        break
    except asyncio.TimeoutError:
        # do whatever you want to do
        continue
    # handle message here

Alternatively you could create a wrapper that swallows the timeout and keeps iterating:

class Wrapper:
    def __init__(self, iterator: AsyncIterator):
        self.iterator = iterator

    def __aiter__(self):
        return self

    async def __anext__(self):
        while True:
            try:
                return await self.iterator.__anext__()
            except asyncio.TimeoutError:
                # do whatever you want to do, then keep waiting
                continue

async for message in Wrapper(queue.iterator(timeout=60)):
    ...  # handle message here

mosquito commented 2 months ago

@LockedThread @Darsstar sorry, lots of changes; I should plan to retest all of these myself.

LockedThread commented 2 months ago

@LockedThread @Darsstar sorry, lots of changes; I should plan to retest all of these myself.

All good, we appreciate your diligent work in supporting this project.

Darsstar commented 2 months ago

Unsurprisingly, fixing this requires more code and therefore increases the runtime.

@gglluukk's benchmark (thanks!) shows about a 10% runtime degradation:

before your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 4.905
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 4.503
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 4.809
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 4.870
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 4.741
Average: 4.766

after your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 5.481
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 5.123
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 5.397
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 5.437
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 5.128
Average: 5.313
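For precision, recomputing the averages from the per-run numbers above puts the slowdown slightly above the quoted 10%:

```python
before = [4.905, 4.503, 4.809, 4.870, 4.741]
after = [5.481, 5.123, 5.397, 5.437, 5.128]

avg_before = sum(before) / len(before)  # 4.766
avg_after = sum(after) / len(after)     # 5.313

# relative slowdown of the patched version
print(f"{(avg_after / avg_before - 1) * 100:.1f}% slower")  # → 11.5% slower
```
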
LockedThread commented 2 months ago

Unsurprisingly, fixing this requires more code and therefore increases the runtime.

@gglluukk's benchmark (thanks!) shows about a 10% runtime degradation:

before your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 4.905
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 4.503
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 4.809
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 4.870
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 4.741
Average: 4.766

after your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 5.481
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 5.123
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 5.397
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 5.437
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 5.128
Average: 5.313

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

Darsstar commented 2 months ago

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

I don't see optimisation potential without porting the library to C/C++/Rust (while keeping the implementation correct).

LockedThread commented 1 month ago

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.)

Hopefully PyO3 gets better support for async/await, while maintaining interoperability with asyncio. There is a lot of active work on this right now. Once that happens, using Rust will be feasible.

In the meantime, losing 10% in performance is worthwhile to make sure I don't get OOM kills.

mosquito commented 1 month ago

You should do a performance test, with cProfile for example. The last time I did this, marshalling in pamqp was the slowest part.
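As a sketch of that suggestion (the consume coroutine below is a placeholder for the real aio-pika consume loop, not library code), profiling an asyncio entry point with cProfile and summarizing with pstats looks like:

```python
import asyncio
import cProfile
import io
import pstats


async def consume():
    # placeholder for the real consume loop (e.g. iterating a queue)
    for _ in range(100):
        await asyncio.sleep(0)


profiler = cProfile.Profile()
profiler.enable()
asyncio.run(consume())  # everything the event loop runs is captured
profiler.disable()

out = io.StringIO()
stats = pstats.Stats(profiler, stream=out)
stats.sort_stats("cumulative").print_stats(10)  # top 10 by cumulative time
print(out.getvalue())
```

Sorting by cumulative time is what surfaces hot spots like pamqp's marshalling in the call tree.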