aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0
1.09k stars 224 forks source link

Duplicate Message Producing after Offset Commit #921

Closed jainal09 closed 7 months ago

jainal09 commented 10 months ago

Describe the bug: I have a script which fetches a message, produces it to a new topic, and commits the offset as part of a transaction. The problem is that I am receiving duplicate messages in the new topic: even though the offset is committed, when I fetch the next message with the incremented offset I receive the previously committed message again.

Expected behaviour: A message should be consumed and produced to the new topic. Then the offsets need to be committed, and then a new message should be consumed with the updated offsets.

Environment (please complete the following information):

Reproducible example

import asyncio

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer, TopicPartition
from kafka.coordinator.assignors.range import RangePartitionAssignor
from kafka.errors import IllegalStateError
from kafka.errors import KafkaConnectionError

from fault_tolerant_aio_kafka.consumers.handlers import RebalancedListener
from fault_tolerant_aio_kafka.logs import logger  # avoid circular imports

async def consume_messages() -> None:
    """Copy messages from ``my_topic`` to ``new_topic`` inside Kafka transactions.

    Each consumed message is re-published to ``new_topic`` and its offset
    (``message.offset + 1``) is committed for group ``my_group_id`` in the
    same producer transaction.

    The transactional producer is created and started once and reused for
    every message. Both clients are stopped in ``finally`` blocks so that
    they are released even when an unexpected exception escapes the loop.

    Returns early (after stopping the consumer) if the consumer cannot
    connect to Kafka. The loop ends when ``IllegalStateError`` is raised.
    """
    consumer = AIOKafkaConsumer(
        "my_topic",
        bootstrap_servers="localhost:9092",
        group_id="my_group_id",
        max_poll_records=1,
        enable_auto_commit=False,
        auto_offset_reset="latest",
        partition_assignment_strategy=[RangePartitionAssignor],
        metadata_max_age_ms=30000,
    )
    listener = RebalancedListener(consumer)
    print("consumer started")
    # Subscribe again so that the rebalance listener is attached.
    consumer.subscribe(topics=["my_topic"], listener=listener)
    try:
        await consumer.start()
        logger.info("Consumer started")
    except KafkaConnectionError as e:
        logger.error(e)
        logger.error(
            f"Error while connecting to Kafka: localhost:9092. Is it running? {e}"
        )
        await consumer.stop()
        # exit the consumer
        return

    try:
        # One long-lived transactional producer: building a new one per loop
        # iteration re-initialises the transactional id every time and left
        # never-started producers behind on the timeout path.
        producer_obj = AIOKafkaProducer(
            bootstrap_servers="localhost:9092",
            transactional_id="my_transactional_id1",
            enable_idempotence=True,
        )
        await producer_obj.start()
        try:
            while True:
                try:
                    message = await asyncio.wait_for(
                        consumer.getone(), timeout=1.0
                    )
                    logger.info(f"Received message: {message.value}")
                    logger.info(
                        f"Received message from Partition: {message.partition}"
                    )
                    async with producer_obj.transaction():
                        tp = TopicPartition('my_topic', message.partition)
                        logger.info(f"Message key: {message.key}")
                        logger.info(f"Message value: {message.value}")
                        await producer_obj.send_and_wait(
                            topic="new_topic",
                            value=message.value,
                            key=message.key,
                        )
                        logger.info("message sent")
                        await producer_obj.send_offsets_to_transaction(
                            {tp: message.offset + 1}, "my_group_id"
                        )
                        logger.info("offset sent")
                        # NOTE(review): getone() has presumably already
                        # advanced the position; kept from the original so
                        # the IllegalStateError exit path is unchanged.
                        consumer.seek(tp, message.offset + 1)
                        logger.info("seek done")
                    logger.info("message produced")
                except IllegalStateError:
                    logger.info(
                        "Consumer stopped probably due to a partition mismanagement."
                    )
                    break
                except asyncio.TimeoutError:
                    # no message fetched within 1 second
                    continue
        finally:
            # Only reached once the producer has actually been started.
            await producer_obj.stop()
    finally:
        await consumer.stop()

async def start():
    """Schedule a single ``consume_messages`` task and wait for it to finish."""
    event_loop = asyncio.get_event_loop()
    worker = event_loop.create_task(consume_messages())
    await asyncio.gather(worker)

# Script entry point: only run the consumer when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(start())

Producer Code

from aiokafka import AIOKafkaProducer
import asyncio
import random
import string

def random_string(length):
    """Return a string of ``length`` randomly chosen lowercase ASCII letters."""
    alphabet = string.ascii_lowercase
    picked = [random.choice(alphabet) for _ in range(length)]
    return ''.join(picked)

async def main():
    """Publish 2000 random four-letter messages to ``my_topic``.

    The producer is always stopped, even if sending fails part-way through.
    """
    kafka_producer = AIOKafkaProducer(
        bootstrap_servers='localhost:9092',
        enable_idempotence=True,
    )
    await kafka_producer.start()

    try:
        # Each message is a UTF-8 encoded random string of 4 letters.
        for _ in range(2000):
            payload = random_string(4).encode('utf-8')
            await kafka_producer.send_and_wait('my_topic', payload)
    finally:
        await kafka_producer.stop()

# Run the main function. This call sits at module level with no
# ``__name__`` guard, so it also executes when the file is imported.
asyncio.run(main())

Consumer which checks for duplicate messages

from aiokafka import AIOKafkaConsumer
import asyncio

async def check_duplicates():
    """Read ``new_topic`` and print every message value seen more than once.

    Values are decoded as UTF-8 and remembered in an in-memory set; a value
    that is already in the set is reported as a duplicate. The consumer is
    stopped when iteration ends or fails.
    """
    consumer = AIOKafkaConsumer(
        'new_topic',
        bootstrap_servers='localhost:9092',
        group_id="my-group1",
        isolation_level="read_committed",
    )
    await consumer.start()

    seen_values = set()
    try:
        async for record in consumer:
            text = record.value.decode('utf-8')
            if text not in seen_values:
                # First occurrence: remember it and move on.
                seen_values.add(text)
                continue
            print(f"Duplicate message detected: {text}")
    finally:
        await consumer.stop()

# Run the duplicate checker. This call sits at module level with no
# ``__name__`` guard, so it also executes when the file is imported.
asyncio.run(check_duplicates())
jainal09 commented 10 months ago

Even a slightly modified version of your Transactional Consume-Process-Produce example generates duplicate messages. Below is the reproducible example:

import asyncio

from aiokafka import TopicPartition, AIOKafkaConsumer, AIOKafkaProducer
from fault_tolerant_aio_kafka.logs import logger

# Topic the consumer reads from.
IN_TOPIC = "my_topic"
# Consumer group id; also the group the transactional offsets are committed for.
GROUP_ID = "my_group_id"
# Topic the processed messages are produced to.
OUT_TOPIC = "new_topic"
# transactional_id handed to the producer.
TRANSACTIONAL_ID = "my-txn-id"
# Kafka bootstrap address shared by the consumer and the producer.
BOOTSTRAP_SERVERS = "localhost:9092"

# Passed to consumer.getmany() as timeout_ms, i.e. a value in milliseconds.
POLL_TIMEOUT = 60_000

def process_batch(msgs):
    """Return the ``value`` of every message in ``msgs``, in order.

    This is a plain pass-through: no grouping or aggregation is applied.
    """
    return [record.value for record in msgs]

async def transactional_process():
    """Run a transactional consume-process-produce loop.

    Batches are read from ``IN_TOPIC``, passed through ``process_batch`` and
    the results are produced to ``OUT_TOPIC``. The consumed offsets are
    committed through the producer in the same transaction.

    Cleanup is nested so that the consumer is stopped even if the producer
    fails to start, and the producer is stopped even if stopping the
    consumer would fail. Runs until an exception (or cancellation)
    interrupts the loop.
    """
    consumer = AIOKafkaConsumer(
        IN_TOPIC,
        bootstrap_servers=BOOTSTRAP_SERVERS,
        enable_auto_commit=False,
        group_id=GROUP_ID,
        isolation_level="read_committed"  # <-- This will filter aborted txn's
    )
    await consumer.start()
    print("consumer started")

    try:
        producer = AIOKafkaProducer(
            bootstrap_servers=BOOTSTRAP_SERVERS,
            transactional_id=TRANSACTIONAL_ID
        )
        # Started inside the outer try so a failure here still stops the
        # already-running consumer.
        await producer.start()
        try:
            while True:
                msg_batch = await consumer.getmany(timeout_ms=POLL_TIMEOUT)
                if not msg_batch:
                    # Poll timed out with nothing to process: do not open an
                    # empty transaction or send an empty offset commit.
                    continue

                async with producer.transaction():
                    commit_offsets = {}
                    in_msgs = []
                    for tp, msgs in msg_batch.items():
                        in_msgs.extend(msgs)
                        # Commit the offset of the next message to read.
                        commit_offsets[tp] = msgs[-1].offset + 1

                    out_msgs = process_batch(in_msgs)
                    for msg in out_msgs:
                        logger.info(f"Received message: {msg}")
                        await producer.send(
                            OUT_TOPIC, value=msg
                        )
                    # We commit through the producer because we want the commit
                    # to only succeed if the whole transaction is done
                    # successfully.
                    await producer.send_offsets_to_transaction(
                        commit_offsets, GROUP_ID)
        finally:
            await producer.stop()
    finally:
        await consumer.stop()

# Script entry point: only run the transactional loop when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    asyncio.run(transactional_process())
jainal09 commented 10 months ago

At this point I am really confused about what I am doing wrong. If @tvoinarovskyi, @fabregas, @ods, @multani, or @selevit could please explain and help me resolve this, it would be really helpful!

vmaurin commented 8 months ago

@jainal09 How many messages are you producing, and what is the duplicate rate? Your random generator function can only produce 456976 (26^4) unique strings. With 100 messages, the probability of at least one duplicate is around 1%; with 1000 messages, it is around 66% (an approximation of the "birthday problem"). Even better, you can run the same test but produce a unique incrementing sequence instead.

jainal09 commented 7 months ago

I believe you are right; using uuid.uuid4 gave me no duplicate values.