django / channels_redis

Redis channel layer backend for Django Channels
BSD 3-Clause "New" or "Revised" License
595 stars 197 forks source link

Multiple Daphne instances with Redis backend get stuck #134

Open ralphje opened 6 years ago

ralphje commented 6 years ago

I am currently debugging the following situation, for which I have not been able to create a reproducible situation in a testing environment, mostly due to time contraints. I am posting what I know now, perhaps this rings a bell somewhere that makes me stop looking further, but I suspect a bug of some kind:

I have a Nginx proxy server that proxies between two Daphne instances running the same codebase. I am running a Redis channel backend. I am using Celery for several backend tasks, and want to report their status to the client that is waiting on them. For this, they publish a PROGRESS state and I have hooked some Channels code inside the storage function, so that a message is sent to a group waiting on that task, e.g.

async_to_sync(channel_layer.group_send)("celery-status-<task-uuid>", {"type": "celery.task_status", "text": task_status})

I have a consumer that looks a bit like this:

class MyConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data=None, bytes_data=None):
        await self.channel_layer.group_add("celery-status-<task-uuid>", self.channel_name)

    async def celery_task_status(self, event):
        await self.send(text_data=event["text"])

Now, when someone requests to receive updates for the given Celery task ID, and I send status updates to the client. (I also immediately send the current status to the consumer, which works fine, so is left out of the code example).

The bug I encounter, is that after a while, progress updates are not sent to the clients anymore. This only appears to happen when there are two or more Daphne servers running. The Redis queues fill up. When I shutdown one of those Daphne servers, and clear the Redis cache, it continues to work fine.

Perhaps this has to do with my Nginx proxy setup, which has simply two upstream servers specified, although I'm not sure that's the culprit. I think two Daphne instances is the root of the problem.

Is this a known issue? Is there something I can add to this bug to make it more reproducible?

andrewgodwin commented 6 years ago

Well, the first problem I see is that you have self.send not await self.send, so definitely fix that. Without awaiting it, you might not ever give the other coroutine a chance to send the data and proceed to lock up the process.

ralphje commented 6 years ago

That's a mere typo in my example code :).

andrewgodwin commented 6 years ago

In that case, I need actual code that breaks so I can try to look at it and potentially reproduce it!

ralphje commented 6 years ago

Sure thing. I will try to produce some way to reproduce this ASAP. Was primarily postponing that to prevent investing time in a known issue ;)

andrewgodwin commented 6 years ago

It is possible it is related to https://github.com/django/channels_redis/issues/79, but since neither this or that have steps to reproduce or detailed error info, I can't be sure yet.

ralphje commented 6 years ago

I have been able to reproduce this issue in a limited environment, but not very reliably. The code and steps to reproduce are in this repository: https://github.com/ralphje/channels939

ralphje commented 6 years ago

Have you been able to reproduce this issue?

I ran into another issue, that may or may not be related: celery/celery#3898. This describes an error with Celery that stalls processing tasks when hiredis is installed. hiredis is a requirement of aioredis, thus being installed by channels_redis.

Not saying that it is in fact related but the symptoms are very similar.

gravitylow commented 6 years ago

Is this related to https://github.com/django/channels_redis/issues/86 ? If so, upgrading channels_redis might resolve the issue.

ralphje commented 6 years ago

I have tried to replicate the issue on 2.1.1 and it still appears to be an issue.

adamhooper commented 6 years ago

We've been running into this problem on production. I added some debugging messages to our code and ran MONITOR on Redis. Here's what I found:

The one time I reproduced this problem, it happened near application start. Log messages:

E  DEBUG 2018-10-18 22:39:52,480 connection 1 140326916581120 Creating tcp connection to ('redis-service', 6379)
E  DEBUG 2018-10-18 22:39:52,492 connection 1 140326916581120 Creating tcp connection to ('redis-service', 6379)
E  DEBUG 2018-10-18 22:39:52,497 connection 1 140326916581120 Creating tcp connection to ('redis-service', 6379)
E  DEBUG 2018-10-18 22:39:52,502 websockets 1 140326916581120 Added to channel workflow-5852
E  DEBUG 2018-10-18 22:39:52,504 websockets 1 140326916581120 Added to channel workflow-5057
E  DEBUG 2018-10-18 22:40:02,943 websockets 1 140326916581120 Added to channel workflow-6154
E  DEBUG 2018-10-18 22:40:03,352 websockets 1 140326916581120 Added to channel workflow-5893
E  DEBUG 2018-10-18 22:40:03,402 websockets 1 140326916581120 Added to channel workflow-6132
E  DEBUG 2018-10-18 22:40:04,596 connection 1 140326916581120 Cancelling waiter (<Future cancelled>, [None, None])
E  DEBUG 2018-10-18 22:40:04,596 util 1 140326916581120 Waiter future is already done <Future cancelled>
E  DEBUG 2018-10-18 22:40:04,596 websockets 1 140326916581120 Discarded from channel workflow-5057
E  DEBUG 2018-10-18 22:40:04,596 websockets 1 140326916581120 Discarded from channel workflow-5852
E  DEBUG 2018-10-18 22:40:06,317 websockets 1 140326916581120 Added to channel workflow-5899
E  DEBUG 2018-10-18 22:40:06,402 websockets 1 140326916581120 Added to channel workflow-5852
E  DEBUG 2018-10-18 22:40:26,824 websockets 1 140326916581120 Added to channel workflow-6130

These are messages on Server A: group_add (Added to channel) and group_discard (Discarded from channel). Further logs show that Server A succeeds in calling group_send(), and Server B receives them. But messages destined for Server A are never received.

Next, I ran Redis MONITOR to watch traffic. I found that only Server B was calling BRPOPLPUSH every 5s: Server A was not calling it. (I was not monitoring when those log messages appeared, so I don't know what Server A's last messages were.)

That's where I'm stuck now. We've become quick to identify when the problem occurs and we have an icky workaround (restart the dead server).

I'm not ruling out error on our part; but clearly Server A's event loop is still humming (because it handles HTTP requests), so the error seems constrained to a single Redis connection.

I've tried understanding channels_redis' logic, and I'm a bit confused. Why are there connection pools, instead of one send connection and one receive connection per host? Why doesn't BRPOP go in a singleton long-running task on the event loop that won't be canceled, instead of using locks and backup queues and cancel-specific code and cleaners? (These are changes I'd be happy to work on, if they fix this bug.)

... but at the same time, I don't have proof that the error is in channels_redis. How can I find out whether the error is indeed in that receive() logic? Add logging messages in a fork, perhaps? What would be the wisest places?

adamhooper commented 6 years ago

I used to see this problem every two or three days, so I wrote a RabbitMQ channel layer. The problem hasn't appeared for weeks now.

RabitMQ channel layer: https://github.com/CJWorkbench/channels_rabbitmq

RabbitMQ seems a better fit for this task than Redis, because the AMQP protocol addresses all the layer's needs explicitly. Clients don't fiddle with server-side state, and there are no races or expiries to worry about. If you're in a containerized environment, RabbitMQ is just as easy to install as Redis.

I'm left suspicious that the problem relates to channels_redis' use of connection pools. (See my previous comment.) I wrote this layer without them, and I think that's why it's more stable. Instead, the layer starts listening for messages as soon as it's constructed; it only stops if you call .close() or close the event loop.

Please give it a shot!

</plug>

ericls commented 6 years ago

@adamhooper If I'm not mistaken, loops are coupled to their threads. And @sync_to_async uses threadpool, where new loops are created. These loops cannot use connections established using in the loop that's in the main thread.

adamhooper commented 6 years ago

@ericls I'm not quite sure what you're talking about....

Is it "different event loop requires different connection?" channels_rabbitmq and channels_redis address this in the same way: a mapping from EventLoop to Connection (well, List[Connection], in channels_redis' case). Indeed, each project has a battery of unit tests that spawns and destroys several event loops.

Is it "invoking async code from @async_to_sync's executor threads is tricky?" Again, channels_rabbitmq and channels_redis address this with the same strategy: er, nothing. (Channel-layer methods can only be invoked on the event loop's main thread, not its executor threads.)

The one difference between channels_rabbitmq and channels_redis is that channels_rabbitmq immediately starts consuming messages when you connect, and channels_redis waits for you to ask for one. I think we're straying pretty far from this particular bug report, though.

The point is: I think my workaround is viable. If there's a bug in channels_rabbitmq, please report it! https://github.com/CJWorkbench/channels_rabbitmq/issues

andrewgodwin commented 6 years ago

This appears to be a problem in channels_redis, not in Daphne, so I am going to move it over there. Additionally, unless we get some direct ways to reproduce this reliably, I'm not going to be able to actually address this and fix it.

khpeterson commented 5 years ago

I am running into what seems like the same issue - multiple instances of daphne running behind apache on AWS, communicating through ElastiCache/redis. The clients consist of simple websocket event producer/consumer pairs.

Initially, I was seeing dropped channel layer messages and ChannelFull exceptions around the default timeout for my long running websockets (86400 seconds). To make this happen more frequently I shortened websocket_timeout to 300s and though it still takes a while, it's fairly repeatable in a manageable amount of time (several hours with 10 + 10 clients) sourcing/sinking messages every few seconds.

At the bottom of this post is a snapshot of the redis log just before the ChannelFull exceptions start piling up (with message data intentionally awk'd to make it more readable). Basically, things are running along ok with lpush, brpoplpush, brpop commands repeating in sequence but then the brpop/brpoplpush commands just stop and eventually the channels fill. There is nothing in the daphne logs indicating a problem until the ChannelFull exceptions start.

I've reviewed channels_redis/core.py and from what I can tell either receive_single() is not getting called or redis is stuck (assuming redis MONITOR only logs the brpop commands when they complete).

Shutting down and restarting all of my websocket clients seems to be enough to keep going (don't need to restart daphne). However, after the batch of shutdowns, I see the exception:

2018-11-18 17:43:41,425 daphne.server ERROR Exception inside application: File "/opt/python/run/venv/local/lib/python3.6/site-packages/channels/sessions.py", line 179, in call return await self.inner(receive, self.send) File "/opt/python/run/venv/local/lib/python3.6/site-packages/channels/middleware.py", line 41, in coroutine_call await inner_instance(receive, send) File "/opt/python/run/venv/local/lib/python3.6/site-packages/channels/consumer.py", line 59, in call [receive, self.channel_receive], self.dispatch File "/opt/python/run/venv/local/lib/python3.6/site-packages/channels/utils.py", line 59, in await_many_dispatch await task File "/opt/python/run/venv/local/lib/python3.6/site-packages/channels_redis/core.py", line 449, in receive assert not self.receive_lock.locked()

Maybe this is just https://github.com/django/channels_redis/issues/136, but maybe the same lock hole is preventing receive_single() from getting called.

Here's the redis log, filtered for the channel that fills up:

[ec2-user@ip-172-30-1-57 ~]$ cat redis_2018-11-17.log | grep "asgi:specific.VhOui" | awk '/"lpush"/{printf("%s %s %s %s %s \n", $1, $2, $3, $4, $5);next};{print $0}' | tail -400l | head -64 +1542562767.658355 [0 172.30.1.243:50576] "lpush" "asgi:specific.VhOuiFYa!" +1542562767.659099 [0 172.30.1.243:50576] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562767.660627 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562767.664428 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562767.664477 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562767.664486 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562767.665996 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562772.920181 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562772.920237 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562772.920246 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562772.921701 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562778.049731 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562778.049789 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562778.049799 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562778.051274 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562779.622664 [0 172.30.2.213:54476] "llen" "asgi:specific.VhOuiFYa!" +1542562779.624265 [0 172.30.2.213:54476] "lpush" "asgi:specific.VhOuiFYa!" +1542562779.626234 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562779.626447 [0 172.30.2.213:54476] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562779.629902 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562779.629952 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562779.629961 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562779.631445 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562784.730905 [0 172.30.2.213:54476] "llen" "asgi:specific.VhOuiFYa!" +1542562784.732442 [0 172.30.2.213:54476] "lpush" "asgi:specific.VhOuiFYa!" +1542562784.734218 [0 172.30.2.213:54476] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562784.734412 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562784.737969 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562784.738022 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562784.738030 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562784.739534 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562784.926082 [0 172.30.1.243:50576] "llen" "asgi:specific.VhOuiFYa!" +1542562784.926817 [0 172.30.1.243:50576] "lpush" "asgi:specific.VhOuiFYa!" +1542562784.927628 [0 172.30.1.243:50576] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562784.929140 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562784.933055 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562784.933104 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562784.933113 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562784.934688 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562788.229071 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562788.229929 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562788.230680 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562788.232263 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562788.237090 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562788.237138 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562788.237147 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562788.238730 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562788.627279 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562788.628044 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562788.628909 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562788.630558 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562788.639462 [0 172.30.2.213:55348] "eval" "\n local backed_up = redis.call('LRANGE', ARGV[2], 0, -1)\n for i = #backed_up, 1, -1 do\n redis.call('LPUSH', ARGV[1], backed_up[i])\n end\n redis.call('DEL', ARGV[2])\n " "0" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" +1542562788.639511 [0 lua] "lrange" "asgi:specific.VhOuiFYa!$inflight" "0" "-1" +1542562788.639520 [0 lua] "del" "asgi:specific.VhOuiFYa!$inflight" +1542562788.641432 [0 172.30.2.213:55348] "brpoplpush" "asgi:specific.VhOuiFYa!" "asgi:specific.VhOuiFYa!$inflight" "5" +1542562791.845991 [0 172.30.2.213:54476] "llen" "asgi:specific.VhOuiFYa!" +1542562791.847563 [0 172.30.2.213:54476] "lpush" "asgi:specific.VhOuiFYa!" +1542562791.849981 [0 172.30.2.213:54476] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562791.850158 [0 172.30.2.213:55348] "brpop" "asgi:specific.VhOuiFYa!$inflight" "0" +1542562795.742859 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562795.743758 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562795.744502 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562795.840877 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562795.841651 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562795.842366 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562796.847021 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562796.848291 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562796.848978 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562797.832308 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562797.833167 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562797.833873 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60" +1542562798.847320 [0 172.30.1.243:50644] "llen" "asgi:specific.VhOuiFYa!" +1542562798.848146 [0 172.30.1.243:50644] "lpush" "asgi:specific.VhOuiFYa!" +1542562798.848832 [0 172.30.1.243:50644] "expire" "asgi:specific.VhOuiFYa!" "60"

And here's freeze output (Python 3.6.5):

aioredis==1.2.0 asgiref==2.3.2 async-timeout==3.0.1 attrs==18.2.0 autobahn==18.11.1 Automat==0.7.0 channels==2.1.5 channels-redis==2.3.1 constantly==15.1.0 daphne==2.2.3 Django==2.1.3 django-health-check==3.8.0 django-redis-cache==1.8.0 hiredis==0.2.0 hyperlink==18.0.0 idna==2.7 incremental==17.5.0 msgpack==0.5.6 mysqlclient==1.3.13 PyHamcrest==1.9.0 pytz==2018.7 redis==2.10.6 six==1.11.0 Twisted==18.9.0 txaio==18.8.1 zope.interface==4.6.0

khpeterson commented 5 years ago

I spent more time reviewing channels_redis/core.py and believe I found a window where receive_lock is not released:

                tasks = [
                    self.receive_lock.acquire(),
                    self.receive_buffer[channel].get(),
                ]
                tasks = [asyncio.ensure_future(task) for task in tasks]
                try:
                    done, pending = await asyncio.wait(
                        tasks, return_when=asyncio.FIRST_COMPLETED
                    )
                    for task in pending:
                        # Cancel all pending tasks.
                        task.cancel()
                except asyncio.CancelledError:
                    # Ensure all tasks are cancelled if we are cancelled.
                    # Also see: https://bugs.python.org/issue23859
                    for task in tasks:
                        task.cancel()

                    raise

I believe it's possible for a task to be in the FINISHED state when the asyncio.CancelledError exception is handled above. In this case, task.cancel() will return False, and if this task happens to be the future for receive_lock.acquire() then the lock will never be released.

I changed my version of core.py to look like this:

                except asyncio.CancelledError:
                    # Ensure all tasks are cancelled if we are cancelled.
                    # Also see: https://bugs.python.org/issue23859
                    for task in tasks:
                        cancelOK = task.cancel()
                        assert cancelOK

                    raise

And sure enough I'm seeing the assertion in my daphne logs before the ChannelFull exceptions start.

2018-11-21 23:24:58,177 daphne.server ERROR Exception inside application: assert cancelOK . . . 2018-11-21 23:30:45,970 relay.devices ERROR Events/receive: caught ChannelFull, device_id = dummyid.4, channel = specific.QRUgfpZV!NhjHIbKcYeZc

khpeterson commented 5 years ago

I think this fix will take care of the unreleased lock - what do you think?

diff --git a/channels_redis/core.py b/channels_redis/core.py
index b3e5a98..455c78e 100644
--- a/channels_redis/core.py
+++ b/channels_redis/core.py
@@ -384,7 +384,11 @@ class RedisChannelLayer(BaseChannelLayer):
                         # Ensure all tasks are cancelled if we are cancelled.
                         # Also see: https://bugs.python.org/issue23859
                         for task in tasks:
-                            task.cancel()
+                            cancelOK = task.cancel()
+                            if not cancelOK:
+                                assert task.done()
+                                if task.result() is True:
+                                    self.receive_lock.release()

                         raise
ghost commented 5 years ago

We're having the same exact issue on production as per closed issue: https://github.com/django/channels_redis/issues/132

Although the error that I've mentioned it was due Python 3.5 or another package, now everything works with no exception and the channel layer still seems to "break" after a while.

I can reproduce on my local environment just reloading a bunch of tabs with our chat system (and forcing a lot of connect/disconnects). Unfortunately I don't have time to test on a simple code base to attach here.

After a while, without any errors, the messages sent via group_send doesn't arrive anymore.

What we're testing since yesterday was reverting back to 2.2.1 and it seems fine for now (we're doing some more tests and will put this on production soon).

PS: we only use a single Daphne process.

andymikulski commented 5 years ago

What we're testing since yesterday was reverting back to 2.2.1 and it seems fine for now (we're doing some more tests and will put this on production soon). PS: we only use a single Daphne process.

I'm in a similar boat, just chiming in for future devs with the same issue. Single Daphne process, after ~3 connections everything seems to hang. Dropping down to 2.2.1 fixed the issues for me.

carltongibson commented 5 years ago

Dropping down to 2.2.1 fixed the issues for me.

I'll reopen to investigate but, can anyone git bisect down to the change that caused the regression? (That would be very helpful.)