lovoo / goka

Goka is a compact yet powerful distributed stream processing library for Apache Kafka written in Go.
BSD 3-Clause "New" or "Revised" License

Kafka server: Message was too large, server rejected it to avoid allocation error #422

Open aditya-msd opened 1 year ago

aditya-msd commented 1 year ago

I am getting the error kafka server: Message was too large, server rejected it to avoid allocation error; the logs below show it. It causes the processor to stop committing and fall into a restart loop, blocking the other messages in the topic. Is there any way to detect or catch this error?

Increasing the topic's maximum message size (max.message.bytes) solves the issue, but what I would like is to detect this condition and prevent the loop: if the error happens, I can side-step that message, avoid the loop, and continue processing as usual.

2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0) > PartTable] finished building storage for topic batch-worker-progress-processor-table/0 in 0.0 minutes
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0) > PartTable] topic manager gives us oldest: 327771, hwm: 342528
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] catching up table done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] setup generation 401 ... done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] starting
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] ConsumeClaim for topic/partition batch-worker-progress-topic/0, initialOffset=342523
2023/04/24 11:32:51 [Sarama] consumer/broker/0 added subscription to batch-worker-progress-topic/0
2023/04/24 11:32:51 Packet 1 received
2023/04/24 11:32:51 Workerstate Size :: 89983
2023/04/24 11:32:51 Total Processed Packets are 1523 out of 20000
2023/04/24 11:32:51 Packet 2 received
2023/04/24 11:32:51 Workerstate Size :: 89983
2023/04/24 11:32:51 Total Processed Packets are 1523 out of 20000
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] Errors occurred asynchronously. Will exit partition processor
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] stopped
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] Run failed with error: Errors:
  • kafka server: Message was too large, server rejected it to avoid allocation error.
  • kafka server: Message was too large, server rejected it to avoid allocation error.
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] ConsumeClaim done for topic/partition batch-worker-progress-topic/0
2023/04/24 11:32:51 kafka: error while consuming batch-worker-progress-topic/0: error processing message: Errors:
  • kafka server: Message was too large, server rejected it to avoid allocation error.
  • kafka server: Message was too large, server rejected it to avoid allocation error.
2023/04/24 11:32:51 [Sarama] consumer/broker/0 closed dead subscription to batch-worker-progress-topic/0
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] Cleaning up for 401
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] Stopping
2023/04/24 11:32:51 [Sarama] loop check partition number coroutine will exit, topics [batch-worker-progress-topic]
2023/04/24 11:32:51 [Processor batch-worker-progress-processor > PartitionProcessor (0)] ... Stopping done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] Cleaning up for 401 ... done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] consuming from consumer loop done
2023/04/24 11:32:51 [Processor batch-worker-progress-processor] Consumer group loop done, will stop here
2023/04/24 11:32:56 [Processor batch-worker-progress-processor] consuming from consumer loop
2023/04/24 11:32:56 [Sarama] client/metadata fetching metadata for [batch-worker-progress-topic] from broker kafka-0.kafka-headless.nightwing.svc.cluster.local:9092
2023/04/24 11:32:56 [Sarama] client/coordinator requesting coordinator for consumergroup batch-worker-progress-processor from kafka-1.kafka-headless.nightwing.svc.cluster.local:9092
2023/04/24 11:32:56 [Sarama] client/coordinator coordinator for consumergroup batch-worker-progress-processor is (kafka-1.kafka-headless.nightwing.svc.cluster.local:9092)
2023/04/24 11:32:56 [Processor batch-worker-progress-processor] setup generation 402, claims=map[string][]int32{"batch-worker-progress-topic":[]int32{0}}
2023/04/24 11:32:56 [Processor batch-worker-progress-processor] Creating partition processor for partition 0
2023/04/24 11:32:56 [Processor batch-worker-progress-processor > PartitionProcessor (0)] catching up table
2023/04/24 11:32:56 [Processor batch-worker-progress-processor > PartitionProcessor (0) > PartTable] finished building storage for topic batch-worker-progress-processor-table/0 in 0.0 minutes
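For readers hitting the same wall: the limit exists on both sides, the topic's broker-side max.message.bytes and Sarama's client-side Producer.MaxMessageBytes (1000000 by default). A minimal sketch of raising the client-side limit through goka's global Sarama config, assuming goka v1.x; the 10 MiB value is only an example, and the topic's max.message.bytes must be raised at least as far or the broker keeps rejecting the message:

```go
package config

import "github.com/lovoo/goka"

func init() {
	// Start from goka's default Sarama configuration and raise the
	// producer-side limit. Example value only: the broker-side
	// max.message.bytes of the topic must allow at least the same size.
	cfg := goka.DefaultConfig()
	cfg.Producer.MaxMessageBytes = 10 * 1024 * 1024
	goka.ReplaceGlobalConfig(cfg)
}
```
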
frairon commented 1 year ago

Hey @aditya-msd,

Currently there is no way to drop those messages, because this rarely happens, and when it does, the processor has to fix the underlying issue (otherwise it will corrupt or lose its own data). However, maybe it does make sense to handle messages that cause this behavior somehow. What would your approach to "side-stepping" the processing look like? Should the message be silently dropped? Maybe we could add a callback that is triggered when this error happens? Or any other ideas?
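
Purely to make that idea concrete (nothing below exists in goka today; all names and signatures are invented for this discussion), the hook could be shaped as a handler that receives the failed emit and decides how the processor should react:

```go
// Proposal sketch only -- none of these types exist in goka.
package proposal

// EmitErrorDecision tells the processor how to react to a failed emit,
// e.g. Sarama's "Message was too large" rejection.
type EmitErrorDecision int

const (
	// StopProcessor keeps today's behavior: the partition processor shuts down.
	StopProcessor EmitErrorDecision = iota
	// DropMessage skips the offending update and keeps consuming.
	DropMessage
)

// EmitErrorHandler would be passed as a processor option and called with the
// affected key and the emit error before the processor gives up.
type EmitErrorHandler func(key string, err error) EmitErrorDecision
```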

aditya-msd commented 1 year ago

For side-stepping:

  1. I can calculate the size of the value before it is set, but since the compressed data is what actually gets sent to Kafka, I cannot put an upper limit on it. In the logs above, Workerstate Size :: 89983 is the size in bytes, while my topic is configured with max.message.bytes=10000, which means it is the compressed size that gets checked (see the pre-check sketch after this list).

  2. Right now I only know the error occurred from the logs. How can I catch this error in the code, so that I can notify or adjust the internal state as required?
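
A rough sketch of the pre-check from point 1, assuming a JSON-encoded state with a hand-written workerStateCodec (both hypothetical stand-ins for the real application types). Checking the uncompressed encoded size is conservative, since the broker applies max.message.bytes to the batch after compression:

```go
package processor

import (
	"encoding/json"
	"log"

	"github.com/lovoo/goka"
)

// workerState and workerStateCodec are hypothetical stand-ins for the
// application's real state type and goka.Codec.
type workerState struct {
	Processed int `json:"processed"`
}

type workerStateCodec struct{}

func (c *workerStateCodec) Encode(value interface{}) ([]byte, error) {
	return json.Marshal(value)
}

func (c *workerStateCodec) Decode(data []byte) (interface{}, error) {
	var s workerState
	err := json.Unmarshal(data, &s)
	return &s, err
}

// maxEncodedBytes is an application-chosen limit kept safely below the
// topic's max.message.bytes.
const maxEncodedBytes = 900 * 1024

func process(ctx goka.Context, msg interface{}) {
	state, _ := ctx.Value().(*workerState)
	if state == nil {
		state = &workerState{}
	}
	state.Processed++ // hypothetical update logic

	// Encode with the same codec used for goka.Persist so the size check
	// reflects what would actually be emitted to the group table.
	encoded, err := new(workerStateCodec).Encode(state)
	if err != nil {
		ctx.Fail(err)
	}
	if len(encoded) > maxEncodedBytes {
		// Side-step: skip (or truncate/split) the update instead of letting
		// the emit fail and shut the whole partition processor down.
		log.Printf("skipping oversized state for key %s: %d bytes", ctx.Key(), len(encoded))
		return
	}
	ctx.SetValue(state)
}
```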

You also mentioned that we could add a callback triggered when this error happens; can you provide a code snippet showing where to add it, or any references that use this approach?

Otherwise I will have to find some other way to detect this.

frairon commented 1 year ago

My point actually was that there is currently no way to detect or handle a failing emit: the processor shuts down, that's it. But we could build one, and it doesn't sound too hard to do. That's why I was wondering whether you already had an idea of what a solution could look like. Anyway, once we find the time we'll take a look, and maybe an obvious solution pops up.
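
Until such a mechanism exists, the failure only surfaces as the error returned by the processor's Run call after it has already shut down. A caller-side workaround sketch (runLoop and newProcessor are hypothetical wrappers; on older goka versions the Sarama import path is github.com/Shopify/sarama, and the string match is a fragile fallback in case the Sarama error value is not preserved through goka's error wrapping):

```go
package runner

import (
	"context"
	"errors"
	"log"
	"strings"

	"github.com/IBM/sarama" // github.com/Shopify/sarama for older goka versions
	"github.com/lovoo/goka"
)

// runLoop restarts the processor after failures and logs when the shutdown
// was caused by an oversized message. newProcessor is a hypothetical factory
// that rebuilds the processor (group graph, codecs, options) for each run.
func runLoop(ctx context.Context, newProcessor func() (*goka.Processor, error)) error {
	for {
		proc, err := newProcessor()
		if err != nil {
			return err
		}
		err = proc.Run(ctx)
		if err == nil || ctx.Err() != nil {
			return nil // clean shutdown or context cancelled
		}
		if errors.Is(err, sarama.ErrMessageSizeTooLarge) ||
			strings.Contains(err.Error(), "Message was too large") {
			// This is where the "too large" failure can currently be
			// observed; react here (alert, raise limits, skip the key)
			// before restarting.
			log.Printf("processor stopped because of an oversized message: %v", err)
		} else {
			log.Printf("processor stopped with error, restarting: %v", err)
		}
	}
}
```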