airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Bug: Confluent Kafka Broker with acks=0 hangs on publish without no_confirm flag #1783

Closed: ntr-switchdin closed this issue 3 weeks ago

ntr-switchdin commented 3 weeks ago

Describe the bug

~The method signature for the confluent broker's publish method does not include the no_confirm flag added in https://github.com/airtai/faststream/releases/tag/0.5.23~

Edit: sorry, this is not true; my editor was showing a different version from the one running inside my container. Close this if you want, but I maintain that it was too significant a change for such a small version bump, and the documentation still doesn't really reference the new flag. The conflict with acks=0 is probably also of interest.


Without the flag specified, our web server hung when attempting to publish messages. I hadn't realised the new flag had been added, and our broker was explicitly configured with acks=0 anyway.

We observed this while using Redpanda for local development. I don't know whether that affects the way messages are confirmed, but I certainly didn't expect the whole process to hang when producing a message, without any kind of logging or timeout.
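Since the hang gave no log output and no timeout, one defensive option on the caller's side is to bound the publish with asyncio.wait_for, so a missing confirmation surfaces as an error instead of a silent stall. The sketch below assumes only that broker.publish(...) is a coroutine (the reproduction awaits it). HangingBroker and publish_with_timeout are hypothetical names made up for this illustration and are not part of FastStream.

```python
import asyncio
import logging
from typing import Awaitable, TypeVar

T = TypeVar("T")
log = logging.getLogger("publish_guard")


async def publish_with_timeout(publish: Awaitable[T], timeout: float = 5.0) -> T:
    """Await a publish coroutine, but fail loudly instead of hanging forever."""
    try:
        # wait_for cancels the pending publish if the timeout expires
        return await asyncio.wait_for(publish, timeout=timeout)
    except asyncio.TimeoutError:
        log.error("publish did not complete within %.1f s", timeout)
        raise


class HangingBroker:
    """Hypothetical stand-in for a broker whose delivery confirmation never arrives."""

    async def publish(self, message: str, topic: str) -> None:
        never_confirmed = asyncio.get_running_loop().create_future()
        await never_confirmed  # nothing ever calls set_result() on this future


async def main() -> str:
    broker = HangingBroker()
    try:
        await publish_with_timeout(broker.publish("hello", topic="demo"), timeout=0.1)
    except asyncio.TimeoutError:
        return "timed out"
    return "published"


if __name__ == "__main__":
    print(asyncio.run(main()))  # prints "timed out"
```

With a real broker the call would look like publish_with_timeout(broker.publish(topic=..., message=...)), which turns an indefinite hang into a logged TimeoutError the caller can handle.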

As an aside, a change to the default behaviour of the Confluent broker probably should have been enough to warrant a minor semver increase.

How to reproduce

Include source code:

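import asyncio

from faststream.confluent import KafkaBroker

async def main() -> None:
    # KAFKA_BOOTSTRAP_SERVERS, logger, security, kafka_topic and msg come from our app config
    broker = KafkaBroker(KAFKA_BOOTSTRAP_SERVERS, logger=logger, security=security, allow_auto_create_topics=False, acks=0)
    await broker.connect()
    await broker.publish(topic=kafka_topic.topic, message=msg)
    # hangs...

asyncio.run(main())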

Expected behavior

Execution returns after publishing, since the broker is configured with acks=0.

Observed behavior

The publisher hangs awaiting the confirmation future.
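To make "hangs awaiting the confirmation future" concrete, here is a generic sketch of how a callback-style producer's delivery report can be bridged into an asyncio future, and how a no_confirm-style switch skips that wait. This is not FastStream's implementation: CallbackProducer, publish and its no_confirm parameter are hypothetical, and the semantics of no_confirm are only assumed from this thread. The delivery callback runs on another thread, so it must hand the result back with loop.call_soon_threadsafe; if that callback never reaches the running loop, the await never finishes.

```python
import asyncio
import threading
from typing import Callable, List, Optional


class CallbackProducer:
    """Hypothetical producer that reports delivery from a background thread."""

    def produce(self, message: bytes, on_delivery: Callable[[Optional[str]], None]) -> None:
        # Simulate the client's I/O thread acknowledging the message a little later.
        threading.Timer(0.05, on_delivery, args=(None,)).start()


async def publish(producer: CallbackProducer, message: bytes, no_confirm: bool = False) -> None:
    loop = asyncio.get_running_loop()
    confirmed: asyncio.Future = loop.create_future()

    def settle(error: Optional[str]) -> None:
        # Runs on the event loop thread.
        if confirmed.done():  # e.g. the caller already gave up and cancelled
            return
        if error is None:
            confirmed.set_result(None)
        else:
            confirmed.set_exception(RuntimeError(error))

    def on_delivery(error: Optional[str]) -> None:
        # Runs on the producer's thread: never touch the future directly here.
        loop.call_soon_threadsafe(settle, error)

    producer.produce(message, on_delivery)
    if no_confirm:
        return  # fire-and-forget: don't wait for the delivery report
    await confirmed  # waits forever if on_delivery is never invoked


async def main() -> List[str]:
    producer = CallbackProducer()
    events: List[str] = []
    await publish(producer, b"confirmed")
    events.append("confirmed publish returned")
    await publish(producer, b"fire-and-forget", no_confirm=True)
    events.append("no_confirm publish returned")
    await asyncio.sleep(0.1)  # let the pending delivery report land before the loop closes
    return events


if __name__ == "__main__":
    print(asyncio.run(main()))
```

In this model the confirmed path has no timeout of its own, which matches the symptom in the report: the only way out is for the delivery callback to fire on the loop that is awaiting it.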

Environment

By the way, the command below doesn't work unless the faststream[cli] extra is installed:

Include the output of the faststream -v command to display your current project and system environment.

pip show faststream
Name: faststream
Version: 0.5.23

Additional context

redpanda:v24.1.9

Lancetnik commented 3 weeks ago

Thank you for the report. We'll try to fix it soon.

kumaranvpl commented 3 weeks ago

@Lancetnik, @ntr-switchdin I tried the following code against Kafka using confluent.KafkaBroker with acks=0, and it works fine for both publishing and subscribing.

import asyncio
import random

from faststream import FastStream, Logger

from faststream.confluent import KafkaBroker

from pydantic import BaseModel, Field

version = "0.1.0"
title = "My FastStream service"
description = "Description of my FastStream service"

class Name(BaseModel):
    name: str = Field(..., description="Name of the person")

class Greeting(BaseModel):
    greeting: str = Field(..., description="Greeting message")

broker = KafkaBroker(acks=0)
app = FastStream(broker, title=title, version=version, description=description)

to_greetings = broker.publisher(
    "greetings",
    description="Produces a message on greetings after receiving a message on names",
)

topic = "names_septem_2024"

@broker.subscriber(topic, description="Consumes messages from names topic and produces messages to greetings topic", auto_offset_reset="earliest")
async def on_names(msg: Name, logger: Logger) -> None:
    result = f"hello {msg.name}"
    logger.info(result)
    greeting = Greeting(greeting=result)
    await to_greetings.publish(greeting)

@app.after_startup
async def publish_names() -> None:
    async def _publish_names() -> None:
        names = [
            "Ana",
            "Mario",
            "Pedro",
            "João",
            "Gustavo",
            "Joana",
            "Mariana",
            "Juliana",
        ]
        while True:
            name = random.choice(names)  # nosec
            print(f"producing {name}")
            await broker.publish(Name(name=name), topic=topic)

            await asyncio.sleep(2)

    asyncio.create_task(_publish_names())

Could this be an issue with Redpanda? It is meant as a drop-in replacement for Kafka, but it may not support every feature yet. Let me know if I've misunderstood anything.

Lancetnik commented 3 weeks ago

As an aside, a change to the default behaviour of the Confluent broker probably should have been enough to warrant a minor semver increase.

I think so too, but according to semantic versioning, until the 1.0.0 release every 0.X release is effectively a major one, so we have to use "patch" versions for changes like this. Sorry that this happened; we try not to break things whenever possible. But the previous behavior was a bug, so I decided to change it.

I hope this experience doesn't affect your relationship with the project.

ntr-switchdin commented 3 weeks ago

@kumaranvpl I ran your script against our Redpanda broker and it worked fine, so maybe something else is at play here.

I have been creating these brokers inside pytest and some ASGI test servers. Outside of pytest, our subscribers and publishers all work as expected. The following test works (note that it uses neither no_confirm=True nor acks=0):

import asyncio
from logging import getLogger
from pydantic import BaseModel, Field
import pytest
from asyncio import Event
from faststream.confluent import KafkaBroker

KAFKA_BOOTSTRAP_SERVERS = "redpanda-0:9092"

log = getLogger(__name__)

class Person(BaseModel):
    name: str = Field(..., description="Name of the person")

# redpanda_online is a project-specific fixture; running an async test
# also requires an async pytest plugin (e.g. pytest-asyncio)
@pytest.mark.usefixtures("redpanda_online")
async def test_no_acks():
    message_received_event = Event()
    broker = KafkaBroker(KAFKA_BOOTSTRAP_SERVERS)
    TOPIC_NAME = "no_acks_test"

    async def subscriber(p: Person):
        assert p.name == "ntr"
        message_received_event.set()

    broker.subscriber(TOPIC_NAME, group_id="test_no_acks", auto_offset_reset="latest")(subscriber)
    await broker.start()

    try:
        # In the actual implementation, I POST to an HTTP server that does this publish
        await broker.publish(topic=TOPIC_NAME, message=Person(name="ntr"))

        try:
            await asyncio.wait_for(message_received_event.wait(), timeout=10.0)
        except asyncio.TimeoutError:
            log.error("Timed out waiting for message received event")

        assert message_received_event.is_set()
    finally:
        # Stop the consumer and producer so the test doesn't leak connections
        await broker.close()

But when I add the actual integration, which is effectively:

    res = await async_client.post(url=url, content=payload)
    assert res.status_code == 201

The test hangs while the FastAPI server is trying to publish the message.

So this is probably related to asyncio, the httpx test client, or the ASGI test servers somehow.


As this seems likely to be unrelated to FastStream itself, I'm going to close this issue, but if it's of interest we can continue the discussion and I can supply more information about our case.