airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

feature: concurrent Redis consuming #1507

Open ryanrain2016 opened 5 months ago

ryanrain2016 commented 5 months ago

Describe the bug
It seems tasks don't run in parallel.

How to reproduce
Include source code:

import asyncio

from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel

redis_dsn = 'xxxx'
rb = RedisBroker(redis_dsn)

class User(BaseModel):
    name: str
    age: int = 0

@rb.subscriber(list="users")
async def my_listener(user: User):
    await asyncio.sleep(3)
    print(user, 'from faststream')

async def producer():
    for i in range(10):
        await rb.publish(User(name="Bob", age=i), list="users")

async def main():
    await rb.connect()
    asyncio.create_task(producer())
    app = FastStream(rb)
    await app.run()

if __name__ == '__main__':
    asyncio.run(main())

And/Or steps to reproduce the behavior:

run the script above

Expected behavior
Tasks should run in parallel.

Observed behavior
Tasks run one after another.

Lancetnik commented 5 months ago

Sure, this is how Redis List consuming works: we consume one message, process it, and then take the next one. We can look into concurrency, but it could lead to undefined consuming logic, so I can't promise this feature is possible.
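
Conceptually, the List subscriber behaves like the loop below - a simplified sketch using plain redis-py rather than FastStream's actual internals, with an assumed local Redis URL - which is why handlers appear to run one after another.

import asyncio

from redis.asyncio import Redis

async def consume_loop() -> None:
    # Assumed local Redis; not FastStream's real consumer code.
    client = Redis.from_url("redis://localhost:6379")
    while True:
        # Block until one message is available...
        _key, payload = await client.brpop("users")
        # ...and fully process it before fetching the next one,
        # so a slow handler serializes the whole queue.
        await asyncio.sleep(3)
        print(payload, "processed")

if __name__ == "__main__":
    asyncio.run(consume_loop())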

Lancetnik commented 5 months ago

Anyway, you can consume messages in batches - that should speed up your service.

ryanrain2016 commented 5 months ago

Anyway, you can consume messages in batches - that should speed up your service.

Do you mean code like this?


from typing import List

from faststream.redis import ListSub

@rb.subscriber(list=ListSub("users", batch=True))
async def my_listener(user: List[User]):
    await asyncio.sleep(3)
    print(user, 'from faststream')

async def producer():
    await rb.publish_batch(*[User(name="Bob", age=i) for i in range(10)], list="users")

It doesn't work. No messages are consumed, with the output below.

2024-06-07 14:24:44,921 INFO - FastStream app starting...
2024-06-07 14:24:44,921 INFO - users | - MyListener waiting for messages
2024-06-07 14:24:44,922 INFO - FastStream app started successfully! To exit, press CTRL+C

ryanrain2016 commented 5 months ago

The messages in Redis look like this (screenshot attached).

ryanrain2016 commented 5 months ago

I found a way: just push the message to taskiq from the FastStream handler. taskiq works just as expected.
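
For reference, a minimal sketch of that workaround, assuming taskiq with the taskiq-redis ListQueueBroker and a local Redis URL; the task name process_user and the module layout are hypothetical, not the author's actual code:

import asyncio

from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel
from taskiq_redis import ListQueueBroker

redis_url = "redis://localhost:6379"  # assumed local Redis
rb = RedisBroker(redis_url)
task_broker = ListQueueBroker(url=redis_url)

class User(BaseModel):
    name: str
    age: int = 0

@task_broker.task
async def process_user(name: str, age: int) -> None:
    # The heavy work runs inside taskiq workers, which execute tasks concurrently.
    await asyncio.sleep(3)
    print(name, age, "from taskiq")

@rb.subscriber(list="users")
async def my_listener(user: User):
    # The FastStream handler only enqueues the job and returns immediately.
    await process_user.kiq(user.name, user.age)

app = FastStream(rb)

@app.on_startup
async def setup() -> None:
    # Needed so this process can send tasks to the taskiq broker.
    await task_broker.startup()

Run the FastStream app as usual and start the taskiq workers in a separate process, e.g. taskiq worker yourmodule:task_broker (module path hypothetical).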

Lancetnik commented 5 months ago

I'm not sure why your batch example doesn't work, but mine works as expected:

from faststream import FastStream
from faststream.redis import RedisBroker, ListSub

rb = RedisBroker()
app = FastStream(rb)

@rb.subscriber(list=ListSub("users", batch=True))
async def my_listener(user: list[str]):
    print(user, "from faststream")

@app.after_startup
async def producer():
    await rb.publish_batch(*["bob"] * 10, list="users")

With the output:

2024-06-07 17:59:34,313 INFO     - FastStream app starting...
2024-06-07 17:59:34,314 INFO     - users |            - `MyListener` waiting for messages
2024-06-07 17:59:34,319 INFO     - FastStream app started successfully! To exit, press CTRL+C
2024-06-07 17:59:34,320 INFO     - users | 34df713e-6 - Received
['bob', 'bob', 'bob', 'bob', 'bob', 'bob', 'bob', 'bob', 'bob', 'bob'] from faststream
2024-06-07 17:59:34,320 INFO     - users | 34df713e-6 - Processed

Lancetnik commented 5 months ago

Anyway, I'll add a max_workers option, the same way as the NATS subscriber, to support concurrency.
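
A hypothetical sketch of how such an option might look once it ships, assuming the parameter mirrors the NATS subscriber's max_workers; check the release notes before relying on this:

from faststream import FastStream
from faststream.redis import RedisBroker

broker = RedisBroker("redis://localhost:6379")  # assumed local Redis
app = FastStream(broker)

# max_workers is the assumed parameter name, by analogy with the NATS subscriber.
@broker.subscriber(list="users", max_workers=5)
async def my_listener(msg: str):
    # Up to 5 messages from the list could then be processed concurrently
    # within a single process.
    print(msg, "from faststream")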

theobouwman commented 1 month ago

If you start multiple instances of the script @Lancetnik , would they automatically distribute the messages across the instances?

Lancetnik commented 1 month ago

If you start multiple instances of the script @Lancetnik , would they automatically distribute the messages across the instances?

In the List subscription case - yes

ludaavics commented 6 days ago

If you start multiple instances of the script @Lancetnik , would they automatically distribute the messages across the instances?

In the List subscription case - yes

@Lancetnik Could you re-confirm this? When running with multiple workers, I see that each worker processes each of the jobs, as opposed to the load being split among the workers.

Taking a step back, what's the recommended way of achieving concurrent execution with a Redis backend (with a single worker)? Can it be achieved with stream consumer groups? @ryanrain2016 would you mind sharing your taskiq approach?

Thanks!

Lancetnik commented 5 days ago

@ludaavics - one more time

Redis Pub/Sub broadcasts messages to all listeners. So, in the following case, all workers will consume the same message:

@broker.subscriber(channel="test")
async def handler(msg):
    print(msg)
...
await broker.publish("Hi!", channel="test")

Redis List allows messages to be delivered concurrently - each message will be delivered to only one of the workers:

@broker.subscriber(list="test")
async def handler(msg):
    print(msg)
...
await broker.publish("Hi!", list="test")

All Redis Stream subscribers consume messages separately by default, so your workers will consume copies of the messages (like in the Pub/Sub case):

@broker.subscriber(stream="test")
async def handler(msg):
    print(msg)
...
await broker.publish("Hi!", stream="test")

But if you are using group and consumer names, Redis knows it is the same subscriber and can distribute messages among the workers (each message will be delivered to only a single worker):

@broker.subscriber(stream=StreamSub("test", group="group", consumer="my consumer"))
async def handler(msg):
    print(msg)
...
await broker.publish("Hi!", stream="test")

All this information already exists in the FastStream documentation. Also, I don't recommend using Redis as a broker in any case. If you are facing problems like these, you have probably already outgrown the tool.

ludaavics commented 5 days ago

@Lancetnik Thank you so much for taking the time to respond so thoroughly, especially given all the effort that's already gone into the documentation.

Agreed, I expected Pub/Sub to deliver one copy to each worker and list to act as a FIFO queue, delivering a single copy to an available worker. However, in my experiment, even with list, messages get delivered to every worker. Confusion ensues.

What am I missing?

import asyncio
from toad import settings

from faststream import FastStream
from faststream.redis import RedisBroker
from pydantic import BaseModel

broker = RedisBroker(settings.redis_url)
app = FastStream(broker)

class User(BaseModel):
    name: str
    age: int = 0

@broker.subscriber(list="list-users")
async def my_listener(user: User):
    print(user, "starting work")
    await asyncio.sleep(2)
    print(user, "done working")

@app.after_startup
async def producer():
    for i in range(5):
        await broker.publish(User(name="Bob", age=i), list="list-users")

❯ faststream --version
Running FastStream 0.5.28 with CPython 3.11.10 on Linux
❯ faststream run concur:app --workers 2
2024-11-07 17:18:49,960 INFO     - Started parent process [782803]
2024-11-07 17:18:49,977 INFO     - Started child process [782889]
2024-11-07 17:18:49,979 INFO     - Started child process [782890]
2024-11-07 17:18:55,645 INFO     - list-users |            - `MyListener` waiting for messages
2024-11-07 17:18:55,645 INFO     - list-users |            - `MyListener` waiting for messages
2024-11-07 17:18:55,648 INFO     - list-users | 819b0722-2 - Received
name='Bob' age=0 starting work
2024-11-07 17:18:55,748 INFO     - list-users | 71c71ef2-b - Received
name='Bob' age=0 starting work
name='Bob' age=0 done working
2024-11-07 17:18:57,648 INFO     - list-users | 819b0722-2 - Processed
2024-11-07 17:18:57,649 INFO     - list-users | d8c69c4f-4 - Received
name='Bob' age=1 starting work
name='Bob' age=0 done working
2024-11-07 17:18:57,749 INFO     - list-users | 71c71ef2-b - Processed
2024-11-07 17:18:57,749 INFO     - list-users | e0887636-c - Received
name='Bob' age=2 starting work
name='Bob' age=1 done working
2024-11-07 17:18:59,649 INFO     - list-users | d8c69c4f-4 - Processed
2024-11-07 17:18:59,649 INFO     - list-users | 11e9089f-c - Received
name='Bob' age=3 starting work
name='Bob' age=2 done working
2024-11-07 17:18:59,750 INFO     - list-users | e0887636-c - Processed
2024-11-07 17:18:59,750 INFO     - list-users | 77251547-3 - Received
name='Bob' age=4 starting work
name='Bob' age=3 done working
2024-11-07 17:19:01,650 INFO     - list-users | 11e9089f-c - Processed
2024-11-07 17:19:01,651 INFO     - list-users | 96108f0e-0 - Received
name='Bob' age=1 starting work
name='Bob' age=4 done working
2024-11-07 17:19:01,751 INFO     - list-users | 77251547-3 - Processed
2024-11-07 17:19:01,751 INFO     - list-users | 18efb89e-a - Received
name='Bob' age=2 starting work
name='Bob' age=1 done working
2024-11-07 17:19:03,651 INFO     - list-users | 96108f0e-0 - Processed
2024-11-07 17:19:03,652 INFO     - list-users | 68342783-c - Received
name='Bob' age=3 starting work
name='Bob' age=2 done working
2024-11-07 17:19:03,751 INFO     - list-users | 18efb89e-a - Processed
2024-11-07 17:19:03,752 INFO     - list-users | f7693f0d-4 - Received
name='Bob' age=4 starting work
name='Bob' age=3 done working
2024-11-07 17:19:05,652 INFO     - list-users | 68342783-c - Processed
name='Bob' age=4 done working
2024-11-07 17:19:05,752 INFO     - list-users | f7693f0d-4 - Processed

I am less familiar with stream consumer groups, but if I blindly follow the template above, I get a similar outcome: each message is sent to every worker:

from faststream.redis import StreamSub  # in addition to the earlier imports

@broker.subscriber(
    stream=StreamSub("stream-users", group="group", consumer="my consumer")
)
async def my_listener(user: User):
    print(user, "starting work")
    await asyncio.sleep(2)
    print(user, "done working")

@app.after_startup
async def producer():
    for i in range(5):
        await broker.publish(User(name="Bob", age=i), stream="stream-users")

❯ faststream run concur:app --workers 2
2024-11-07 17:29:03,509 INFO     - Started parent process [788488]
2024-11-07 17:29:03,520 INFO     - Started child process [788566]
2024-11-07 17:29:03,522 INFO     - Started child process [788567]
2024-11-07 17:29:09,224 INFO     - stream-users |            - `MyListener` waiting for messages
2024-11-07 17:29:09,230 INFO     - stream-users |            - `MyListener` waiting for messages
2024-11-07 17:29:09,410 INFO     - stream-users | d44f7426-a - Received
2024-11-07 17:29:09,410 INFO     - stream-users | 80f5a68e-4 - Received
name='Bob' age=0 starting work
name='Bob' age=0 starting work
name='Bob' age=0 done working
name='Bob' age=0 done working
2024-11-07 17:29:11,411 INFO     - stream-users | 80f5a68e-4 - Processed
2024-11-07 17:29:11,411 INFO     - stream-users | d44f7426-a - Processed
2024-11-07 17:29:11,412 INFO     - stream-users | 7f749459-3 - Received
name='Bob' age=1 starting work
name='Bob' age=1 done working
2024-11-07 17:29:13,412 INFO     - stream-users | 7f749459-3 - Processed
2024-11-07 17:29:13,413 INFO     - stream-users | 96262ed2-7 - Received
name='Bob' age=1 starting work
name='Bob' age=1 done working
2024-11-07 17:29:15,412 INFO     - stream-users | 96262ed2-7 - Processed
2024-11-07 17:29:15,413 INFO     - stream-users | 4913c148-0 - Received
name='Bob' age=2 starting work
name='Bob' age=2 done working
2024-11-07 17:29:17,414 INFO     - stream-users | 4913c148-0 - Processed
2024-11-07 17:29:17,414 INFO     - stream-users | b6718b7e-e - Received
name='Bob' age=3 starting work
name='Bob' age=3 done working
2024-11-07 17:29:19,415 INFO     - stream-users | b6718b7e-e - Processed
2024-11-07 17:29:19,416 INFO     - stream-users | fde7bec3-9 - Received
name='Bob' age=2 starting work
name='Bob' age=2 done working
2024-11-07 17:29:21,416 INFO     - stream-users | fde7bec3-9 - Processed
2024-11-07 17:29:21,417 INFO     - stream-users | f118af29-6 - Received
name='Bob' age=4 starting work
name='Bob' age=4 done working
2024-11-07 17:29:23,417 INFO     - stream-users | f118af29-6 - Processed
2024-11-07 17:29:23,418 INFO     - stream-users | 3ab4337f-e - Received
name='Bob' age=3 starting work
name='Bob' age=3 done working
2024-11-07 17:29:25,418 INFO     - stream-users | 3ab4337f-e - Processed
2024-11-07 17:29:25,419 INFO     - stream-users | 9ecc78dd-e - Received
name='Bob' age=4 starting work
name='Bob' age=4 done working

Lancetnik commented 5 days ago

@ludaavics there is no magic: the workers option scales the whole application via multiprocessing, so your "after_startup" hook is called once per worker. Each worker therefore publishes its own copy of the messages, which is why every message appears to be processed twice.
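
One way around the duplicated publishing (a minimal sketch, assuming a local Redis URL) is to publish from a separate one-off script instead of an after_startup hook, so the messages are produced exactly once regardless of how many workers run the app:

import asyncio

from faststream.redis import RedisBroker
from pydantic import BaseModel

class User(BaseModel):
    name: str
    age: int = 0

async def main() -> None:
    broker = RedisBroker("redis://localhost:6379")  # assumed URL
    await broker.connect()
    for i in range(5):
        await broker.publish(User(name="Bob", age=i), list="list-users")
    await broker.close()

if __name__ == "__main__":
    asyncio.run(main())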

ludaavics commented 5 days ago

That explains it. Thanks a lot!