airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Bug: OffsetCommit failed for mixed subscription on Kafka topic #1571

Closed: dem214 closed this issue 2 months ago

dem214 commented 4 months ago

Describe the bug: When one subscriber consumes a topic through the group coordinator and another consumes partitions directly, both using the same group_id, an error occurs and messages cannot be consumed.

How to reproduce (source code):

import logging

from faststream import FastStream
from faststream.kafka import KafkaRouter, TopicPartition, KafkaBroker

router = KafkaRouter()

@router.subscriber(
    't1',
    group_id='cg1',
)
async def handle_group(msg):
    print('handle_group')

@router.subscriber(
    group_id='cg1',
    partitions=[TopicPartition('t2', 0), TopicPartition('t2', 1)]
)
async def handle_partition(msg):
    print('handle_partition')

broker = KafkaBroker(
    log_level=logging.INFO,
    graceful_timeout=5.0,
)
broker.include_router(router, prefix='rtr_')

app = FastStream(broker)
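The app above mixes two consumption modes under one group_id: t1 is subscribed through the group coordinator, while the partitions of t2 are assigned manually. As a sketch, assuming you describe your subscribers as plain data, a hypothetical check like the one below flags group IDs used in both modes. SubscriberSpec and find_mixed_groups are illustrative names only, not part of FastStream or aiokafka.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class SubscriberSpec:
    """Hypothetical description of one subscriber (not a FastStream type)."""
    name: str
    group_id: Optional[str]
    topics: tuple[str, ...] = ()
    # (topic, partition) pairs for manual assignment
    partitions: tuple[tuple[str, int], ...] = ()

    @property
    def mode(self) -> str:
        # Manual assignment does not join the group via rebalancing;
        # topic subscription goes through the group coordinator.
        return "assign" if self.partitions else "subscribe"


def find_mixed_groups(specs: list[SubscriberSpec]) -> dict[str, list[str]]:
    """Return group IDs shared by 'subscribe' and 'assign' subscribers."""
    modes: dict[str, set[str]] = defaultdict(set)
    members: dict[str, list[str]] = defaultdict(list)
    for spec in specs:
        if spec.group_id is None:
            continue  # no group_id: nothing shared with other subscribers
        modes[spec.group_id].add(spec.mode)
        members[spec.group_id].append(spec.name)
    return {
        group: members[group]
        for group, seen in modes.items()
        if seen == {"subscribe", "assign"}
    }


specs = [
    SubscriberSpec("handle_group", "cg1", topics=("t1",)),
    SubscriberSpec("handle_partition", "cg1", partitions=(("t2", 0), ("t2", 1))),
]
print(find_mixed_groups(specs))  # {'cg1': ['handle_group', 'handle_partition']}
```

The check only describes the configuration pattern from this issue; it says nothing about whether Kafka or a given client library accepts it.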

Expected behavior: The app runs and consumes messages from both topics.

Observed behavior: The logs show the following errors:

2024-07-05 16:52:04,221 INFO     - FastStream app starting...
2024-07-05 16:52:04,228 INFO     - rtr_t1            | cg1 |            - `HandleGroup` waiting for messages
Topic rtr_t1 not found in cluster metadata
2024-07-05 16:52:04,343 INFO     - rtr_t2-0,rtr_t2-1 | cg1 |            - `HandlePartition` waiting for messages
Topic rtr_t2 not found in cluster metadata
2024-07-05 16:52:04,451 INFO     - FastStream app started successfully! To exit, press CTRL+C
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: cg1
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: cg1
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
OffsetCommit failed for group cg1 due to group error ([Error 25] UnknownMemberIdError: cg1), will rejoin
Auto offset commit failed: [Error 25] UnknownMemberIdError: cg1
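The subscriber labels in these logs reflect the prefix='rtr_' passed to include_router: t1 appears as rtr_t1, and the two t2 partitions appear as rtr_t2-0,rtr_t2-1, which is why messages must be published to the prefixed topic names. A minimal sketch of that naming, with the format inferred only from the log lines above; subscriber_label is a hypothetical helper, not FastStream's implementation.

```python
def subscriber_label(prefix: str, topics=(), partitions=()) -> str:
    """Rebuild a log label like the ones above (format inferred from the logs)."""
    if partitions:
        # Manually assigned partitions are shown as "<prefixed topic>-<partition>".
        return ",".join(f"{prefix}{topic}-{part}" for topic, part in partitions)
    return ",".join(f"{prefix}{topic}" for topic in topics)


print(subscriber_label("rtr_", topics=("t1",)))                      # rtr_t1
print(subscriber_label("rtr_", partitions=(("t2", 0), ("t2", 1))))   # rtr_t2-0,rtr_t2-1
```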

Environment: Running FastStream 0.5.14 with CPython 3.11.4 on Linux

spataphore1337 commented 3 months ago

Your example is inherently flawed because you are specifying two different topics for the same consumer group. Use a single topic for one consumer group.

Lancetnik commented 2 months ago

Seems like you are using Kafka in the wrong way, and this problem is not related to FastStream itself. Please reopen the issue if you don't agree.

dem214 commented 2 months ago

Is there a problem to use one consumer group ID for several Kafka topics? I didn't find any restrictions in the Kafka architecture or its documentation. If you have any other information, please let me know.

kumaranvpl commented 2 months ago

This looks like an issue with aiokafka rather than with FastStream or a restriction imposed by Kafka. Several aiokafka issues, opened at different times, report the same problem: https://github.com/aio-libs/aiokafka/issues/880, https://github.com/aio-libs/aiokafka/issues/575, https://github.com/aio-libs/aiokafka/issues/727.

@dem214 The example you mentioned works with the confluent-kafka library; only the import changes (the startup hook at the end is added just to publish test messages).

import asyncio
import logging

from faststream import FastStream
# import from faststream.confluent instead of faststream.kafka
from faststream.confluent import KafkaRouter, TopicPartition, KafkaBroker

router = KafkaRouter()

@router.subscriber(
    't1',
    group_id='cg1',
)
async def handle_group(msg):
    print('handle_group')

@router.subscriber(
    group_id='cg1',
    partitions=[TopicPartition('t2', 0), TopicPartition('t2', 1)]
)
async def handle_partition(msg):
    print('handle_partition')

broker = KafkaBroker(
    log_level=logging.INFO,
    graceful_timeout=5.0,
)
broker.include_router(router, prefix='rtr_')

app = FastStream(broker)

# Added to publish a message to topics rtr_t1 and rtr_t2
@app.after_startup
async def publish_msgs() -> None:
    async def _publish_msgs() -> None:
        await asyncio.sleep(5)
        print("Publishing messages")
        await broker.publish({"name": "Alice"}, topic="rtr_t1")
        await broker.publish({"name": "Bob"}, topic="rtr_t2")

    asyncio.create_task(_publish_msgs())

If you need this behaviour, you could use the confluent-kafka backend (faststream.confluent) instead of the aiokafka-based faststream.kafka.