lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

produce message larger than configured Producer.MaxMessageBytes #460

Closed BhautikChudasama closed 2 months ago

BhautikChudasama commented 2 months ago

Hello there. I have set Producer.MaxMessageBytes on the producer config, so why am I still getting the error below?

2024/08/22 14:28:33 [Processor 9bsv0s31pclg036ue580-events-pod-group] Error running/stopping partition processor 0: 2 errors occurred:
    * error processing message (partition=0): asynchronous error from callback: could not commit message with key '9bsv0s31pclg036ue580': 1 error occurred:
    * kafka: Failed to produce message to topic 9bsv0s31pclg036ue580-events-pod-group-table: kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 1048653 > 1048576)

    * error processing message (partition=0): asynchronous error from callback: could not commit message with key '9bsv0s31pclg036ue580': 1 error occurred:
    * kafka: Failed to produce message to topic 9bsv0s31pclg036ue580-events-pod-group-table: kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 1048653 > 1048576)

2024/08/22 14:28:33 [Processor 9bsv0s31pclg036ue580-events-pod-group] error during execution of consumer group: 2 errors occurred:
    * error processing message (partition=0): asynchronous error from callback: could not commit message with key '9bsv0s31pclg036ue580': 1 error occurred:
    * kafka: Failed to produce message to topic 9bsv0s31pclg036ue580-events-pod-group-table: kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 1048653 > 1048576)

    * error processing message (partition=0): asynchronous error from callback: could not commit message with key '9bsv0s31pclg036ue580': 1 error occurred:
    * kafka: Failed to produce message to topic 9bsv0s31pclg036ue580-events-pod-group-table: kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 1048653 > 1048576)

2024-08-22T14:28:33.636Z    FATAL    kafka_steams/kubernetes_pod_runtime_calc_events.go:174    error running processor    {"error": "1 error occurred:\n\t* error consuming from group consumer: 2 errors occurred:\n\t* error processing message (partition=0): asynchronous error from callback: could not commit message with key '9bsv0s31pclg036ue580': 1 error occurred:\n\t* kafka: Failed to produce message to topic 9bsv0s31pclg036ue580-events-pod-group-table: kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 1048653 > 1048576)\n\n\n\t* error processing message (partition=0): asynchronous error from callback: could not commit message with key '9bsv0s31pclg036ue580': 1 error occurred:\n\t* kafka: Failed to produce message to topic 9bsv0s31pclg036ue580-events-pod-group-table: kafka: invalid configuration (Attempt to produce message larger than configured Producer.MaxMessageBytes: 1048653 > 1048576)\n\n\n\n\n\n"}
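A quick check of the numbers in that error, as a small sketch: the limit it reports (1048576) is exactly 1 MiB, not the 1 GiB (1073741824) set in the code, and the rejected message is only slightly over it. The helper name `overshoot` is made up for this illustration.

```go
package main

import "fmt"

// overshoot reports by how many bytes a message of the given size
// exceeds the limit (zero or negative means it fits).
// Hypothetical helper, for illustration only.
func overshoot(size, limit int) int {
	return size - limit
}

func main() {
	const (
		messageSize = 1048653    // message size taken from the error text
		limitInLog  = 1048576    // limit taken from the error text
		configured  = 1073741824 // value assigned to emitterConfig in the post
	)

	fmt.Println(limitInLog == 1<<20)                // is the reported limit exactly 1 MiB?
	fmt.Println(configured == 1<<30)                // is the configured value exactly 1 GiB?
	fmt.Println(overshoot(messageSize, limitInLog)) // bytes over the reported limit
	fmt.Println(limitInLog == configured)           // did the failing producer see the configured value?
}
```

Since the reported limit differs from the configured one, the producer that failed was apparently not built from the config where the 1 GiB value was set.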

Here is my code

    emitterConfig := goka.DefaultConfig()
    emitterConfig.Producer.MaxMessageBytes = 1073741824 // 1GB

    emitter, err := goka.NewEmitter(
        cfg.KafkaBrokers,
        goka.Stream(outputTopic),
        new(structs.PodRuntimeCoded),
        goka.WithEmitterProducerBuilder(goka.ProducerBuilderWithConfig(emitterConfig)),
    )
    if err != nil {
        return err
    }

Please help me to figure out what I am doing wrong. Thanks.
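One possible reading of the log, offered as a sketch rather than the confirmed fix (the thread never says what the fix was): the failing topic ends in -group-table, which is the name goka gives a processor's group table, so the write that fails would come from the processor's producer and not from the emitter configured above. Under that assumption, the same config has to be passed to the processor too, via goka.WithProducerBuilder. The broker address, group name, topic name, codec and size limit below are all placeholders, not values from the original code.

```go
package main

import (
	"context"
	"log"

	"github.com/lovoo/goka"
	"github.com/lovoo/goka/codec"
)

func main() {
	// Placeholder values, assumed for illustration only.
	brokers := []string{"localhost:9092"}
	group := goka.Group("example-group")
	input := goka.Stream("example-input")

	// Start from goka's default sarama config and raise the producer limit.
	// 10 MiB is an arbitrary example; the topic and broker limits
	// (max.message.bytes / message.max.bytes) must also allow this size.
	cfg := goka.DefaultConfig()
	cfg.Producer.MaxMessageBytes = 10 * 1024 * 1024

	// A minimal group graph that persists the last message per key
	// into the group table (topic "example-group-table").
	graph := goka.DefineGroup(group,
		goka.Input(input, new(codec.String), func(ctx goka.Context, msg interface{}) {
			ctx.SetValue(msg)
		}),
		goka.Persist(new(codec.String)),
	)

	// Hand the config to the processor's producer, not only to an emitter.
	p, err := goka.NewProcessor(brokers, graph,
		goka.WithProducerBuilder(goka.ProducerBuilderWithConfig(cfg)),
	)
	if err != nil {
		log.Fatalf("error creating processor: %v", err)
	}

	if err := p.Run(context.Background()); err != nil {
		log.Fatalf("error running processor: %v", err)
	}
}
```

If every emitter, processor and view in the program should share one config, goka.ReplaceGlobalConfig(cfg) called before creating them is an alternative to passing a builder to each component.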

BhautikChudasama commented 2 months ago

@frairon I would greatly appreciate your assistance with this. Thank you!

BhautikChudasama commented 2 months ago

Fixed. Thanks.