snowflakedb / snowflake-jdbc

Snowflake JDBC Driver

Subject: KafkaConsumer API Performance Challenges and Optimization Suggestions (Kafka clients 3.6.1) #1971

Closed: gmungi closed this issue 8 hours ago

gmungi commented 5 days ago

Hi All,

We have been observing that the KafkaConsumer API is significantly slower than the previous low-level Kafka API we were using (e.g., FetchRequest, FetchResponse, ByteBufferMessageSet). Below is a detailed overview of the issue and the current implementation, along with an explanation of the bottlenecks and potential optimization suggestions.

Performance Issues

Use Case:

The application needs to fetch 1,000 messages starting from a specific user-provided offset and return the next offset (1001) in the response; that offset is then used as input for subsequent requests. Despite using MAX_POLL_RECORDS_CONFIG=1000, the consumer API fetches only ~300 records per poll in ~2 seconds. Fetching 1,000 records typically takes ~4 polls, resulting in a total time of ~8–10 seconds.

I have tried different consumer settings such as MAX_PARTITION_FETCH_BYTES_CONFIG, FETCH_MIN_BYTES_CONFIG, and MAX_POLL_RECORDS_CONFIG, and I have tried increasing max.poll.records. Even so, within a 2-second poll the consumer is not able to fetch 1,000 records, and sometimes it returns 0 records.
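For context, a minimal sketch of the fetch-related settings experimented with (the concrete values below are illustrative assumptions, not the production values):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;

    Properties props = new Properties();
    // Upper bound on the number of records a single poll() may return.
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
    // The broker waits until at least this many bytes are available before answering a fetch...
    props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "65536");
    // ...but never longer than this, so polls do not stall on quiet partitions.
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
    // Per-partition cap on the data returned by one fetch (default is 1 MB).
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "5242880");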

Observed Delays:

Consumer assignment and seeking: the time taken for consumer.assign() and consumer.seek() operations adds to the overall latency.

Polling: the consumer.poll() call often returns fewer records than expected, resulting in multiple iterations to reach the required batch size.

Comparison with Low-Level API: The low-level Kafka API (e.g., FetchRequest and FetchResponse) performs better, with reduced latency for fetching records. It appears to bypass some of the high-level abstractions (e.g., consumer group coordination and offset management) that introduce overhead.
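To make the comparison concrete: the same bypass is available with the new KafkaConsumer as well, since manual assign() avoids rebalances and group-coordinator round trips, and with caller-managed offsets commitSync() is not strictly needed either. A minimal sketch (the method name, client name and fetch size are illustrative placeholders; createConsumer is the factory shown below):

    import java.time.Duration;
    import java.util.Collections;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    static long readBatch(String topicName, int partition, long startOffset) {
        TopicPartition tp = new TopicPartition(topicName, partition);
        // Manual assignment: no subscribe(), so no rebalance or group-coordinator round trips.
        try (KafkaConsumer<String, String> consumer =
                 KafkaConsumerFactory.createConsumer("crt_probe_consumer", 1048576)) {
            consumer.assign(Collections.singletonList(tp));
            consumer.seek(tp, startOffset);          // start exactly at the caller-supplied offset
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            long nextOffset = startOffset;
            for (ConsumerRecord<String, String> record : records) {
                // The caller tracks offsets itself, so no commitSync() is required.
                nextOffset = record.offset() + 1;
            }
            return nextOffset;
        }
    }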

Consumer Creation Method

public static KafkaConsumer<String, String> createConsumer(String clientName, int fetchSize) {
        Properties props = new Properties();
        String kafkaBrokerStr = Config.getConsumerPropValue("kafkabrokerslist");
        String groupId = Config.getConsumerPropValue("group.id");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokerStr);
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.setProperty(ConsumerConfig.CLIENT_ID_CONFIG,clientName);
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String uname = System.getenv(NextGenKafkaConstants.KAFKA_USER);
        String pwrd = System.getenv(NextGenKafkaConstants.KAFKA_PASS);
        log.info("KafkaUser:{} kafkaBrokerStr:{} GroupID:{}",uname,kafkaBrokerStr,groupId);
        String jaasCfg = String.format(jaasTemplate, uname, pwrd);
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
        props.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);
        //props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, fetchSize);
        //props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
        //props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
        return new KafkaConsumer<String,String>(props);

}

public List<JSONObject> consume(long offset, String topicName, int partition, CEConsumeRequest inputReq) throws CustomException {
    List<JSONObject> msglist = new ArrayList<>();

    int waitTime = Integer.valueOf(Config
            .getConsumerPropValue("pull.wait.time"));
    int limit = Integer.valueOf(Config.getConsumerPropValue("pull.size.limit"));
    int emptyLoopLimit = Integer.valueOf(Config
            .getConsumerPropValue("pull.empty.loop.limit"));
    int fetchSize = Integer.valueOf(Config
            .getConsumerPropValue("pull.each.fetch.size"));

    TopicPartition topicPartition = new TopicPartition(topicName, partition);
    /*
     * User input offset.
     */

    long readOffset = offset;
    clientName = "crt_consumer_" + Thread.currentThread().getName();
    try (KafkaConsumer<String,String> consumer = KafkaConsumerFactory.createConsumer(clientName,fetchSize)){
        consumer.assign(Collections.singletonList(topicPartition));

    if (readOffset == 0 && readOffset < kafkaEarliestOffset) {
        log.warn("Resetting the offset to earliest available offset in kafka.");
        readOffset = kafkaEarliestOffset;
    }

    boolean end = false;
    long startTime = Calendar.getInstance().getTimeInMillis();
    long endTime = Calendar.getInstance().getTimeInMillis();
    int emptyFetchCount = 0;
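    // seek() only sets the fetch position locally; the actual fetch happens on the next poll().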
    consumer.seek(topicPartition, readOffset);
    do {
        JSONObject obj = null;
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(2000));
        log.info("Poll Records Count :{}", records.count());
          for (ConsumerRecord<String, String> consumerRecord : records) {
              long currentOffset = consumerRecord.offset();
              if (currentOffset < readOffset) {
                    log.warn("Found an old offset: {}, Expecting: {}", currentOffset, readOffset);
                    continue;
                }
              String message = consumerRecord.value();
              log.debug("client name : {} , offset : {} , message : {}",
                        clientName, currentOffset, message);
              CONSUME_LOG.debug(topicName + "\t" + partition + "\t" + currentOffset);
              obj = new JSONObject(message);
               msglist.add(obj);
                readOffset = currentOffset + 1;
          }
        endTime = Calendar.getInstance().getTimeInMillis();
        if (msglist.size() >= Math.round((float) limit
                / inputReq.getApplicationArea().getReqInfo().size())
                || (endTime - startTime) >= waitTime) {
            log.info(
                    "Wait condition has been met... exiting the fetch loop. recordCount - {}, time exhausted - {} ms.",
                    msglist.size(), (endTime - startTime));
            end = true;
            consumer.commitSync();
        } else if (records.isEmpty()) {
            emptyFetchCount++;
            try {
                if (emptyFetchCount == emptyLoopLimit) {
                    log.info("No messages were found in {} successive empty fetches. Stopping the consume process here.", emptyLoopLimit);
                    end = true;
                } else {
                    Thread.sleep(1000);
                }
            } catch (InterruptedException ie) {
                CONSUME_LOG.warn(ie.getMessage(), ie);
            }
        } else {
            emptyFetchCount = 0;
        }
    } while (!end);

    return msglist;
}
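For reference, kafkaEarliestOffset used before the seek above is populated outside this snippet. A minimal sketch of one way it could be resolved, assuming the standard KafkaConsumer.beginningOffsets API (an assumption, not necessarily how our code obtains it):

    // Assumption: resolve the earliest retained offset for the assigned partition,
    // e.g. right after consumer.assign(...), before comparing it with readOffset.
    long kafkaEarliestOffset = consumer
            .beginningOffsets(Collections.singletonList(topicPartition))
            .get(topicPartition);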

Please suggest how to improve this.

sfc-gh-dszmolka commented 4 days ago

hi - I'm wondering whether this issue was intended for the Snowflake Kafka connector library instead?

gmungi commented 4 days ago

hi - I'm wondering whether this issue was intended for the Snowflake Kafka connector library instead?

@sfc-gh-dszmolka I am not able to find the Kafka consumer link. Can you please help with this?

sfc-gh-sghosh commented 1 day ago

Hello @gmungi ,

Could you please create the issue in https://github.com/snowflakedb/snowflake-kafka-connector

Regards, Sujan

sfc-gh-sghosh commented 1 day ago

Hello @gmungi ,

This issue falls outside the scope of Snowflake Kafka Connector support, as it pertains to tuning the Kafka Connect framework versus other Kafka APIs. For these types of questions, Confluent support is a good resource.

Regards, Sujan

sfc-gh-dszmolka commented 8 hours ago

closing this issue as it should not be here.