confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

golang confluent kafka producer client running into error - Failed to create thread: Resource temporarily unavailable (11) #1177

Open Mukthar-am opened 7 months ago

Mukthar-am commented 7 months ago

Description

I am running a Go-based web server that acts as an ingestion gateway for Kafka. The service receives HTTP connections from different clients and ingests their messages into AWS MSK.

I have implemented batching: each batch contains 50 messages of 1 KB each, and I send it with the asynchronous producer from confluent-kafka-go.

My goal is a higher ingestion rate. During performance testing, I run into the error below after a few minutes. The ingestion rate climbs to as high as 500 messages per second, but then the Go server crashes.

*** Error

Failed to create thread: Resource temporarily unavailable (11)

How to reproduce

func AsyncBatchProduce(messages [][]byte, topic string) {
LOG.Debug("Init async BATCH kafka producer with producer configs", zap.Any("BrokerConfigs", kConf))

kafkaProducerInstance, producerError := kafka.NewProducer(kConf)
if producerError != nil {
    LOG.Error("failed to create new kafka async batch producer instance", zap.Any("create-producer-error", producerError))
} else {
    LOG.Info("successfully obtained kafka async batch producer instance")
}

// Produce messages to topic (asynchronously)
deliveryChan := make(chan kafka.Event, 10000)
defer close(deliveryChan)

// Channel to receive errors
produceErrChan := make(chan error)

// dump all the messages into the producer instance
for mId, message := range messages {
    LOG.Debug("batch producer iterator", zap.Any("message-id", mId))
    err := kafkaProducerInstance.Produce(
        &kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          message,
        }, deliveryChan)

    if err != nil {
        LOG.Error("producer instance error", zap.Any("producer-error", err))
        produceErrChan <- err
    }
}

// Delivery report handler for produced messages
go func() {
    for e := range kafkaProducerInstance.Events() {

        switch ev := e.(type) {
        case *kafka.Message:
            part := ev.TopicPartition
            LOG.Debug(part.String())

            if ev.TopicPartition.Error != nil {
                LOG.Error("= Failed persisting message to Kafka",
                    zap.Any("topic", &ev.TopicPartition.Topic),
                    zap.Any("persistence-error", ev.TopicPartition.Error))
            } else {
                LOG.Info("= Successfully persisted message to ",
                    zap.Any("topic", &ev.TopicPartition.Topic),
                    zap.Any("partition", ev.TopicPartition.Partition),
                    zap.Any("offset", ev.TopicPartition.Offset))
            }

        case kafka.Error:
            LOG.Info("encountered kafka broker connection errors, opting fallback persistence route")

            // create kafkaConnectionError.txt file, flagging that the kafka system is down
            fileutils.WriteToFileIfNotExists(kafkaConnectionErrorFile, kafkaConnectionErrorFileContent)

            kafkaProducerInstance.Flush(produceTimeout)
            fileWriteChannelizer.HandleBatchedMessage(messages)

            return
        default:
            LOG.Error("Event Ignored", zap.Any("kafka.Message", ev))
        }
    }
}()

// Check for errors
select {
case err := <-produceErrChan:
    if opError, ok := err.(*kafka.Error); ok && opError.IsFatal() && opError.Code() == kafka.ErrLeaderNotAvailable {
        LOG.Error("Connection refused error", zap.Any("opError", opError))
    } else {
        LOG.Error("Error sending message", zap.Any("err", err))
    }
default:
    LOG.Debug("submitted message successfully to async message committer")
}

channelOut := <-deliveryChan
messageReport := channelOut.(*kafka.Message)

if messageReport.TopicPartition.Error != nil {
    LOG.Info("topic partition error", zap.Any("partition error", messageReport.TopicPartition.Error.Error()))

} else {
    // ensure to delete the kafkaConnectionErrorFile, flagging the cron to transfer s3 backed messages to kafka
    fileutils.DeleteFile(kafkaConnectionErrorFile)
    LOG.Info("successfully persisted message",
        zap.Any("batch-size", len(messages)),
        zap.Any("topic", messageReport.TopicPartition.Topic),
        zap.Any("offset", messageReport.TopicPartition.Offset),
        zap.Any("partition", messageReport.TopicPartition.Partition))
}

//defer close(deliveryChan)

// Wait for message deliveries before shutting down
kafkaProducerInstance.Flush(produceTimeout)

}

milindl commented 5 months ago

I can't see anything immediately wrong with the code. Is it possible that the producer delivery channel is getting too full and causing some sort of memory issue? You could try draining it from within a goroutine rather than later. What is the number of brokers in your test cluster, and what are the resource limits on the machine you're running this program on?

Is it possible to profile the CPU and memory usage of the program while it runs?
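
Since the reported failure is about thread creation, one cheap measurement alongside CPU and memory profiling is the process's OS thread count over time. The sketch below assumes a Linux host, where `/proc/self/status` contains a `Threads:` line covering every thread in the process, including those started by native libraries. The helper names `parseThreads` and `osThreadCount` are made up for this example.

```go
package main

import (
	"bufio"
	"fmt"
	"os"
	"strconv"
	"strings"
)

// parseThreads extracts the value of the "Threads:" line from the
// contents of a /proc/<pid>/status file.
func parseThreads(status string) (int, error) {
	sc := bufio.NewScanner(strings.NewReader(status))
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "Threads:") {
			value := strings.TrimSpace(strings.TrimPrefix(line, "Threads:"))
			return strconv.Atoi(value)
		}
	}
	if err := sc.Err(); err != nil {
		return 0, err
	}
	return 0, fmt.Errorf("no Threads: line found")
}

// osThreadCount reports the number of OS threads in the current process.
// Linux-only assumption: it relies on /proc/self/status being available.
func osThreadCount() (int, error) {
	data, err := os.ReadFile("/proc/self/status")
	if err != nil {
		return 0, err
	}
	return parseThreads(string(data))
}

func main() {
	n, err := osThreadCount()
	if err != nil {
		fmt.Println("thread count unavailable:", err)
		return
	}
	fmt.Println("OS threads:", n)
}
```

Logging this value periodically during the load test would show whether the thread count keeps growing until the crash or stays flat.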