django / channels

Developer-friendly asynchrony for Django
https://channels.readthedocs.io
BSD 3-Clause "New" or "Revised" License

Channel Layer doesn't work for AsyncHttpConsumer #1302

Open krhitesh opened 5 years ago

krhitesh commented 5 years ago
wuqianlin commented 5 years ago

I have almost the same problem. My requirements:

channels==2.2.0
channels-redis==2.4.0
daphne==2.3.0
Django==2.1.7
svn==0.3.46
redis==3.2.0

When I test the following code with Postman:

class LongPollConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        print(self.channel_name)
        headers = [(b"Content-Type", b"application/json")]
        await self.send({"type": "http_response_start", "status": 200, "headers": headers})
        await self.send({"type": "http_response_body", "body": b"response", "more_body": False})

    async def disconnect(self):
        print('disconnected!!!!')

console output:

specific.HlmtREry!uwboHUsrTBjg
disconnected!!!!

Postman stays in the "Loading..." state indefinitely, and in Chrome the request stays in "pending" status. I wrote the consumer following the documentation at https://channels.readthedocs.io/en/latest/topics/consumers.html.
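One detail worth checking in the snippet above: the ASGI HTTP specification names response messages with dots ("http.response.start", "http.response.body"), while the code sends underscore-separated types. Whether that alone explains the hanging request is an assumption, not something confirmed in this thread. A minimal sketch of the expected message shapes, using hypothetical helper names response_start and response_body:

```python
def response_start(status, headers):
    """Build the ASGI message that opens an HTTP response (hypothetical helper)."""
    return {
        "type": "http.response.start",
        "status": status,
        # ASGI carries headers as byte-string pairs with lowercased names.
        "headers": [(name.lower(), value) for name, value in headers],
    }


def response_body(body, more_body=False):
    """Build the ASGI message carrying one chunk of the response body."""
    return {"type": "http.response.body", "body": body, "more_body": more_body}


start = response_start(200, [(b"Content-Type", b"application/json")])
final = response_body(b"response")
print(start["type"], final["type"])
```

Inside a consumer, these dicts would be passed to self.send() in that order.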

malefice commented 5 years ago

I can confirm this issue. My AsyncHttpConsumer subclass is mostly based on the example provided in the docs. I even hardcoded the group name for debugging purposes.

from channels.generic.http import AsyncHttpConsumer

class ServerSentEventsConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.channel_layer.group_add('test123', self.channel_name)
        await self.send_headers(headers=[
            (b"Cache-Control", b"no-cache"),
            (b"Content-Type", b"text/event-stream"),
            (b"Transfer-Encoding", b"chunked"),
        ])
        await self.send_body(b'', more_body=True)

    async def event_message(self, event):
        payload = 'data: test\n\n'
        await self.send_body(payload.encode('utf-8'), more_body=True)

When I open the corresponding URL in a browser, the page just waits for content, which is the expected behavior. If I run the following commands in a Django shell, I expect event_message to run and, in turn, the browser to display data: test, but it does not.

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)('test123', {'type': 'event.message'})

I am using channels~=2.2, channels-redis~=2.4, and Django~=1.11.

malefice commented 5 years ago

So, I did a little more digging into the channels source code, and my investigation led me to this and this.

The await_many_dispatch method is the one responsible for dispatching stuff like http.request, websocket.connect, and all other custom "events" sent. For AsyncHttpConsumer, the http.request event will be dispatched to the http_request method.
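The mapping from an event type to a method name can be sketched as follows. This is a simplified stand-in for the convention described above (dots in the type become underscores), not the library's own code; handler_name_for and DemoConsumer are hypothetical names.

```python
import asyncio


def handler_name_for(message):
    """Map a message "type" to a method name (hypothetical helper)."""
    if "type" not in message:
        raise ValueError("message has no 'type' key")
    name = message["type"].replace(".", "_")
    if name.startswith("_"):
        raise ValueError("type must not map to a private method")
    return name


class DemoConsumer:
    """Toy consumer that records which event types reached a handler."""

    def __init__(self):
        self.seen = []

    async def event_message(self, event):
        self.seen.append(event["type"])

    async def dispatch(self, message):
        # Look up the method named after the message type and await it.
        handler = getattr(self, handler_name_for(message), None)
        if handler is None:
            raise ValueError("no handler for %r" % message["type"])
        await handler(message)


consumer = DemoConsumer()
asyncio.run(consumer.dispatch({"type": "event.message"}))
print(handler_name_for({"type": "http.request"}), consumer.seen)
```

Under this convention, a group_send with type 'event.message' reaches event_message, and 'chat.message' would reach chat_message.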

According to the source code, the http_request method will always raise a StopConsumer exception once the handle method has finished. This exception will, in turn, trigger the finally clause in await_many_dispatch which will cancel all tasks, including the task handling channel layer events.

Putting in a while loop inside the handle method will also not work, because it is called from inside the task handling http.request, and if that task does not end, the task handling channel layer events will not be given a chance to run.
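The behaviour described in the last two paragraphs can be reproduced with plain asyncio. This is a simplified sketch of a dispatch loop of that shape, not the channels source; dispatch_loop, the two queues, and the local StopConsumer class are stand-ins introduced for illustration.

```python
import asyncio


class StopConsumer(Exception):
    """Stand-in for channels.exceptions.StopConsumer."""


async def dispatch_loop(receivers, dispatch):
    """Wait on several message sources and dispatch each message inline."""
    tasks = [asyncio.ensure_future(receive()) for receive in receivers]
    try:
        while True:
            await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for i, task in enumerate(tasks):
                if task.done():
                    # Awaited inline: no other message is handled until
                    # this handler returns, so a loop in it blocks the rest.
                    await dispatch(task.result())
                    tasks[i] = asyncio.ensure_future(receivers[i]())
    finally:
        # Any exception from dispatch, StopConsumer included, ends up here
        # and cancels the task waiting on the channel layer as well.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def main():
    http_queue, layer_queue = asyncio.Queue(), asyncio.Queue()
    handled = []

    async def dispatch(message):
        handled.append(message["type"])
        if message["type"] == "http.request":
            # Mirrors the behaviour described above: stop once handling ends.
            raise StopConsumer()

    await http_queue.put({"type": "http.request"})
    try:
        await dispatch_loop([http_queue.get, layer_queue.get], dispatch)
    except StopConsumer:
        pass
    # Sent after the loop stopped, so nothing is left to receive it.
    await layer_queue.put({"type": "event.message"})
    return handled, layer_queue.qsize()


handled, undelivered = asyncio.run(main())
print(handled, undelivered)
```

The channel layer message stays in its queue because the only task reading from it was cancelled when the HTTP handler finished.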

What I did to validate my findings was to use a keepalive flag that will be turned on if send_body was invoked with more_body=True, and if that flag is on, the StopConsumer exception will not be raised. With the example code below, I can get the chat_message method to trigger.

from channels.exceptions import StopConsumer
from channels.generic.http import AsyncHttpConsumer


class SseConsumer(AsyncHttpConsumer):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.keepalive = False

    async def handle(self, body):
        print('start handle')
        await self.send_headers(headers=[
            (b'Cache-Control', b'no-cache'),
            (b'Content-Type', b'text/event-stream'),
            (b"Transfer-Encoding", b"chunked"),
            (b'Access-Control-Allow-Origin', b'*'),
        ])

        await self.send_body(b'', more_body=True)
        await self.channel_layer.group_add('test123', self.channel_name)

    async def send_body(self, body, *, more_body=False):
        if more_body:
            self.keepalive = True
        assert isinstance(body, bytes), "Body is not bytes"
        await self.send(
            {"type": "http.response.body", "body": body, "more_body": more_body}
        )

    async def http_request(self, message):
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            try:
                await self.handle(b"".join(self.body))
            finally:
                if not self.keepalive:
                    await self.disconnect()
                    raise StopConsumer()

    async def chat_message(self, event):
        payload = 'event: test\ndata: 2\n\n'
        await self.send_body(payload.encode('utf-8'), more_body=True)

I am not sure what other stuff this might break, but I can submit a PR if this is adequate, otherwise I will defer to the masters. At the very least, part of the investigation has already been performed, and hopefully this helped save some time.

mukuzz commented 3 years ago

@malefice I'm also trying to implement SSE using AsyncHttpConsumer. Your fix works for me, but does it have any side effects?

kenjones21 commented 3 years ago

> @malefice I'm also trying to implement SSE using AsyncHttpConsumer. Your fix works for me, but does it have any side effects?

Not the author, but I used the fix for several months with no observed side effects (many thanks to @malefice). That said, we weren't using channels for anything besides server-sent events, so YMMV.

rohit20001221 commented 2 years ago

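from channels.exceptions import StopConsumer
from channels.generic.http import AsyncHttpConsumer


class SseConsumer(AsyncHttpConsumer):
    # Subclasses are expected to define group_name (used in handle below).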

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.keepalive = False

    async def handle(self, body):
        print('start handle')
        await self.send_headers(headers=[
            (b'Cache-Control', b'no-cache'),
            (b'Content-Type', b'text/event-stream'),
            (b"Transfer-Encoding", b"chunked"),
            (b'Access-Control-Allow-Origin', b'*'),
        ])

        await self.send_body(b'', more_body=True)
        await self.channel_layer.group_add(self.group_name, self.channel_name)

    async def send_body(self, body, *, more_body=False):
        if more_body:
            self.keepalive = True
        assert isinstance(body, bytes), "Body is not bytes"
        await self.send(
            {"type": "http.response.body", "body": body, "more_body": more_body}
        )

    async def http_request(self, message):
        if "body" in message:
            self.body.append(message["body"])
        if not message.get("more_body"):
            try:
                await self.handle(b"".join(self.body))
            finally:
                if not self.keepalive:
                    await self.disconnect()
                    raise StopConsumer()

I modified the SseConsumer to generalize it; see the example below for reference.

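import json

from channels.layers import get_channel_layer


class ExampleNotifier(SseConsumer):
    group_name = 'example_group'

    async def handle_user_data(self, event):
        # Serialize the event's "message" field as one SSE data frame.
        payload = 'data: %s\n\n' % json.dumps(event["message"])

        await self.send_body(payload.encode('utf-8'), more_body=True)

# Run this from an async context; the "handle.user.data" type is
# dispatched to handle_user_data above.
layer = get_channel_layer()
await layer.group_send('example_group', {'message': {'user': 'krishna'}, "type":"handle.user.data"})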
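The payloads in the examples above are assembled by hand. A small helper could build them instead. This is a sketch under the assumption that only the "event" and "data" fields of the server-sent events format are needed; format_sse is a hypothetical name, not part of channels.

```python
import json


def format_sse(data, event=None):
    """Encode one server-sent event as bytes (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append("event: %s" % event)
    # Each line of the payload needs its own "data:" field.
    for line in str(data).split("\n"):
        lines.append("data: %s" % line)
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")


frame = format_sse(json.dumps({"user": "krishna"}))
print(frame)
```

In a consumer this would be used as await self.send_body(format_sse(...), more_body=True).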