vxgmichel / aiostream

Generator-based operators for asynchronous iteration
http://aiostream.readthedocs.io
GNU General Public License v3.0

Idea: Broadcasting to multiple consumers simultaneously #55

Open andersea opened 4 years ago

andersea commented 4 years ago

I have a few instances where I need to send the same items to multiple consumers. I have a working implementation for trio.

import trio


def broadcast(aiter):
    send_channels = []
    lock = trio.StrictFIFOLock()

    async def listen():
        send_channel, receive_channel = trio.open_memory_channel(1)
        send_channels.append(send_channel)
        try:
            while True:
                async with lock:
                    try:
                        yield receive_channel.receive_nowait()
                    except trio.WouldBlock:
                        value = await aiter.__anext__()
                        for s in send_channels:
                            if s != send_channel:
                                s.send_nowait(value)
                        yield value
        finally:
            send_channels.remove(send_channel)

    return listen

This relies on the lock being fair. Otherwise, if one listener got ahead of the pack, s.send_nowait could try to send to a full channel, and the listen function would raise trio.WouldBlock.
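To illustrate the failure mode, here is a minimal sketch using the standard library's asyncio.Queue as a stand-in for a trio memory channel (an assumption made only to keep the example dependency-free). The helper push_to_listener and the demo function are hypothetical names, not part of the implementation above. asyncio.QueueFull plays the role that trio.WouldBlock plays for a full channel.

```python
import asyncio


def push_to_listener(buffer: asyncio.Queue, value) -> bool:
    """Try a non-blocking send and report whether the buffer had room."""
    try:
        buffer.put_nowait(value)
        return True
    except asyncio.QueueFull:
        # asyncio's counterpart of trio.WouldBlock on a full channel.
        return False


async def demo():
    # One-slot buffer belonging to a listener that has fallen behind.
    slow_listener = asyncio.Queue(maxsize=1)
    first = push_to_listener(slow_listener, "a")   # fits in the empty slot
    second = push_to_listener(slow_listener, "b")  # slot is still occupied
    return first, second


print(asyncio.run(demo()))
```

The second send fails because the lagging listener has not yet drained its single slot, which is the situation a fair lock is meant to prevent.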

What do you think? Is this of interest for aiostream?

vxgmichel commented 4 years ago

Hi @andersea !

I have a few instances where I need to send the same items to multiple consumers.

There's a related issue about this use case: #35

It's not implemented yet but I think it makes sense. It's somewhat related to the use case you described previously, i.e. dispatching to several streams. I think it could be interesting to use a similar syntax for both cases:

# Cold stream - dispatching
producer_stream = stream.iterate(produce())
async with producer_stream.stream() as producer:
    [...]

# Hot stream - broadcasting
producer_stream = stream.iterate(produce())
async with producer_stream.hotstream() as hot_producer:
    [...]

What do you think?

andersea commented 4 years ago

Ok. I have read through #35

It does look like the same basic idea, but it behaves a bit differently from what I need. If I understand it correctly, listeners that lag behind can lose messages in that implementation, since the hotstream will replace futures if the queue is full. This is undesirable for my use case. I would prefer to apply back pressure instead.
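For comparison, here is a rough sketch of what broadcasting with back pressure could look like. It uses asyncio tasks and bounded asyncio.Queue objects from the standard library, so it is not trio code and not the #35 implementation. All names (broadcast_with_backpressure, consume, numbers, _DONE) are made up for illustration.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the source


async def broadcast_with_backpressure(source, queues):
    """Copy every item from `source` into each bounded queue.

    `await queue.put(...)` suspends while a queue is full, so the source
    is only advanced as fast as the slowest consumer drains its queue.
    """
    async for item in source:
        for queue in queues:
            await queue.put(item)
    for queue in queues:
        await queue.put(_DONE)


async def consume(queue, delay):
    received = []
    while (item := await queue.get()) is not _DONE:
        received.append(item)
        await asyncio.sleep(delay)  # simulate a slow consumer
    return received


async def numbers(n):
    for i in range(n):
        yield i


async def main():
    queues = [asyncio.Queue(maxsize=1) for _ in range(2)]
    results = await asyncio.gather(
        broadcast_with_backpressure(numbers(5), queues),
        consume(queues[0], 0),
        consume(queues[1], 0.01),
    )
    return results[1], results[2]
```

Running asyncio.run(main()) should give both consumers the full sequence: the slow consumer holds the producer back rather than missing items.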

The implementation wouldn't be directly trio compatible, as far as I can tell, since you don't have tasks in trio.

andersea commented 4 years ago

My broadcast implementation is not safe, I have found.

Here is a simple example that causes a deadlock:

import string

import trio
from aiostream import stream

from broadcast import broadcast

async def producer():
    i = 0
    lowercase_letters = string.ascii_lowercase
    while True:
        for letter in lowercase_letters:
            value = (i % 2, letter)
            print(f'Producer yielding {value}')
            yield value
            i += 1
            await trio.sleep(.5)

async def main():
    listen = broadcast(producer())
    zipped = stream.zip(
        stream.filter(listen(), lambda i: i[0] == 0),
        stream.filter(listen(), lambda i: i[0] == 1)
    )
    async with zipped.stream() as s:
        async for item in s:
            print(item)

trio.run(main)

The deadlock happens after one item has been received and the zip function gets stuck waiting for the second item forever. I am actually not entirely sure why, yet. But at least after a couple of days of debugging I have a simple example to reproduce it.

andersea commented 4 years ago

I managed to get it working.

Although I was not able to analyze exactly how the deadlocks occurred, I was able to gather this much:

First off, there is lock contention. The two listeners compete for the lock, so one is not allowed to read because the other coroutine is trying to write, or vice versa.

Secondly, the channel buffer maxes out. This happens because the producer can generate items that the current listener is not interested in. These items pile up in the channel buffer. The only solutions I see are to either prefilter the items before they are allowed into the channel buffer, or else allow the channel buffer to grow without bound. In my use case, the consumers should always be able to catch up with the producer eventually, so although this isn't what I would ideally like, it is not likely to cause any further problems.
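The buffer problem can be shown with a deliberately simplified, synchronous model. This is not the trio code: the Listener class, pull_until and demo are hypothetical, and the model assumes the other listener is idle while the first one searches for an item that passes its filter.

```python
from collections import deque


class Listener:
    def __init__(self, capacity=None):
        self.buffer = deque()
        self.capacity = capacity  # None means unbounded

    def push(self, value):
        if self.capacity is not None and len(self.buffer) >= self.capacity:
            raise OverflowError("listener buffer is full")
        self.buffer.append(value)


def pull_until(source, wanted, others):
    """Advance the source until `wanted(item)` holds.

    Every item pulled along the way is copied to the other listeners,
    whether or not anyone is interested in it.
    """
    for item in source:
        for other in others:
            other.push(item)
        if wanted(item):
            return item
    return None


def demo(capacity):
    # Two uninteresting items precede the one the filter accepts.
    source = iter([(1, "a"), (1, "b"), (0, "c")])
    idle = Listener(capacity)
    try:
        found = pull_until(source, lambda item: item[0] == 0, [idle])
    except OverflowError:
        return "overflow", len(idle.buffer)
    return found, len(idle.buffer)
```

With demo(1) the idle listener's one-slot buffer overflows on the second uninteresting item. With demo(None) the search succeeds and the three items simply pile up in the unbounded buffer.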

So it seems backpressure can be bad after all. :)

andersea commented 4 years ago

Here is my current implementation.

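import logging
import math
from itertools import count

import trio

# Assumed: `log` is a standard module-level logger.
log = logging.getLogger(__name__)


def broadcast(aiter):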
    send_channels = []
    lock = trio.Lock()
    id = count()

    async def listen():
        send_channel, receive_channel = trio.open_memory_channel(math.inf)
        send_channels.append(send_channel)
        my_id = next(id)
        try:
            log.debug('Listener %s starting', my_id)
            while True:
                try:
                    log.debug('Listener %s attempting to get next item.', my_id)
                    yield receive_channel.receive_nowait()
                except trio.WouldBlock:
                    # Receive channel is empty, try broadcasting the next value.
                    if lock.locked():
                        # Another listener is currently broadcasting.
                        # Wait for the next value to arrive.
                        log.debug('Listener %s would have blocked acquiring the lock. Wait for next value instead.', my_id)
                        yield await receive_channel.receive()
                    else:
                        log.debug('Listener %s queue empty. Acquiring lock.', my_id)
                        lock.acquire_nowait()
                        # We got the lock. Get next value and broadcast it.
                        log.debug('Listener %s acquired the lock. Getting next item.', my_id)
                        value = await aiter.__anext__()
                        log.debug('Listener %s broadcasting.', my_id)
                        for i, s in enumerate(send_channels):
                            # The buffers are unbounded (math.inf), so
                            # send_nowait never raises trio.WouldBlock here.
                            log.debug('Listener %s sending to listener %s.', my_id, i+1)
                            s.send_nowait(value)
                        log.debug('Listener %s completed broadcast. Releasing lock.', my_id)
                        lock.release()
        except trio.EndOfChannel:
            log.debug('Listener %s channel closed.', my_id)
        except StopAsyncIteration:
            log.debug('Listener %s exhausted the generator. Closing send channels.', my_id)
            for s in send_channels:
                await s.aclose()
        finally:
            log.debug('Listener %s quit. Removing own send channel.', my_id)
            send_channels.remove(send_channel)

    return listen