confluentinc / confluent-kafka-go

Confluent's Apache Kafka Golang client
Apache License 2.0

Producer messages not flushing after Flush()? #1264

Closed: deathbymochi closed this issue 1 month ago

deathbymochi commented 1 month ago

Description

I am using v2.5.0 of the library to set up a very basic Kafka producer in an AWS Lambda function that processes events from an API Gateway endpoint, but I'm running into problems getting my messages to actually flush. I'm hoping I'm missing something obvious, but I'm also quite stuck. Would appreciate some expert eyes šŸ™

How to reproduce

Here is my Lambda function handler:

func handleEventBasicKafka(request events.APIGatewayV2HTTPRequest) (events.APIGatewayV2HTTPResponse, error) {
    cfg := &Configuration{
        kafkaHost: os.Getenv("KAFKA_HOST"),
        kafkaPort: os.Getenv("KAFKA_PORT"),
    }
    producerClient, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": cfg.KafkaBootstrapServers()})
    if err != nil {
        // creating the producer failed (e.g. bad config), so bail out early
        return events.APIGatewayV2HTTPResponse{StatusCode: 500, Body: err.Error()}, err
    }

    if request.RequestContext.HTTP.Method == "POST" {
        var rawLogs []models.LogTest
        if err := json.Unmarshal([]byte(request.Body), &rawLogs); err != nil {
            logger.Warn("failed to unmarshal request body", slog.Any("error", err))
        }

        go func() {
            for e := range producerClient.Events() {
                switch ev := e.(type) {
                case *kafka.Message:
                    if ev.TopicPartition.Error != nil {
                        logger.Info(fmt.Sprintf("Failed to deliver message: %v\n", ev.TopicPartition))
                    } else {
                        logger.Info(
                            "Successfully produced record",
                            slog.String("topic", *ev.TopicPartition.Topic),
                            slog.Int("partition", int(ev.TopicPartition.Partition)),
                            slog.Any("offset", ev.TopicPartition.Offset))
                    }
                default:
                    logger.Info(fmt.Sprintf("Got some other event type (%v)", ev))
                }
            }
        }()

        var batchErr error
        for _, rawLog := range rawLogs {
            rawLogBytes, err := json.Marshal(rawLog)
            if err != nil {
                batchErr = multierr.Append(batchErr, err)
                logger.Warn("skipping raw log due to error", slog.Any("error", err))
                continue
            }
            topic := kafkaTopic
            msg := kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
                Value:          rawLogBytes,
                Key:            []byte(rawLog.SixpackClientId),
            }
            err = producerClient.Produce(&msg, nil)
            if err != nil {
                batchErr = multierr.Append(batchErr, err)
                logger.Warn("error publishing to kafka", slog.Any("error", err), slog.Any("msg", rawLogBytes))
            }
        }

        if batchErr != nil {
            logger.Warn(fmt.Sprintf("got errors processing batch (%d / %d errored)",
                len(multierr.Errors(batchErr)), len(rawLogs)))
        }

        // debug: check how many messages are still queued before flushing
        msgsLeft := producerClient.Len()
        logger.Info(fmt.Sprintf("Messages left before flush: %d", msgsLeft))

        msgsLeftAfterFlush := producerClient.Flush(flushTimeoutSeconds * 1000)
        logger.Info(fmt.Sprintf("Messages left after flush: %d", msgsLeftAfterFlush))

        // in case there were any unflushed messages remaining, log it
        msgsLeft = producerClient.Len()
        if msgsLeft > 0 {
            logger.Warn(fmt.Sprintf("Had messages left after flush: %d", msgsLeft))
            datadog.Metric(
                "seatgeek.fastly_pageview.kafka.unsent_msg",
                float64(msgsLeft),
                "status:unflushed",
            )
        }

        // close the producer to see if that helps?
        producerClient.Close()

        resp := events.APIGatewayV2HTTPResponse{
            StatusCode: 200,
            Body:       request.Body,
        }
        return resp, nil
    } else {
        resp := events.APIGatewayV2HTTPResponse{
            StatusCode: 405,
            Body:       fmt.Sprintf("Method not allowed (%s) (%v)", request.RequestContext.HTTP.Method, request),
        }
        return resp, nil
    }
}

The logs I see are:

{
    "time": "2024-07-30T20:14:40.619969542Z",
    "level": "INFO",
    "msg": "Messages left before flush: 2"
}
{
    "time": "2024-07-30T20:14:45.620006853Z",
    "level": "INFO",
    "msg": "Messages left after flush: 2"
}
{
    "time": "2024-07-30T20:14:45.620030926Z",
    "level": "WARN",
    "msg": "Had messages left after flush: 2"
}
%4|1722370485.626|TERMINATE|rdkafka#producer-2| [thrd:app]: Producer terminating with 2 messages (144 bytes) still in queue or transit: use flush() to wait for outstanding message delivery

I'm stumped because it seems like the messages aren't actually flushing, even though I'm calling Flush(). Am I missing something obvious?
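
Note: Flush(timeoutMs) only waits up to the given timeout for outstanding deliveries and returns the number of messages still queued or in flight, so a non-zero result means delivery did not complete in time rather than that Flush() was skipped. One way to surface the underlying per-message delivery error is to pass a delivery channel to Produce instead of nil. A minimal sketch, assuming the same producerClient, topic, rawLogBytes, and logger as in the handler above:

// sketch: produce one message and wait for its delivery report
deliveryChan := make(chan kafka.Event, 1)

if err := producerClient.Produce(&kafka.Message{
    TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
    Value:          rawLogBytes,
}, deliveryChan); err != nil {
    logger.Warn("produce call failed", slog.Any("error", err))
} else {
    // blocks until the message is delivered or librdkafka gives up on it
    e := <-deliveryChan
    if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
        // a missing topic typically shows up here as an unknown-topic delivery error
        logger.Warn("delivery failed", slog.Any("error", m.TopicPartition.Error))
    }
}
close(deliveryChan)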

deathbymochi commented 1 month ago

Ah! As it turns out, the topic hadn't been created yet and I simply hadn't noticed šŸ¤¦

I was able to suss that out by enabling debug mode (adding "debug": "all" to the config map when creating the producer).
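
For reference, here is a standalone sketch of that debug configuration, plus one way to make sure the topic exists up front using the library's AdminClient. The bootstrap servers, topic name, partition count, and replication factor below are placeholders:

package main

import (
    "context"
    "log"
    "time"

    "github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
    // "debug": "all" turns on verbose librdkafka logging, which is what exposed
    // the missing topic in this issue
    producerClient, err := kafka.NewProducer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092", // placeholder
        "debug":             "all",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producerClient.Close()

    // optionally pre-create the topic so produced messages have somewhere to go
    adminClient, err := kafka.NewAdminClientFromProducer(producerClient)
    if err != nil {
        log.Fatal(err)
    }
    defer adminClient.Close()

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    results, err := adminClient.CreateTopics(ctx, []kafka.TopicSpecification{{
        Topic:             "my-topic", // placeholder
        NumPartitions:     1,
        ReplicationFactor: 1,
    }})
    if err != nil {
        log.Fatal(err)
    }
    for _, r := range results {
        // a TopicAlreadyExists error here is fine if the topic was created elsewhere
        log.Printf("create topic %q: %v", r.Topic, r.Error)
    }
}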