airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Bug: FastAPI KafkaRouter seems to ignore retry_backoff_ms option #1721

Closed guillaume-lagay closed 2 weeks ago

guillaume-lagay commented 3 weeks ago

Describe the bug

I'm creating a subscriber in a FastAPI app using KafkaRouter. When I raise NackMessage, the subscriber immediately consumes the failed message again without any delay. I tried adding the "retry_backoff_ms" option to my KafkaRouter, but it changes nothing: no delay is applied.

How to reproduce

The definition of my KafkaRouter:

from fastapi import FastAPI
from fastapi_lifespan_manager import LifespanManager
from typing import AsyncIterator
from faststream.kafka.fastapi import KafkaRouter
from api.conf import get_kafka_settings

kafka_settings = get_kafka_settings()
router = KafkaRouter(bootstrap_servers=kafka_settings.url, retry_backoff_ms=100000)

def setup_kafka_router(lifespan: LifespanManager):
    @lifespan.add
    async def setup(app: FastAPI) -> AsyncIterator[None]:
        print("Connecting Kafka Router...")
        await router.broker.connect()
        yield

    lifespan.add(router.lifespan_context)
    return router

The subscriber:

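from typing import Any
from fastapi import Depends
from faststream.exceptions import AckMessage, NackMessage
from faststream.kafka.fastapi import Logger

@router.subscriber(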
    "custom_topic", group_id="group", auto_commit=False, retry=True, auto_offset_reset="earliest"
)
async def handler(
    message: Any,
    logger: Logger,
    service: CustomService = Depends(),
):
    try:
        await service.process(message)
        AckMessage()  # Acknowledge the message
        logger.info("API call succeeded.")
    except CustomError as err:
        raise NackMessage()

Environment

faststream -v
> Running FastStream 0.5.18 with CPython 3.10.2 on Linux

Note: I tried importing KafkaRouter from both aiokafka (faststream.kafka.fastapi) and confluent (faststream.confluent.fastapi). Thank you in advance.

Lancetnik commented 3 weeks ago

I am not sure how this parameter is used in confluent, but the current logic seems correct: you nacked the message, the reading offset stays the same, and FastStream asks for the next message (which is the same one, because the offset has not moved). Should we sleep after nacking in this case? I am not sure.

@kumaranvpl what do you think?

kumaranvpl commented 2 weeks ago

@Lancetnik I will look into this later. Isn't this an AIOKafka bug? Because you have tagged Confluent.

Lancetnik commented 2 weeks ago

@Lancetnik I will look into this later. Isn't this an AIOKafka bug? Because you have tagged Confluent.

Sorry, I just misclicked.

guillaume-lagay commented 2 weeks ago

I edited my original post and added this note, by the way: Note: I tried importing KafkaRouter from both aiokafka (faststream.kafka.fastapi) and confluent (faststream.confluent.fastapi).

kumaranvpl commented 2 weeks ago

I edited my original post and added this note, by the way: Note: I tried importing KafkaRouter from both aiokafka (faststream.kafka.fastapi) and confluent (faststream.confluent.fastapi).

Thank you.

Lancetnik commented 2 weeks ago

The only way to fix it is to add anyio.sleep(self.retry_backoff_msg) at the consume exception here: https://github.com/airtai/faststream/blob/main/faststream/kafka/subscriber/usecase.py#L223 (this is not so easy, because the consume method does not return the exception; we could probably use a special middleware). But I am still not sure that it is required.
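Until something like that exists in the framework, the delay can be applied in application code before the nack is raised. The block below is only a minimal sketch of that idea, not FastStream's implementation: `NackMessage` is a local stand-in for `faststream.exceptions.NackMessage` so the snippet runs on its own, and `process_or_nack`, `backoff_ms`, and the `ValueError` placeholder are hypothetical names. Note that `asyncio.sleep` (like `anyio.sleep`) takes seconds, so a millisecond setting has to be divided by 1000.

```python
import asyncio
import time


class NackMessage(Exception):
    """Stand-in for faststream.exceptions.NackMessage (assumption, for a standalone run)."""


async def process_or_nack(process, message, backoff_ms: int) -> None:
    """Run `process(message)`; on failure, wait `backoff_ms` and then nack."""
    try:
        await process(message)
    except ValueError:  # placeholder for the application's own error type
        await asyncio.sleep(backoff_ms / 1000)  # sleep expects seconds, not ms
        raise NackMessage() from None


async def _always_fails(message):
    """Hypothetical processing step that always fails."""
    raise ValueError("processing failed")


async def demo(backoff_ms: int = 200) -> float:
    """Return how long one failing call took, including the backoff."""
    started = time.monotonic()
    try:
        await process_or_nack(_always_fails, "payload", backoff_ms)
    except NackMessage:
        pass
    return time.monotonic() - started
```

A long sleep inside a handler counts as processing time, so it should stay well below the consumer's maximum poll interval; otherwise the consumer may be removed from its group.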

kumaranvpl commented 2 weeks ago

@Lancetnik @guillaume-lagay

I have spent some time debugging this. I suspected that it is caused by a combination of Kafka behaviour and a limitation of aiokafka.

As mentioned in the Kafka configuration docs, retry.backoff.ms is capped by retry.backoff.max.ms. If you set retry.backoff.ms greater than retry.backoff.max.ms, then retry.backoff.max.ms is used and the retry.backoff.ms value is ignored. In the above example, @guillaume-lagay set retry.backoff.ms to 100000, but the default retry.backoff.max.ms is 1000, so the retry.backoff.max.ms value would have been used. To add insult to injury, aiokafka does not provide a parameter to set retry.backoff.max.ms: https://aiokafka.readthedocs.io/en/stable/api.html#consumer-class.
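The capping rule can be written as a single `min`. This is an illustrative sketch only: `effective_backoff_ms` is a hypothetical helper, not a Kafka or aiokafka function, and the default of 1000 ms is the value quoted above.

```python
def effective_backoff_ms(retry_backoff_ms: int, retry_backoff_max_ms: int = 1000) -> int:
    """Backoff actually applied when retry.backoff.ms is capped by retry.backoff.max.ms."""
    return min(retry_backoff_ms, retry_backoff_max_ms)


# Values from the issue: 100000 ms requested, default cap of 1000 ms.
capped = effective_backoff_ms(100_000)

# With the cap raised above the requested value, the requested value is kept.
uncapped = effective_backoff_ms(10_000, 100_000)
```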

In theory, if you set retry.backoff.max.ms greater than retry.backoff.ms, then the retry mechanism should work. To test this theory, I have created the following confluent app based on the code listed in this issue:

from fastapi import Depends, FastAPI
from pydantic import BaseModel
from faststream.exceptions import NackMessage

# from faststream.kafka.fastapi import KafkaRouter, Logger

from faststream.confluent.fastapi import KafkaRouter, Logger

config = {
    "retry.backoff.ms": 10000,
    "retry.backoff.max.ms": 100000,
}

router = KafkaRouter("localhost:9092", config=config)

class Incoming(BaseModel):
    m: dict

def call():
    return True

@router.subscriber("test", group_id="test", auto_commit=False, retry=True, auto_offset_reset="earliest")
async def hello(m: str, logger: Logger, d=Depends(call)):
    logger.info(m)
    raise NackMessage()

@router.get("/")
async def hello_http():
    return "Hello, HTTP!"

app = FastAPI(lifespan=router.lifespan_context)
app.include_router(router)

@router.after_startup
async def test(*args, **kwargs):
    await router.broker.publish("Hello!", "test")
    print("Published message")

When we raise NackMessage, the code should read the same message again and again, but exponentially slower until it hits the retry.backoff.max.ms time. Unfortunately, it doesn't work as expected: the code continuously consumes the message again and again.

It means we have some bug in our Ack, Nack flow.
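For reference, the delays expected under that theory can be listed with a small generator. This is a sketch under stated assumptions: the delay is assumed to double on each attempt and jitter is ignored; the factor of 2 and the name `backoff_schedule_ms` are illustrative and do not come from the configuration docs quoted in this thread.

```python
from itertools import islice
from typing import Iterator


def backoff_schedule_ms(initial_ms: int, max_ms: int, factor: int = 2) -> Iterator[int]:
    """Yield successive retry delays, growing by `factor` and capped at `max_ms`."""
    delay = initial_ms
    while True:
        yield min(delay, max_ms)
        delay = min(delay * factor, max_ms)


# Settings from the test app: retry.backoff.ms=10000, retry.backoff.max.ms=100000.
first_six = list(islice(backoff_schedule_ms(10_000, 100_000), 6))
```

With these settings the first six delays would be 10, 20, 40, 80, 100 and 100 seconds, which is far from the back-to-back consumption actually observed.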

kumaranvpl commented 2 weeks ago

@Lancetnik From the configuration docs, the description for retry.backoff.ms is

The amount of time to wait before attempting to retry a failed request to a given topic partition

There are no failures in our code: we request a message, we get the message, and we seek to the message's offset. Nowhere in this flow does a failure happen. So retry.backoff.ms may not be the correct parameter for introducing a delay between consumption of messages, as @guillaume-lagay assumed in the issue description.

Lancetnik commented 2 weeks ago

@guillaume-lagay I think the framework's behavior is correct in this case. Please reopen the issue if you still think we have a bug here.

@kumaranvpl thank you a lot for the investigation and explanation!

guillaume-lagay commented 1 week ago

Hello,

Thank you for all your explanations, they're really clear. @kumaranvpl I tried the following code: https://github.com/airtai/faststream/issues/1721#issuecomment-2316852975, but I can confirm it doesn’t work either (using Confluent). Unless I’m mistaken, there does seem to be a bug in FastStream, as I understand from @kumaranvpl 's post.

I understand the limitations of Aiokafka, but wouldn't it work with Confluent?

Thank you in advance.

xavier-maisse commented 3 days ago

Hi,

I'm having the same problem. Is there a solution?

Thanks in advance

kumaranvpl commented 2 days ago

Hello @guillaume-lagay and @xavier-maisse,

As I mentioned earlier, according to the configuration docs, the description for retry.backoff.ms is

The amount of time to wait before attempting to retry a failed request to a given topic partition

This means that the retry.backoff.ms configuration value is used only when an error occurs while the Confluent client makes a request to Kafka. To prove that, I have replicated the above code using plain confluent-kafka-python:

import time
from confluent_kafka import Consumer, Producer, TopicPartition

config = {
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest',
    "enable.auto.commit": False,

    "retry.backoff.ms": 10000, # 10 seconds
    "retry.backoff.max.ms": 100000, # 100 seconds
}

p = Producer(config)

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

i = 0
topic = f"mytopic{i}"

data = "Hello, this is a test message"
p.poll(0)
p.produce(topic, data.encode('utf-8'), callback=delivery_report)
p.flush()

c = Consumer(config)
c.subscribe([topic])

prev_time = time.time()
while True:
    msg = c.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    curr = time.time()
    print('Received message: {}'.format(msg.value().decode('utf-8')))
    print(f"Time taken: {curr - prev_time}")
    prev_time = curr

    tp = TopicPartition(topic=topic, partition=msg.partition(), offset=msg.offset())

    c.seek(partition=tp)

c.close()

Here, I produce a message, consume it, and seek back to the message's offset so that the same message is consumed again and again. I have also set retry.backoff.ms to 10 seconds. According to @guillaume-lagay's theory, Kafka should wait for retry.backoff.ms, i.e. 10 seconds, between consuming messages.

But, when I run the confluent client code, it produces the following output:

%4|1726045642.621|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property group.id is a consumer property and will be ignored by this producer instance
%4|1726045642.621|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property enable.auto.commit is a consumer property and will be ignored by this producer instance
%4|1726045642.621|CONFWARN|rdkafka#producer-1| [thrd:app]: Configuration property auto.offset.reset is a consumer property and will be ignored by this producer instance
Message delivered to mytopic3 [0]
%4|1726045643.625|CONFWARN|rdkafka#consumer-2| [thrd:app]: Configuration property retry.backoff.ms is a producer property and will be ignored by this consumer instance
%4|1726045643.625|CONFWARN|rdkafka#consumer-2| [thrd:app]: Configuration property retry.backoff.max.ms is a producer property and will be ignored by this consumer instance
Received message: Hello, this is a test message
Time taken: 3.110369920730591
Received message: Hello, this is a test message
Time taken: 0.502488374710083
Received message: Hello, this is a test message
Time taken: 0.5021016597747803
Received message: Hello, this is a test message
Time taken: 0.5027821063995361
Received message: Hello, this is a test message
Time taken: 0.5014510154724121
Received message: Hello, this is a test message
Time taken: 0.5029191970825195
Received message: Hello, this is a test message
Time taken: 0.5029010772705078
Received message: Hello, this is a test message
Time taken: 0.5024034976959229
Received message: Hello, this is a test message
Time taken: 0.502180814743042
Received message: Hello, this is a test message
Time taken: 0.5020129680633545
Received message: Hello, this is a test message
Time taken: 0.5027604103088379
Received message: Hello, this is a test message
Time taken: 0.5023820400238037

We can clearly see that plain confluent-kafka-python does not wait for 10 seconds between consuming messages. Therefore, @guillaume-lagay's theory is disproven. retry.backoff.ms is not the value you are looking for, as it only applies when an error occurs while connecting to Kafka.

Maybe the value you are looking for is fetch.min.bytes or something similar?