ballerina-platform / ballerina-library

The Ballerina Library
https://ballerina.io/learn/api-docs/ballerina/
Apache License 2.0

Kafka consumer does not support concurrent access #6700

Closed Nuvindu closed 3 months ago

Nuvindu commented 4 months ago

Description: Getting the following exception when using kafka:Consumer to consume messages from a Kafka topic concurrently.

The error:

Exception in thread "balx-kafka-consumer-network-thread" java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: balx-kafka-consumer-network-thread, id: 58) otherThread(id: 54)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:2551)
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquireAndEnsureOpen(KafkaConsumer.java:2532)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1234)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1227)
        at io.ballerina.stdlib.kafka.nativeimpl.consumer.Poll.lambda$pollPayload$1(Poll.java:99)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:833)

Steps to reproduce:

  1. Start a Kafka broker instance

  2. Run the Ballerina service

    
    import ballerina/http;
    import ballerinax/kafka;

    public type Movie record {|
        string title;
        int year;
        string director;
    |};

    service / on new http:Listener(8080) {
        private final kafka:Producer producer;
        private final kafka:Consumer consumer;

        function init() returns error? {
            self.producer = check new (kafka:DEFAULT_URL);
            self.consumer = check new (kafka:DEFAULT_URL, {
                groupId: "new-releases-id",
                topics: "new-releases"
            });
        }

        resource function post releases(Movie movie) returns error? {
            check self.producer->send({
                topic: "new-releases",
                value: movie
            });
        }

        resource function get releases() returns string|error {
            Movie[] movies = check self.consumer->pollPayload(15);
            return movies.toString();
        }
    }


  3. Execute the cURL command twice in separate terminals.

```curl
curl http://localhost:8080/releases
```

dilanSachi commented 4 months ago

This is because, even though the kafka:Consumer is isolated, the underlying Java consumer is not thread-safe. Therefore, access to it has to be synchronized manually.
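
To illustrate what "synchronized manually" can mean at the Java level, here is a minimal, self-contained sketch. It does not use the Kafka client: `SingleThreadedClient` is a hypothetical stand-in that rejects overlapping calls with a `ConcurrentModificationException`, similar in spirit to the guard seen in the stack trace, and `SynchronizedClient` and `countFailedPolls` are likewise illustrative names. The wrapper serializes every `poll()` behind one lock. In the Ballerina service, the analogous step would presumably be wrapping the `pollPayload` call in a `lock` statement.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

final class GuardedConsumerDemo {

    /** Hypothetical stand-in for a client that rejects overlapping calls. */
    static final class SingleThreadedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>(null);

        String poll() {
            // Fail fast if another thread is already inside poll().
            if (!owner.compareAndSet(null, Thread.currentThread())) {
                throw new ConcurrentModificationException(
                        "client is not safe for multi-threaded access");
            }
            try {
                Thread.sleep(5); // simulate time spent waiting on the broker
                return "record";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "";
            } finally {
                owner.set(null); // release ownership for the next caller
            }
        }
    }

    /** Serializes every call to the wrapped client behind one lock. */
    static final class SynchronizedClient {
        private final SingleThreadedClient delegate = new SingleThreadedClient();
        private final ReentrantLock lock = new ReentrantLock();

        String poll() {
            lock.lock();
            try {
                return delegate.poll();
            } finally {
                lock.unlock();
            }
        }
    }

    /** Polls from several threads at once and returns how many calls failed. */
    static int countFailedPolls(int threads) {
        SynchronizedClient client = new SynchronizedClient();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try {
                    start.await(); // release all threads together
                    client.poll();
                } catch (ConcurrentModificationException | InterruptedException e) {
                    failures.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed polls: " + countFailedPolls(8));
    }
}
```

Because the lock admits one caller at a time, the stand-in's guard never sees two threads and no call should fail. The trade-off is that concurrent requests queue behind each other for the duration of each poll.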