airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
2.12k stars 99 forks source link

Bug: StreamMessage.reject() got an unexpected keyword argument 'redis' #1544

Closed allanwakes closed 1 week ago

allanwakes commented 1 week ago

Describe the bug I tried to use raise NackMessage() in my code, but the exception StreamMessage.reject() got an unexpected keyword argument 'redis' pops out.

How to reproduce Include source code:

The subscriber side:

import time
import asyncio

from faststream import FastStream, Logger
from faststream.redis import RedisBroker, StreamSub
from faststream.redis.annotations import RedisMessage, Redis
from faststream.exceptions import NackMessage

broker = RedisBroker("redis://default:password@localhost:6379")
app = FastStream(broker)

async def execute_order(order):
    print(f"Executing order: {order['order_id']}")
    await asyncio.sleep(1)
    print(f"Order executed: {order['order_id']}")

@broker.subscriber(stream=StreamSub("test-stream", group="test-group", consumer="1"))
async def handle(body: dict, logger: Logger, msg: RedisMessage, redis: Redis):
    logger.info(f">>>> I'm 1 con, {body}")
    current_time = int(time.time())
    if current_time >= body.get("execution_time"):
        await execute_order(body)
        await msg.ack(redis)
        logger.info(f">>>> I'm 1, {body} is executed and acked.")
    else:
        # I thought by nacking this message, this consumer would be notified by the same message later (cause it's not processed), 
        # but I was wrong.
        # this message went back to stream, and never came back to consumer 1.
        # await msg.nack()
        logger.info(f">> I'm 1, {body} is not executed and acked.")
        raise NackMessage()

The publisher side:

import asyncio
import random
import time

from faststream.redis import RedisBroker

def my_random(d):
    ''' Generates a random number with d digits '''
    return random.randrange(10**(d-1), 10**d)

async def main(msg: dict):
    async with RedisBroker("redis://default:password@localhost:6379") as br:
        await br.publish(msg, stream="test-stream")

if __name__ == "__main__":
    delay_minutes = 5
    payload = {"order_id": str(my_random(6)), "execution_time": int(time.time()) + (delay_minutes * 60)}
    asyncio.run(main(payload))

Expected behavior after 5 min, this order will be executed...

Observed behavior StreamMessage.reject() got an unexpected keyword argument 'redis'

Screenshots N/A

Environment Running FastStream 0.5.12 with CPython 3.10.13 on Linux

Additional context N/A

Lancetnik commented 1 week ago

Thank you for the report, I think, we can upload the fix today

Lancetnik commented 1 week ago

Btw, you can do not call await msg.ack(redis) - it will be called automatically after your function execution if there is no exceptions