encode / broadcaster

Broadcast channels for async web apps. 📢
BSD 3-Clause "New" or "Revised" License

Didn't get any event in subscriber. #140

Closed. ChrisNi888 closed this issue 2 months ago

ChrisNi888 commented 2 months ago

I tried the sample code with FastAPI. The server receives messages from the client, but it never sends them back. I'm using Redis as the backend, and the Redis logs show that both the subscribe and the publish commands go through normally. I suspect the broadcast is either not subscribed correctly or not reading the events. Does anyone know what the problem is?

import anyio
from broadcaster import Broadcast
from fastapi import APIRouter, WebSocket

async def chatroom_ws_receiver(websocket: WebSocket, broadcast: Broadcast):
    async for message in websocket.iter_text():
        print('Server received message:', message)
        await broadcast.publish(channel="chatroom", message=message)

async def chatroom_ws_sender(websocket: WebSocket, broadcast: Broadcast):
    print('Entered sender function')
    async with broadcast.subscribe(channel="chatroom") as subscriber:
        print('subscriber: ', subscriber)  # this print is ok
        async for event in subscriber:
            print('Server sending message:', event.message)  # this print never appears
            await websocket.send_text(event.message)

@router.websocket("/chat_room")
async def chatroom_ws(
    websocket: WebSocket
):
    await websocket.accept()

    async with anyio.create_task_group() as task_group:
        # run until first is complete
        async def run_chatroom_ws_receiver() -> None:
            await chatroom_ws_receiver(websocket=websocket, broadcast=broadcast)
            task_group.cancel_scope.cancel()

        task_group.start_soon(run_chatroom_ws_receiver)
        await chatroom_ws_sender(websocket, broadcast)
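
The "run until first is complete" idea in the handler above can be sketched with only the standard library, which may help when checking the control flow without FastAPI or anyio. This is a minimal illustration under assumptions, not the library's sample: `receiver`, `sender`, and `run_until_first_complete` are hypothetical stand-ins, and a `None` item in the queue plays the role of the client disconnecting.

```python
import asyncio


async def receiver(inbox: asyncio.Queue, log: list) -> None:
    # Hypothetical stand-in for chatroom_ws_receiver:
    # returns once the "client" disconnects (None sentinel).
    while (msg := await inbox.get()) is not None:
        log.append(msg)


async def sender() -> None:
    # Hypothetical stand-in for chatroom_ws_sender:
    # waits forever for events that never arrive.
    await asyncio.Event().wait()


async def run_until_first_complete(inbox: asyncio.Queue, log: list) -> bool:
    tasks = [
        asyncio.create_task(receiver(inbox, log)),
        asyncio.create_task(sender()),
    ]
    # Wait until either task finishes, then cancel whatever is still running.
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return all(task.cancelled() for task in pending)


async def demo() -> tuple:
    inbox: asyncio.Queue = asyncio.Queue()
    log: list = []
    for item in ("hello", "world", None):
        inbox.put_nowait(item)
    sender_cancelled = await run_until_first_complete(inbox, log)
    return log, sender_cancelled
```

Calling `asyncio.run(demo())` drives both tasks; the sender is cancelled as soon as the receiver returns, which mirrors what `task_group.cancel_scope.cancel()` is meant to do in the handler.
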

If I use Redis pub/sub directly, without broadcaster, it works fine.

async def test_redis_pubsub(redis:Redis):
    p = redis.pubsub()
    await p.subscribe('chatroom')

    # send
    await redis.publish('chatroom', 'Hello, Redis!')

    # receive
    message = await p.get_message(timeout=5)
    if message:
        print("Received:", message)
    else:
        print("No message received")
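
To separate the route wiring from the Redis layer, one option is to swap in a tiny in-memory stand-in and see whether published messages reach a subscriber at all. The sketch below is only an assumption-laden illustration: `MiniBroadcast` is a hypothetical class written for this example, it is not broadcaster's implementation, and it only mimics the `publish` / `subscribe` shape used in the code above (it yields plain strings rather than event objects).

```python
import asyncio
from collections import defaultdict
from contextlib import asynccontextmanager


class MiniBroadcast:
    """Hypothetical in-memory stand-in; not broadcaster's implementation."""

    def __init__(self) -> None:
        # channel name -> set of subscriber queues
        self._queues = defaultdict(set)

    async def publish(self, channel: str, message: str) -> None:
        # Fan the message out to every queue subscribed to this channel.
        for queue in list(self._queues[channel]):
            await queue.put(message)

    @asynccontextmanager
    async def subscribe(self, channel: str):
        queue: asyncio.Queue = asyncio.Queue()
        self._queues[channel].add(queue)
        try:
            yield queue
        finally:
            # Unregister the queue when the subscriber leaves the block.
            self._queues[channel].discard(queue)


async def demo() -> tuple:
    broadcast = MiniBroadcast()
    async with broadcast.subscribe("chatroom") as a, broadcast.subscribe("chatroom") as b:
        await broadcast.publish("chatroom", "hello")
        # Each subscriber gets its own copy of the message.
        return await a.get(), await b.get()
```

If messages flow through a stand-in like this but not through the real `Broadcast` object, that points toward the backend connection or the listener rather than the WebSocket handler.
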

ChrisNi888 commented 2 months ago

I think I found it.

If I register the route directly on the app, like this: @app.websocket("/chat_room"), it works as expected.

If I create a router, like this:

router = APIRouter(
    prefix="/ws",
    tags=["WebSocket"],
)

@router.websocket("/chat_room")

the problem described above occurs. The reason is unclear. Maybe there is something I don't know about FastAPI.

ChrisNi888 commented 2 months ago

close