airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0
3.03k stars 154 forks source link

Bug: `auto_commit=False` and `retry=True` in confluent-kafka doesn't result in retries #1446

Closed andreaimprovised closed 2 months ago

andreaimprovised commented 5 months ago

Describe the bug

Retries do not work for the confluent-kafka library as far as I can tell.

Seems related to https://github.com/airtai/faststream/issues/1001 in some way.

How to reproduce

Include source code:

# app.py
from collections import defaultdict

from faststream import FastStream, Logger
from faststream.confluent.annotations import KafkaMessage
from faststream.confluent import KafkaBroker

broker = KafkaBroker("localhost:30100")
body_to_times_to_die = defaultdict(int, {"hello": 3})

@broker.subscriber(
    "hello_world",
    group_id="foo",
    auto_commit=False,
    retry=True
)
async def async_subscriber(body: str, logger: Logger, msg: KafkaMessage):
    global body_to_times_to_die
    logger.info(body)
    body_to_times_to_die[body] -= 1
    if body_to_times_to_die[body] >= 0:
        raise Exception()
    await msg.ack()

async def main():
    app = FastStream(broker)
    await app.run()  # blocking method

if __name__ == "__main__":
    import asyncio
    asyncio.run(main())

And/Or steps to reproduce the behavior:

  1. Run kafka.
  2. Run the consumer.
  3. Use kafka ui to produce a single "hello" message on the topic.
  4. Use kafka ui to produce a single "hi" message on the topic.

Expected behavior I expect the "hello" message to be processed 4 times. The first three times should raise the exception, and the 4th should succeed. I then expect the "hi" message to be processed without exception.

Observed behavior The "hello" message is processed once and raises an exception. Then the "hi" message is processed once without exception.

This is my console output (I forgot to run step (3) for this run)

python -m app
2024-05-14 15:59:12,852 INFO     - FastStream app starting...
%3|1715727552.863|FAIL|faststream-0.5.5#producer-1| [thrd:localhost:30100/bootstrap]: localhost:30100/bootstrap: Connect to ipv6#[::1]:30100 failed: Connection refused (after 0ms in state CONNECT)
%3|1715727553.869|FAIL|faststream-0.5.5#producer-1| [thrd:localhost:30100/bootstrap]: localhost:30100/bootstrap: Connect to ipv6#[::1]:30100 failed: Connection refused (after 2ms in state CONNECT, 1 identical error(s) suppressed)
2024-05-14 15:59:14,891 INFO     - hello_world | foo |            - `AsyncSubscriber` waiting for messages
%3|1715727554.893|FAIL|faststream-0.5.5#producer-2| [thrd:localhost:30100/bootstrap]: localhost:30100/bootstrap: Connect to ipv6#[::1]:30100 failed: Connection refused (after 1ms in state CONNECT)
2024-05-14 15:59:15,956 INFO     - Topic `hello_world` created.
2024-05-14 15:59:15,960 INFO     - FastStream app started successfully! To exit, press CTRL+C
2024-05-14 15:59:29,786 INFO     - hello_world | foo | 0-17157275 - Received
2024-05-14 15:59:29,793 INFO     - hello_world | foo | 0-17157275 - hello
2024-05-14 15:59:29,793 ERROR    - hello_world | foo | 0-17157275 - Exception:
Traceback (most recent call last):
  File "lib/python3.11/site-packages/faststream/broker/subscriber/usecase.py", line 331, in consume
    result_msg = await h.call(
                 ^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/faststream/broker/subscriber/call_item.py", line 172, in call
    raise e
  File "lib/python3.11/site-packages/faststream/broker/subscriber/call_item.py", line 164, in call
    result = await call(message)
             ^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 73, in consume_scope
    await self.after_consume(err)
  File "lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 54, in after_consume
    raise err
  File "lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 64, in consume_scope
    result = await call_next(await self.on_consume(msg))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/faststream/broker/wrapper/call.py", line 201, in decode_wrapper
    return await func(msg)
           ^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/fast_depends/use.py", line 146, in injected_wrapper
    r = await real_model.asolve(
        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/fast_depends/core/model.py", line 529, in asolve
    response = await run_async(call, *final_args, **final_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/faststream/utils/functions.py", line 53, in to_async_wrapper
    return await call_or_await(func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "lib/python3.11/site-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "app.py", line 22, in async_subscriber
    raise Exception()
Exception
2024-05-14 15:59:29,804 INFO     - hello_world | foo | 0-17157275 - Processed

Screenshots Not much else to share.

Environment Running FastStream 0.5.5 with CPython 3.11.8 on Darwin

Additional context

This seems to be a workaround if I do it right before I raise the exception:

            raw_message = msg.raw_message[0]
            topic_partition = TopicPartition(
                raw_message.topic(),
                raw_message.partition(),
                raw_message.offset()
            )
            msg.consumer.consumer.seek(topic_partition)
kumaranvpl commented 2 months ago

Hello @andreaimprovised,

This bug has been fixed as part of https://github.com/airtai/faststream/releases/tag/0.5.18 release.

Following is a sample code


from typing import Any, Dict
from faststream import FastStream
from faststream.confluent import KafkaBroker
from faststream.confluent.annotations import (
    KafkaMessage,
    Logger
)
import json
import asyncio

broker = KafkaBroker("localhost:9092")
topic = "confluent-nack-test"

app = FastStream(broker)

@broker.subscriber(topic, group_id="foo", auto_commit=False, auto_offset_reset="earliest")
async def async_subscriber(body: Dict[str,Any], logger: Logger, msg: KafkaMessage):
    logger.info(body)
    await msg.nack()

@app.after_startup
async def publish_something() -> None:
    async def _publish_something() -> None:
        i = 10
        print(f"Sleeping for {i} seconds")
        await asyncio.sleep(i)
        message = {"hi": "there"}
        await broker.publish(message, topic=topic)    
        print("Published message" + json.dumps(message))

    asyncio.create_task(_publish_something())