FrankChen021 / bithon

An observability platform mainly for Java
Apache License 2.0

Improve the kafka producer buffer limitation #646

Closed FrankChen021 closed 3 months ago

FrankChen021 commented 10 months ago
java.lang.RuntimeException:Buffer is limited to size of 4194304, but requires 7601915.
    at org.bithon.server.collector.sink.kafka.FixedSizeBuffer.ensureCapacity(FixedSizeBuffer.java:92)
    at org.bithon.server.collector.sink.kafka.FixedSizeBuffer.writeBytes(FixedSizeBuffer.java:46)
    at org.bithon.server.collector.sink.kafka.KafkaTraceSink.process(KafkaTraceSink.java:128)
    at org.bithon.server.collector.source.http.TraceHttpCollector.span(TraceHttpCollector.java:115)

The exception above was observed. The problem is that the buffer holds the UNCOMPRESSED data, while the size check (currentSize) is based on the COMPRESSED data.

            int currentSize = AbstractRecords.estimateSizeInBytesUpperBound(RecordBatch.CURRENT_MAGIC_VALUE,
                                                                            this.compressionType,
                                                                            messageKey,
                                                                            messageBuffer.toByteBuffer(),
                                                                            new Header[]{header});

            // plus 2 to leave 2 bytes as margin
            if (currentSize + serializedSpan.length + 2 > messageBuffer.limit()) {
                send(messageKey, messageBuffer);

                messageBuffer.reset();
                messageBuffer.writeChar('[');
            }

            messageBuffer.writeBytes(serializedSpan);

This is a little tricky because the buffer limit is initialized from max-request-size, which applies to the compressed data.

FrankChen021 commented 5 months ago
  1. Flush the buffer once the exception is raised. Alternatively, expose a method that checks whether a write would overflow the buffer and, if so, flush the buffer first.

  2. Replace the implementation with one based on ByteBuffer to save heap memory.
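
A minimal sketch of how the two ideas could fit together, not the actual fix. `BoundedBuffer`, `FlushingSink`, `canWrite`, and `drain` are hypothetical names introduced for illustration; the real `FixedSizeBuffer` and `KafkaTraceSink` may look quite different. It assumes that "save memory from the heap" means a direct (off-heap) `ByteBuffer`, and it uses a plain list in place of the Kafka producer.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a fixed-size message buffer; not the project's FixedSizeBuffer. */
class BoundedBuffer {
    private final ByteBuffer buffer;

    BoundedBuffer(int limit) {
        // Assumption: off-heap allocation is what "save memory from the heap" refers to.
        this.buffer = ByteBuffer.allocateDirect(limit);
    }

    /** Returns true if `length` more bytes fit without exceeding the limit. */
    boolean canWrite(int length) {
        return length <= buffer.remaining();
    }

    void writeBytes(byte[] bytes) {
        if (!canWrite(bytes.length)) {
            throw new IllegalStateException("Buffer is limited to " + buffer.capacity()
                + " bytes, but requires " + (buffer.position() + bytes.length));
        }
        buffer.put(bytes);
    }

    /** Number of uncompressed bytes currently held. */
    int size() {
        return buffer.position();
    }

    /** Copies the buffered bytes out and clears the buffer for reuse. */
    byte[] drain() {
        buffer.flip();
        byte[] out = new byte[buffer.remaining()];
        buffer.get(out);
        buffer.clear();
        return out;
    }
}

/** Hypothetical sink that flushes before a write would overflow the buffer. */
class FlushingSink {
    // Stands in for the Kafka producer: each entry is one message that was "sent".
    final List<byte[]> sent = new ArrayList<>();
    private final BoundedBuffer messageBuffer;

    FlushingSink(int limit) {
        this.messageBuffer = new BoundedBuffer(limit);
    }

    void process(byte[] serializedSpan) {
        // Check against the uncompressed bytes actually held,
        // not against an estimate of the compressed size.
        if (!messageBuffer.canWrite(serializedSpan.length) && messageBuffer.size() > 0) {
            flush();
        }
        messageBuffer.writeBytes(serializedSpan);
    }

    void flush() {
        if (messageBuffer.size() > 0) {
            sent.add(messageBuffer.drain());
        }
    }
}
```

With a limit of 8 bytes, two 5-byte spans end up in two separate messages: the second `process` call flushes the first span before writing. A single span larger than the limit would still throw in this sketch, so that case needs its own handling.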