Closed: ErlendFax closed this issue 4 months ago
Swapped to confluent-kafka-go and have had no producer problems. I still prefer franz-go, though.
@ErlendFax could you capture broker 2 logs around the EOF? EOF usually indicates that SASL is missing, but if br1 and br3 are working, something else is up here. I've never seen a situation like this.
The only main network difference I can think of between this client and confluent-kafka-go is the dialer under the hood.
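If it helps to rule the dialer out, an explicit dialer can be pinned on the client. A minimal sketch, assuming the kgo.Dialer option and a plain net.Dialer; the broker address is a placeholder:

```golang
// Sketch: use a plain net.Dialer so the connection path matches a vanilla
// TCP dial. The broker address below is a placeholder.
dialer := &net.Dialer{Timeout: 10 * time.Second}
cl, err := kgo.NewClient(
	kgo.SeedBrokers("redpanda-0:9092"),
	kgo.Dialer(dialer.DialContext),
)
if err != nil {
	log.Fatalf("Failed to create client: %v", err)
}
defer cl.Close()
```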
Thank you for replying! Looking again, I did find more clues in the producer logs.
Log entry from producer:
2024/05/02 14:00:03 Error producing message: CORRUPT_MESSAGE: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt.
Log entry from redpanda-2:
ERROR 2024-05-02 14:00:03,199 [shard 0] kafka - kafka_batch_adapter.cc:130 - Cannot validate Kafka record batch. Missmatching CRC. Expected:1950350817, Got:4140228419
ERROR 2024-05-02T14:00:03.199527673Z [resource.labels.podName: redpanda-2] [resource.labels.containerName: redpanda] ERROR 2024-05-02 14:00:03,199 [shard 0] kafka - kafka_batch_adapter.cc:171 - batch has invalid CRC: {header_crc:0, size_bytes:1339, base_offset:{0}, type:batch_type::raft_data, crc:1950350817, attrs:{compression:none, type:CreateTime, transactional: 0, control: 0}, last_offset_delta:5, first_timestamp:{timestamp: 1714658396642}, max_timestamp:{timestamp: 1714658396642} .......
I've included some logs; a lot has been filtered out, and I tried to keep only the relevant parts.
Are you able to reliably recreate the batch that triggers this error? I'd like to repro this on my side, if possible. A CRC error is pretty fundamental so I'd have expected this to be raised earlier.
Not easily. Any suggestions on how I can capture this batch? Log it when this error hits? Is there a callback func or something that can handle this and log it?
If you're able to get some set of records to reproduce this even infrequently, I can go on that. There's no way to know what exactly was in one batch or the next, but if you see this error maybe 1/100 times on some given set of input, then I can likely go from there.
Thank you for your help on this issue. I used the producer's callback func to print the message being sent whenever an error occurred.
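For reference, a minimal sketch of what such an error-logging callback can look like with franz-go's Produce promise; producerClient, ctx, and the record contents below are placeholders, not the exact app code:

```golang
// Sketch: log the record when the produce promise reports an error.
rec := &kgo.Record{
	Topic: "test_slett",
	Value: []byte(`{"topic":"lte/csv/.../l1","content":"..."}`),
}
producerClient.Produce(ctx, rec, func(r *kgo.Record, err error) {
	if err != nil {
		log.Printf("Failed to produce message: %v, value: %s", err, r.Value)
	}
})
```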
I tried parsing the failed message but didn't really see anything odd; curious if anyone else does. This is the first of 130 similar log entries in total:
2024/05/03 16:07:38 Failed to produce message: CORRUPT_MESSAGE: This message has failed its CRC checksum, exceeds the valid size, has a null key for a compacted topic, or is otherwise corrupt., &{[] [123 34 116 111 112 105 99 34 58 34 108 116 101 47 99 115 118 47 54 57 55 48 54 51 49 52 48 48 54 54 53 48 48 53 47 108 49 34 44 34 99 111 110 116 101 110 116 34 58 34 101 121 74 107 90 88 90 112 89 50 86 102 97 87 81 105 79 105 74 109 83 106 89 53 89 109 53 85 87 83 73 115 73 109 82 48 98 83 73 54 73 106 73 119 77 106 81 116 77 68 85 116 77 68 78 85 77 84 103 54 77 68 89 54 77 106 103 105 76 67 74 121 99 51 78 112 73 106 111 116 77 84 65 49 76 67 74 49 98 109 108 52 73 106 111 120 78 122 69 48 78 122 85 121 77 122 103 52 76 67 74 119 97 83 73 54 78 84 85 49 77 121 52 119 102 81 61 61 34 125] [] 0001-01-01 00:00:00 +0000 UTC test_slett 0 {0} 0 0 0 0 signal.NotifyContext(context.Background, [interrupt terminated])}
[123 34 116 111 112 105 99 34 58 34 108 116 101 47 99 115 118 47 54 57 55 48 54 51 49 52 48 48 54 54 53 48 48 53 47 108 49 34 44 34 99 111 110 116 101 110 116 34 58 34 101 121 74 107 90 88 90 112 89 50 86 102 97 87 81 105 79 105 74 109 83 106 89 53 89 109 53 85 87 83 73 115 73 109 82 48 98 83 73 54 73 106 73 119 77 106 81 116 77 68 85 116 77 68 78 85 77 84 103 54 77 68 89 54 77 106 103 105 76 67 74 121 99 51 78 112 73 106 111 116 77 84 65 49 76 67 74 49 98 109 108 52 73 106 111 120 78 122 69 48 78 122 85 121 77 122 103 52 76 67 74 119 97 83 73 54 78 84 85 49 77 121 52 119 102 81 61 61 34 125]
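(Side note: the Value bytes above are plain UTF-8 JSON, so printing them as a string is easier to read than the decimal dump. A tiny sketch, with failedValue standing in for the slice above:)

```golang
// failedValue stands in for the []byte dumped above (truncated here).
failedValue := []byte{123, 34, 116, 111, 112, 105, 99, 34, 58 /* ... */}
fmt.Println(string(failedValue)) // prints the JSON payload: {"topic":...
```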
I thought it would write several messages in one batch... I would guess it does under the hood, but it's still confusing that the error comes back like this, with one single message.
I just tried writing that exact message to a local test-slett topic on Redpanda with the same producer settings, but I'm not getting an error. Are you able to reliably trigger the error with that input, or is there more to it (and maybe that's just one message failing in the middle)?
Please let me know if you have some program that I can run to reproduce this; I'll close this issue for now otherwise. The only other issue that has a CORRUPT_MESSAGE error in it is #459, and the problem there was the user was mutating the record while it was being produced.
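For anyone landing here with the same error: the "mutating the record while it was being produced" failure mode from #459 often comes from reusing one record variable across Produce calls. A hedged illustration of that general pattern, not a statement about this particular app; ch, producerClient, ctx, and callback are placeholders:

```golang
// Risky on Go versions before 1.22: message is a single loop variable, so
// &message is the same pointer on every iteration. If an earlier record is
// still buffered or in flight, the next iteration overwrites its contents
// while the client is producing it.
for message := range ch {
	producerClient.Produce(ctx, &message, callback)
}

// Safer: give each Produce call its own copy of the record.
for message := range ch {
	rec := message
	producerClient.Produce(ctx, &rec, callback)
}
```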
We are running redpanda self-hosted with three brokers.
Our app consumes from 1 topic, processes the messages, and produces to 3 topics. Consuming and processing takes no time, but producing back to the topics works OK for a few messages and then fails. It's possible I've gotten a bit confused here, but to me it looks like this happens specifically when writing to broker nr. 2.
We have a few other apps connected to the cluster and they all run fine; some of them are in Go.
I am not sure what is happening here, just reaching out to see if anyone has suggestions.
I tried fiddling around with the produce config, buffer sizes, etc., but no luck. Is my config wrong? Any ideas?
Snippets
```golang
...
configConsumer := []kgo.Opt{
	kgo.SeedBrokers(config.Redpanda.BrokerUrl),
	kgo.ConsumeTopics(config.Redpanda.ConsumerTopic),
	kgo.ConsumerGroup(config.Redpanda.GroupId),
	kgo.ClientID("lte-m-csv-parser-consumer"),
	kgo.Balancers(kgo.RoundRobinBalancer()),
	kgo.WithLogger(loggerConsumer),
}
consumerClient, err := kgo.NewClient(configConsumer...)
if err != nil {
	log.Fatalf("Failed to connect to Redpanda: %v", err)
}
defer consumerClient.Close()

configProducer := []kgo.Opt{
	kgo.SeedBrokers(config.Redpanda.BrokerUrl),
	kgo.MaxBufferedBytes(10 * 1024), // 10 KB maximum buffer size
	kgo.ProducerBatchCompression(kgo.NoCompression()),
	kgo.WithLogger(loggerProducer),
	kgo.ClientID("lte-m-csv-parser-producer"),
}
producerClient, err := kgo.NewClient(configProducer...)
if err != nil {
	log.Fatalf("Failed to connect to Redpanda: %v", err)
}
defer producerClient.Close()
...
```

```golang
...
// both of the below snippets are started in separate goroutines in main.
for {
	fetches := service.consumerClient.PollRecords(service.ctx, 100)
	fetches.EachError(func(topic string, partition int32, err error) {
		log.Printf("Error fetching from topic %s partition %d: %v", topic, partition, err)
	})
	fetches.EachRecord(func(record *kgo.Record) {
		// Process record
		messages := service.parseMessage(record.Value)
		for _, message := range messages {
			// send message to channel
			service.channel <- message
			messageCount++
		}
	})
}
...

// read from channel and produce to redpanda
for message := range service.channel {
	service.producerClient.Produce(service.ctx, &message, callbackProducer)
}
```
Logs
INFO 2024-05-02T14:31:13.563229821Z [resource.labels.containerName: lte-m-csv-parser] PRODUCER[DEBUG] wrote Produce v7; broker: 2, bytes_written: 1039, write_wait: 97.652µs, time_to_write: 79.068µs, err: