confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Facing blocking API call and performance degradation for ReadMessage #986

Open jforjava1981 opened 1 year ago

jforjava1981 commented 1 year ago

Description

We are trying to use a Seek/ReadMessage combination in a loop. We see that most of the time ReadMessage blocks in the internal C library's C._rk_queue_poll or some other internal API. Each message takes around 500 milliseconds to be returned from ReadMessage. When I introduce a delay of 100 milliseconds between Seek and ReadMessage, it drops to about 400 milliseconds per message. When I introduce a delay of 1000 milliseconds between Seek and ReadMessage, it drops to a few microseconds per message.

Is this behaviour expected? I know that the Seek/ReadMessage combination is probably not the typical use case.

How to reproduce

Use the code below as a consumer against a local Kafka cluster with 1 broker and 3 partitions.

```go
for testRun := 0; testRun < testRuns; testRun++ {
    startIndex := rand.Intn(len(partitionRecords) - batchSize)
    endIndex := startIndex + batchSize
    batch := partitionRecords[startIndex:endIndex]

    if SortEnable {
        utils.SortByPartitionAndOffset(batch)
    }
    r := trace.StartRegion(context.Background(), fmt.Sprintf("Batch%d", testRun+1))
    readStart := time.Now()
    fmt.Println(len(batch))
    for i, record := range batch {

        // Parse the partition and offset values from the record
        partition := record[0]
        offset := record[1]

        // Convert the partition and offset values to integers
        partitionInt, err := strconv.Atoi(partition)
        if err != nil {
            panic(err)
        }

        offsetInt, err := strconv.Atoi(offset)
        if err != nil {
            panic(err)
        }
        // Create a new Kafka topic partition instance
        topicPartition := kafka.TopicPartition{
            Topic:     &topic,
            Partition: int32(partitionInt),
            Offset:    kafka.Offset(offsetInt),
        }
        // Use the Seek() method to fetch messages from the specified offset
        seekStart := time.Now()
        seekErr := p.KafkaWrapper.DoSeek(p.Consumer, topicPartition, 0)
        seekEnd := time.Now()
        seekTime := seekEnd.Sub(seekStart)
        fmt.Println("Record No:", i, "Seek Error: ", seekErr, " partition: ", topicPartition.Partition, " offset: ", topicPartition.Offset)
        // Read the message from each of those partition-offset pair
        time.Sleep(2000 * time.Millisecond)
        readMessageStart := time.Now()
        timeoutTime, _ := time.ParseDuration("10s")
        msg, err := p.KafkaWrapper.DoReadMessage(p.Consumer, timeoutTime)

        readMessageEnd := time.Now()
        if err != nil {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            continue
        }
        //  fmt.Println("Message ", string(msg.Value))
        readMessageTime := readMessageEnd.Sub(readMessageStart)
        //fmt.Println("ReadMessageTime: ", readMessageTime.Microseconds())
        _, err2 := logFile.WriteString("" + "," + strconv.Itoa(testRun+1) + "," + "" + "," + strconv.Itoa(int(topicPartition.Partition)) + "," + strconv.Itoa(int(topicPartition.Offset)) + "," + strconv.Itoa(int(seekTime.Microseconds())) + "," + strconv.Itoa(int(readMessageTime.Microseconds())) + "," + "" + "," + "" + "\n")
        if err2 != nil {
            fmt.Println("Write Error in log file", err)
        }
    }
    readEnd := time.Now()
    r.End()
    batchDuration := readEnd.Sub(readStart)
    currentTime := time.Now()
    _, err := logFile.WriteString(currentTime.Format("2006-01-02 15:04:05") + "," + "" + "," + strconv.Itoa(batchSize) + "," + "" + "," + "" + "," + "" + "," + "" + "," + strconv.Itoa(int(batchDuration.Microseconds())) + "\n")
    if err != nil {
        fmt.Println("Write Error in log file", err)
    }
}
```
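
For reference, `KafkaWrapper` in the snippet is our own thin layer; a minimal sketch of what `DoSeek` and `DoReadMessage` are assumed to do (straight pass-throughs to the client API; the real wrapper may add instrumentation):

```go
package main

import (
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// KafkaWrapper is the thin wrapper used in the repro code above
// (sketch only; assumed to delegate directly to the client).
type KafkaWrapper struct{}

// DoSeek is assumed to delegate to Consumer.Seek.
func (w *KafkaWrapper) DoSeek(c *kafka.Consumer, tp kafka.TopicPartition, timeoutMs int) error {
	return c.Seek(tp, timeoutMs)
}

// DoReadMessage is assumed to delegate to Consumer.ReadMessage.
func (w *KafkaWrapper) DoReadMessage(c *kafka.Consumer, timeout time.Duration) (*kafka.Message, error) {
	return c.ReadMessage(timeout)
}
```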

We have a requirement to read 100-500 such messages from particular offsets and partitions that are stored offline.
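
For illustration only, the same random access could also be expressed with Assign instead of Seek; a hypothetical sketch, not the code we benchmarked, reusing `topic`, `partitionInt`, `offsetInt`, and a `*kafka.Consumer` from the repro above:

```go
// Hypothetical variant: assign the consumer directly at the stored
// offset rather than seeking, then read a single message.
tp := kafka.TopicPartition{
	Topic:     &topic,
	Partition: int32(partitionInt),
	Offset:    kafka.Offset(offsetInt),
}
if err := consumer.Assign([]kafka.TopicPartition{tp}); err != nil {
	panic(err)
}
msg, err := consumer.ReadMessage(10 * time.Second)
if err != nil {
	panic(err)
}
fmt.Println("read", len(msg.Value), "bytes from", msg.TopicPartition)
_ = consumer.Unassign() // reset before the next partition/offset pair
```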


jforjava1981 commented 1 year ago

Update: when I faced the blocking issue above, I was using the default values for these properties:

- fetch.max.wait.ms (500)
- fetch.min.bytes (1)
- fetch.max.bytes (default value)
- partition.fetch.max.bytes (default value)

As I understand the above configuration, poll (via ReadMessage) should return when either the fetch.max.wait.ms (500) or the fetch.min.bytes threshold is reached. Since fetch.min.bytes defaults to 1 byte, it should ideally return almost immediately. We are also sure the topic partition has the data, because before polling we seek to older messages that are well within the retention period. So why did it wait 500 ms? (I assume it did, since almost every read took around 500 ms plus a few more.) And if it did not actually wait, was the blocking instead caused by too many requests, each returning only 1 byte? I am not sure.
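
For concreteness, nothing fetch-related was set on the consumer during the slow runs, i.e. it was created roughly like this (a minimal sketch; broker address and group id are placeholders, and the comments map the names above to their librdkafka equivalents):

```go
package main

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
		"group.id":          "offline-replay", // placeholder
		// Fetch tuning left at librdkafka defaults during the slow runs:
		//   fetch.wait.max.ms         = 500 (librdkafka name for fetch.max.wait.ms)
		//   fetch.min.bytes           = 1
		//   fetch.max.bytes           = default
		//   max.partition.fetch.bytes = default
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}
```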

When I changed the properties from their defaults to the values below, it performed much faster:

- fetch.max.wait.ms (0)
- fetch.min.bytes (max value)
- fetch.max.bytes (max value)
- partition.fetch.max.bytes (max value)

I had to set the last property above to its maximum to make it perform well with multiple consumers (one consumer per partition, each reading from a single partition, running in goroutines).
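
Put together, the faster configuration looked roughly like this; a minimal sketch using the librdkafka key names, with placeholder broker address and group id. The concrete "max" numbers are assumptions based on my reading of librdkafka's CONFIGURATION.md, so double-check them against your client version:

```go
package main

import "github.com/confluentinc/confluent-kafka-go/v2/kafka"

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
		"group.id":          "offline-replay", // placeholder
		// Tuned values (librdkafka key names); "max" means the documented
		// upper bound of each property. These bounds are assumed, verify locally:
		"fetch.wait.max.ms":         0,
		"fetch.min.bytes":           100000000,  // assumed maximum
		"fetch.max.bytes":           2147483135, // assumed maximum
		"max.partition.fetch.bytes": 1000000000, // assumed maximum
	})
	if err != nil {
		panic(err)
	}
	defer consumer.Close()
}
```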

Can you shed some light on this behaviour? Are there any pitfalls in the above values for my use case and the code above?