dolamroth / starlette-web

Asynchronous web framework, based on Starlette and inspired by Django
MIT License
3 stars 1 forks source link

Deadlock in Channel with InMemoryChannelLayer, if same channel is used to subscribe in topic A and publish to topic B #68

Closed dolamroth closed 7 months ago

dolamroth commented 7 months ago

This deadlocks

import string
from typing import Any
import random
import uuid

from starlette_web.common.channels.base import Channel
from starlette_web.common.channels.layers.local_memory import InMemoryChannelLayer
from starlette_web.contrib.redis.channel_layers import RedisPubSubChannelLayer
import anyio

RAND_RANGE = (1, 2)
CHANNEL = 'letters'

class PrintWorker:
    def __init__(self, channel) -> None:
        self.channel = channel

    async def return_item(self, item: Any) -> Any:
        sleep = random.randint(*RAND_RANGE)
        pw_ri_res = f'{sleep}__{item}'
        print(pw_ri_res)
        return pw_ri_res

    async def print_from_broadcast(self) -> None:
        async with self.channel.subscribe(CHANNEL) as subscriber:
            async for event in subscriber:
                pw_pfb_res = await self.return_item(event.message)
                await self.channel.publish('block', message=pw_pfb_res)

class AsyncLetterGenerator:
    letters = list(string.ascii_uppercase)

    def __init__(self, channel) -> None:
        self.channel = channel
        self._id = uuid.uuid4()

    async def return_letter_generator(self, length):
        for _ in range(length):
            sleep = random.randint(*RAND_RANGE)
            await anyio.sleep(sleep)
            alg_rlg_res = random.choice(self.letters) + f"_{self._id}"
            await self.channel.publish(CHANNEL, message=alg_rlg_res)

class ProcessBlocker:
    def __init__(self, channel) -> None:
        self.channel = channel

    async def try_block(self):
        async with self.channel.subscribe('block') as subscriber:
            async for event in subscriber:
                print('before publish to hard-block', event)
                await self.channel.publish('hard-block', message='message to publish')
                print('after publish to hard-block', event)

async def main() -> Any:
    channel_layer = InMemoryChannelLayer()
    # channel_layer = RedisPubSubChannelLayer(host="localhost", port=6379, db=1)

    async with Channel(channel_layer) as channel:
        pw = PrintWorker(channel)
        alg1 = AsyncLetterGenerator(channel)
        alg2 = AsyncLetterGenerator(channel)
        alg3 = AsyncLetterGenerator(channel)
        alg4 = AsyncLetterGenerator(channel)
        async_blocker = ProcessBlocker(channel)

        async with anyio.create_task_group() as task_group:
            async def run_return_receiver() -> None:
                await pw.print_from_broadcast()
                task_group.cancel_scope.cancel()

            async def run_letter_senders() -> None:
                task_group.start_soon(alg1.return_letter_generator, 5)
                task_group.start_soon(alg2.return_letter_generator, 5)
                task_group.start_soon(alg3.return_letter_generator, 5)
                task_group.start_soon(alg4.return_letter_generator, 5)

            async def listener(channel) -> None:
                async with channel.subscribe('hard-block') as subscriber:
                    async for event in subscriber:
                        print(event)

            task_group.start_soon(run_return_receiver)
            task_group.start_soon(run_letter_senders)
            task_group.start_soon(async_blocker.try_block)
            task_group.start_soon(listener, channel)
            # await run_letter_senders()

if __name__ == "__main__":
    results: Any = anyio.run(main)