aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

Losing Messages Between Producer and Consumer #1031

Open hakan458 opened 2 months ago

hakan458 commented 2 months ago

Describe the bug

Somehow messages are being lost between the Producer and Consumer.

I have a FastAPI app with a submit_batch endpoint that uses an aiokafka producer to send messages to a topic derived from the job ID. It looks like this:

async def submit_batch(batch: BatchData):
    topic = get_input_topic_name(batch.job_id)
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    await producer.start()

    try:
        for record in batch.data:
            # send_and_wait blocks until the broker acknowledges each record
            await producer.send_and_wait(topic, value=record)
    except UnknownTopicOrPartitionError:
        raise HTTPException(
            status_code=500, detail=f"{topic=} for job {batch.job_id} not found"
        )
    finally:
        # runs on success and on the raise above, so stop() happens exactly once
        await producer.stop()

    return Response[BatchSubmitted](data=BatchSubmitted(job_id=batch.job_id))

On the other end, an aiokafka consumer reads messages until a timeout is reached:

while True:
    try:
        data_batch = await self.consumer.getmany(
            timeout_ms=self.timeout_ms, max_records=batch_size
        )
        if not data_batch:
            print_text("No more data in the environment. Exiting.")
            break

        # flatten records across all returned partitions so none are dropped
        batch_data = [
            msg.value for messages in data_batch.values() for msg in messages
        ]
        logger.info(
            f"Received a batch of {len(batch_data)} records from Kafka topic {self.kafka_input_topic}"
        )
    except Exception as e:
        # TODO: environment should raise a specific exception + log error
        print_error(f"Error getting data batch from environment: {e}")
        break
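
A note on the exit condition (an editor's hedged sketch, not part of the original report): getmany returning an empty dict only means nothing arrived within timeout_ms, not that the topic is drained. A stricter check, assuming self.consumer is a started AIOKafkaConsumer with an active assignment, could compare the consumer's position against the partition high watermark:

async def drained(consumer) -> bool:
    """Return True once the consumer has caught up on every assigned partition."""
    for tp in consumer.assignment():
        # highwater() reflects the last fetch response; None means no fetch yet
        highwater = consumer.highwater(tp)
        if highwater is None or await consumer.position(tp) < highwater:
            return False
    return True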

When I send 10000 tasks using the submit_batch endpoint, on the consumer side I usually only end up with something in the 7000-8000 range. Occasionally, maybe 1 in 10 runs, I get all 10000. I have confirmed that all 10000 reach the endpoint; keeping a counter there yields 10000. Maybe they are not being sent successfully? But I figured send_and_wait would raise an exception if not. Is there a more deterministic way to check?
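
One way to make that check deterministic (a sketch, not the original endpoint): send_and_wait returns a RecordMetadata, so broker acknowledgements can be counted explicitly. acks="all" and enable_idempotence=True below are assumptions for stricter delivery guarantees, not settings from the report.

producer = AIOKafkaProducer(
    bootstrap_servers=settings.kafka_bootstrap_servers,
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    acks="all",               # wait for all in-sync replicas, not just the leader
    enable_idempotence=True,  # retries cannot duplicate or reorder records
)
await producer.start()
try:
    acked, last_offset = 0, None
    for record in batch.data:
        # send_and_wait raises on failure, so each return is a confirmed write
        metadata = await producer.send_and_wait(topic, value=record)
        acked, last_offset = acked + 1, metadata.offset
    logger.info(f"{acked} records acknowledged, last offset {last_offset}")
finally:
    await producer.stop()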

Expected behaviour

I expect all 10000 messages sent by the Producer to be received by the Consumer.

Environment (please complete the following information):

Reproducible example

docker-compose.yml:

# docker-compose.yml
services:
  kafka:
    restart: always
    image: bitnami/kafka
    ports:
      - "9093:9093"
    volumes:
      - "./server/kafka-data:/bitnami"
    environment:
      - KAFKA_ENABLE_KRAFT=yes
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_LISTENERS=PLAINTEXT://:9093,CONTROLLER://:2181
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9093
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@127.0.0.1:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_NODE_ID=1
      - KAFKA_KRAFT_CLUSTER_ID=MkU3OEVBNTcwNTJENDM2Qk
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false

Have not broken out the problem into its own script.

hakan458 commented 2 months ago

What's strange is that if I put a small time.sleep(0.1) in the for loop where we send records, I get all 10000 messages in the consumer.

        for record in batch.data:
            await producer.send_and_wait(topic, value=record)
            time.sleep(0.1)

I wouldn't expect that sending messages quickly would be an issue. I tried this because in the same app we process all the messages and send them on through another Kafka topic, and we never lose a single message in that case. Since it takes some time for records to be processed, the rate of sending is the only difference I could think of, and adding the time.sleep(0.1) seems to solve the issue. But why?
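
Worth flagging in that workaround: time.sleep blocks the event loop, which also stalls aiokafka's background sender task. If throttling is the goal, the non-blocking form (assuming asyncio is imported) would be:

        for record in batch.data:
            await producer.send_and_wait(topic, value=record)
            await asyncio.sleep(0.1)  # yields to the event loop instead of freezing it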

hakan458 commented 2 months ago

Some more info: it does seem that all 10k are being sent, if the offset is to be trusted:

{"message": "<AIOKafkaConnection host=127.0.0.1 port=9093> Response 1001: ProduceResponse_v7(topics=[(topic='adala-input-ab599429-c094-4eae-a71c-85de414aaf98', partitions=[(partition=0, error_code=0, offset=9999, timestamp=-1, log_start_offset=0)])], throttle_time_ms=0)"}
hakan458 commented 2 months ago

Another note: in the first queue, the one we somehow lose messages in, we consume messages at a much slower rate than in the second queue, where we lose none. Not sure if that makes any difference; just trying to think of anything at this point.

hakan458 commented 2 months ago

One more thing to note: if I send the 10k records to submit_batch in batch sizes of 10, which means hitting the endpoint 1000 times for 10k messages, we again lose many messages. Sending all 10k to submit_batch in one call plus the time.sleep(0.1) is the only way it seems to work so far. Creating a producer on every call is probably not the best idea.
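
On that last point, a hedged sketch of reusing a single producer for the app's lifetime via FastAPI's lifespan hook (settings is carried over from the snippets above):

import json
from contextlib import asynccontextmanager

from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # one producer for the whole app instead of one per request
    app.state.producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_bootstrap_servers,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    await app.state.producer.start()
    yield
    # flushes pending batches before the app shuts down
    await app.state.producer.stop()

app = FastAPI(lifespan=lifespan)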