django / channels_redis

Redis channel layer backend for Django Channels
BSD 3-Clause "New" or "Revised" License

v4b1 - AsyncToSync closed event loop #317

Closed bbrowning918 closed 2 years ago

bbrowning918 commented 2 years ago

refs https://github.com/django/channels_redis/issues/312

bbrowning918 commented 2 years ago

I am not sure why the 3.7 tests hung; I will give them a try locally (I did the work under 3.8).

carltongibson commented 2 years ago

@bbrowning918 Cool. Thanks.

From the log it looks like a hang. If you can’t resolve quickly, comment and I’ll have a look (rather than you spending an age on it!)

bbrowning918 commented 2 years ago

I could not get the tests to hang locally, either running pytest directly under 3.7.13 or via a tox run. It seems it was the pub/sub section, but there are no obvious clues, as none of that code changed.

bbrowning918 commented 2 years ago

I have gotten somewhere. test_groups_basic in both test_pubsub.py and test_pubsub_sentinel.py can hang intermittently.

The part that hangs is the last section, where we force a timeout for a message we correctly should not receive. It can be recreated with a test such as this, run over and over until one sticks and never finishes (a loop for doing that is sketched after the test).

import asyncio

import async_timeout
import pytest

@pytest.mark.asyncio
async def test_receive_timeout(channel_layer):
    channel_name = await channel_layer.new_channel(prefix="test-timeout-chan")
    with pytest.raises(asyncio.TimeoutError):
        async with async_timeout.timeout(1):
            await channel_layer.receive(channel_name)
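
To catch a hang I just hammer that test in a loop until one run wedges; a throwaway harness along these lines does it (the test path and timeout here are my local setup, not part of the PR):

import subprocess

# Keep re-running the flaky test until a run exceeds the timeout, which is
# how the hang shows up locally.
while True:
    try:
        subprocess.run(
            ["pytest", "-x", "tests/test_pubsub.py::test_receive_timeout"],
            timeout=60,  # treat anything slower than a minute as a hang
            check=True,
        )
    except subprocess.TimeoutExpired:
        print("hung run captured")
        break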

Deeper inside receive, in the cleanup section triggered by async_timeout throwing asyncio.CancelledError, we attempt to unsubscribe the channel on the shard.

async def unsubscribe(self, channel):
    if channel in self._subscribed_to:
        self._subscribed_to.remove(channel)
        print("looking for sub conn")
        conn = await self._get_sub_conn()
        print("found sub conn")
        await conn.unsubscribe(channel)
        print("unsubscribed")

await self._get_sub_conn() seems to await something that never arrives. That method is beyond my depth/knowledge.
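
To see where everything is parked when it wedges, I dropped a small watchdog task into the test run (purely a debugging aid, nothing to merge): after a few seconds it prints the stack of every task that is still pending.

import asyncio


async def dump_stuck_tasks(after: float = 10.0) -> None:
    # If the test is still running after `after` seconds, print where every
    # pending task is currently suspended.
    await asyncio.sleep(after)
    for task in asyncio.all_tasks():
        task.print_stack()

Kicking that off with asyncio.ensure_future(dump_stuck_tasks()) at the top of the test should show which await each stuck task is sitting in.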

CI environments could be more susceptible, assuming they are more resource-constrained than a dedicated local machine, as it feels like a timing thing.

carltongibson commented 2 years ago

OK, good detective work @bbrowning918 🕵️

@acu192 Can I ask you to have a glance if you get a cycle? (I shall have a dig too this week)

acu192 commented 2 years ago

@bbrowning918 Do you mind looking one call deeper to see which of the awaits inside _get_sub_conn() is hanging?

I just spent some time looking (again) at the diff where we migrated from aioredis to redis-py. I'm assuming that's where this crept in, as we didn't see it before that change, and given that most of our recent GitHub Actions runs have failed due to this. Edit: Actually, looking again, the recent GH Actions failures are only on this PR.

Beyond that... I'm struggling to keep my head above water so I can't look at this deeper right now. 😟

bbrowning918 commented 2 years ago

Okay, bear with my ignorance here, but with more brute-force printing I found the following.

    async def _get_sub_conn(self):
        if self._keepalive_task is None:
            self._keepalive_task = asyncio.ensure_future(self._do_keepalive())
        if self._lock is None:
            self._lock = asyncio.Lock()
        print(self._lock)
        async with self._lock:
            if self._sub_conn is not None and self._sub_conn.connection is None:
                await self._put_redis_conn(self._sub_conn)
                self._sub_conn = None
                self._notify_consumers(self.channel_layer.on_disconnect)
            if self._sub_conn is None:
                if self._receive_task is not None:
                    print(self._receive_task)
                    self._receive_task.cancel()
                    try:
                        print("waiting for receive_task")
                        await self._receive_task
                    except asyncio.CancelledError:
                        print("receive_task cancelled")
                        # This is the normal case, that `asyncio.CancelledError` is thrown. All good.
                        pass

Which on a hanging run produces:

<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-4' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:409> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f88fd895490>()]>>
waiting for receive_task
receive_task got cancelled
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [unlocked]>
<Task pending name='Task-5' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:391> wait_for=>
waiting for receive_task
<asyncio.locks.Lock object at 0x7f88fd85a7f0 [locked]>

The _do_keepalive task is triggering sequential runs of _get_sub_conn on its periodic timer and we seem to hit a deadlock. We end up waiting at await self._receive_task on a second/different task, but its cancellation never seems to complete. (A stripped-down reproduction of the pattern is sketched after the successful-run output below.)

A successful run ends with the CancelledError thrown and all the tidying up done; there is no final keepalive attempt left stuck on the lock.

<asyncio.locks.Lock object at 0x7f49b6ab6550 [unlocked]>
<asyncio.locks.Lock object at 0x7f49b6ab6550 [unlocked]>
<Task pending name='Task-4' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:408> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f49b6af5820>()]>>
waiting for receive_task
receive_task got cancelled
<asyncio.locks.Lock object at 0x7f49b6ab6550 [unlocked]>
<Task pending name='Task-5' coro=<RedisSingleShardConnection._do_receiving() running at channels_redis/channels_redis/pubsub.py:408> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f49b6af5c70>()]>>
waiting for receive_task
receive_task got cancelled
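
To convince myself of the shape of this, I boiled it down to a tiny self-contained script (my own sketch, not channels_redis code): a worker task that never acknowledges cancellation is cancelled and awaited while a lock is held, and the next caller queued on that lock is stuck behind it.

import asyncio


async def stubborn_worker():
    # Stands in for _do_receiving() stuck inside a call that ignores the
    # cancellation; the first cancel request is deliberately swallowed here.
    try:
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(3600)  # carries on blocking; never returns in the demo


async def cleanup(lock, worker):
    async with lock:  # mirrors _get_sub_conn() taking self._lock
        worker.cancel()
        await worker  # hangs: the worker never actually finishes


async def keepalive(lock):
    async with lock:  # mirrors the next _do_keepalive() -> _get_sub_conn()
        print("keepalive got the lock")  # never printed while deadlocked


async def main():
    lock = asyncio.Lock()
    worker = asyncio.create_task(stubborn_worker())
    await asyncio.sleep(0)  # let the worker start
    waiters = [asyncio.create_task(cleanup(lock, worker)),
               asyncio.create_task(keepalive(lock))]
    _, pending = await asyncio.wait(waiters, timeout=3)
    print(f"{len(pending)} tasks still stuck after 3s")  # prints 2


asyncio.run(main())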

acu192 commented 2 years ago

I'm only coming up with more questions. From the code path you are uncovering, this section is being hit, meaning the Redis connection is lost very consistently during this test (much more consistently than I'd expect, given something like 4/5 of our recent GitHub Actions runs fail here). If that section were not being hit, we'd never reach the deeper parts of the code that are blocking us. So, if I had time, I'd actually start by investigating that question.

But, on to what you've discovered: await self._receive_task is the real blocker. If it returned, the lock would be released, _do_keepalive() would get to do its thing, and all would be well.

So (assuming we're both correct up to now), there are only two awaits inside _do_receiving(), here and here; those are the only two places that can be blocking us. The latter is weird. Why are we sleeping here? That should be removed. It slipped into the migration from aioredis to redis-py somehow, although it's not causing this particular issue. The former is all that's left, which (if I'm right) means that redis-py is actually what's blocking us.
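
For whoever picks this up, one defensive shape that might be worth trying (just an idea, not the fix, and not what the code does today) is to stop waiting unconditionally on the cancelled receive task while holding the lock; a sketch:

import asyncio


async def cancel_and_reap(task: asyncio.Task, grace: float = 5.0) -> bool:
    # Cancel a task and wait at most `grace` seconds for it to wind down,
    # rather than awaiting it forever while the connection lock is held.
    task.cancel()
    _, pending = await asyncio.wait({task}, timeout=grace)
    return not pending  # False: the task never acknowledged the cancellation

That papers over the symptom rather than explaining why the cancellation is being swallowed, so it would be a stopgap at best.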

All told, we should definitely move this to a new issue, as it doesn't seem to relate to your PR here. In addition, while I'm dying to work on this (I really like this library and want to see it improve), I'm unable to commit significant time right now. I suspect (of course, unconfirmed) that we've done something wrong at a core level in our migration to redis-py -- not something that can be blamed on any single line of code, but rather something about how we're using redis-py that is breaking us. It's probably a deep issue.

acu192 commented 2 years ago

Edit: I immediately re-evaluated my post and made some strike-throughs on it after realizing that all the recent GH Actions failures are on this PR. So maybe the situation is not so dire. Sorry for the error that led me to that rash conclusion.

acu192 commented 2 years ago

Edit edit: I decided to re-run the test suite in another PR, to see if this same deadlock would happen elsewhere. Indeed, it seems to have reproduced elsewhere, see: https://github.com/django/channels_redis/runs/7418417216 (I canceled the test after it had hung for over an hour)

So, I'm back to thinking this issue is unrelated to this PR, that it instead has something to do with our usage of redis-py. I'd love a second opinion on that though, as I've gone back and forth now.

bbrowning918 commented 2 years ago

I swapped locally back to main, reran test_receive_timeout a couple of times, and the hang recurred. I would agree this is an intermittent redis-py migration issue.

If this PR has to be set aside for a future merge until the underlying hang/failure is solved (assuming it hasn't inadvertently made things worse), that's just the way things go sometimes. A consistently passing test suite for the redis-py bits seems like the higher-priority and harder problem. The underlying issue this PR sought to fix should not be difficult to redo downstream of any other changes, considering there is a simple test case to recreate/check it.

If there is no objection, I will do my best to sum up what has been learned so far and how to recreate the hang in a new issue, for visibility's sake, in the hope that more eyes on it will produce new insight or solutions.

acu192 commented 2 years ago

If there is no objection, I will do my best to sum up what has been learned so far and how to recreate the hang in a new issue, for visibility's sake, in the hope that more eyes on it will produce new insight or solutions.

Yes, please go ahead. Thank you for the hunting you've done so far. It's been very valuable.

carltongibson commented 2 years ago

Thanks both for the work here! 🎁

I shall pick this up too and have a look.

bbrowning918 commented 2 years ago

The cleanup PRs either got some of the ghosts out, or there was finally a lucky test run.

carltongibson commented 2 years ago

@bbrowning918 That's good news... 👍 There were a few other clean-ups/modernisations I spotted, e.g. replacing ensure_future with create_task (a minimal illustration follows). I think there's mileage in continuing to chip away at those to help narrow down exactly the block we're seeing here.
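
A minimal, self-contained illustration of that swap (do_keepalive here is just a stand-in coroutine, not the library's method):

import asyncio


async def do_keepalive():
    # Stand-in for any background coroutine the layer schedules.
    await asyncio.sleep(1)


async def main():
    # Current spelling in the pub/sub layer:
    task_old = asyncio.ensure_future(do_keepalive())
    # Preferred spelling since Python 3.7:
    task_new = asyncio.create_task(do_keepalive())
    await asyncio.gather(task_old, task_new)


asyncio.run(main())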

@nielsuit227 Would you be able to give this branch a run against your test suite, to see if it resolves the issue? Thanks!