ozangunalp / kafka-native

Kafka broker compiled to native using Quarkus and GraalVM.
Apache License 2.0

Broker error when compression.type=zstd in producer #203

Open · furushchev opened this issue 4 months ago

furushchev commented 4 months ago

Hi, I'm using the kafka-native Docker image and have a producer with compression.type set to zstd. Every time a message is sent, the following error shows up in the Docker container:

(screenshot: kafka_zstd_error)

Is there anything I should do to avoid this error on the Docker side?
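For context, the producer is set up roughly like this (a minimal sketch; the broker address and topic name are placeholders):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    public class ZstdProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // The setting that triggers the broker-side error:
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");

            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                producer.send(new ProducerRecord<>("test-topic", "hello")); // placeholder topic
                producer.flush();
            }
        }
    }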

k-wall commented 4 months ago

Hello, thanks for raising the issue. I took a quick look today. I find that compression.type gzip and lz4 work, while both zstd and snappy fail with a JNI error. (Presumably because gzip goes through the JDK's built-in java.util.zip and lz4-java can fall back to its pure-Java implementation, whereas snappy-java and zstd-jni both load native libraries through JNI, which a GraalVM native image must be configured for explicitly.)

My reproducer (testing on main; a zstd variant is sketched after the stacktrace):

  1. Compile natively
    mvn clean package -Dnative
  2. Create a topic configured for compression
    kafka-topics --create --topic snappy-compressed-topic --bootstrap-server localhost:9092 --config compression.type=snappy
  3. Send an uncompressed record to the broker, forcing the broker to compress.
    echo hello | kafka-console-producer --bootstrap-server localhost:9092 --topic snappy-compressed-topic
  4. Broker will log the stacktrace
    2024-07-22 17:20:39,081 ERROR [kaf.ser.ReplicaManager] (data-plane-kafka-request-handler-7) [ReplicaManager broker=1] Error processing append operation on partition snappy-compressed-topic-0: org.apache.kafka.common.KafkaException: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I [symbol: Java_org_xerial_snappy_SnappyNative_maxCompressedLength or Java_org_xerial_snappy_SnappyNative_maxCompressedLength__I]
    at org.apache.kafka.common.compress.SnappyFactory.wrapForOutput(SnappyFactory.java:38)
    at org.apache.kafka.common.record.CompressionType$3.wrapForOutput(CompressionType.java:98)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:140)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:160)
    at org.apache.kafka.common.record.MemoryRecordsBuilder.<init>(MemoryRecordsBuilder.java:198)
    at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:591)
    at org.apache.kafka.common.record.MemoryRecords.builder(MemoryRecords.java:573)
    at org.apache.kafka.storage.internals.log.LogValidator.buildRecordsAndAssignOffsets(LogValidator.java:451)
    at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsetsCompressed(LogValidator.java:411)
    at org.apache.kafka.storage.internals.log.LogValidator.validateMessagesAndAssignOffsets(LogValidator.java:165)
    at kafka.log.UnifiedLog.append(UnifiedLog.scala:809)
    at kafka.log.UnifiedLog.appendAsLeader(UnifiedLog.scala:722)
    at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:1380)
    at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:1368)
    at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$6(ReplicaManager.scala:1536)
    at scala.collection.StrictOptimizedMapOps.map(StrictOptimizedMapOps.scala:28)
    at scala.collection.StrictOptimizedMapOps.map$(StrictOptimizedMapOps.scala:27)
    at scala.collection.mutable.HashMap.map(HashMap.scala:35)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:1522)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:861)
    at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:720)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:184)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:160)
    at java.base@21/java.lang.Thread.runWith(Thread.java:1596)
    at java.base@21/java.lang.Thread.run(Thread.java:1583)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.thread.PlatformThreads.threadStartRoutine(PlatformThreads.java:833)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.posix.thread.PosixPlatformThreads.pthreadStartRoutine(PosixPlatformThreads.java:211)
    Caused by: java.lang.UnsatisfiedLinkError: org.xerial.snappy.SnappyNative.maxCompressedLength(I)I [symbol: Java_org_xerial_snappy_SnappyNative_maxCompressedLength or Java_org_xerial_snappy_SnappyNative_maxCompressedLength__I]
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.jni.access.JNINativeLinkage.getOrFindEntryPoint(JNINativeLinkage.java:152)
    at org.graalvm.nativeimage.builder/com.oracle.svm.core.jni.JNIGeneratedMethodSupport.nativeCallAddress(JNIGeneratedMethodSupport.java:54)
    at org.xerial.snappy.SnappyNative.maxCompressedLength(Native Method)
    at org.xerial.snappy.Snappy.maxCompressedLength(Snappy.java:406)
    at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:103)
    at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:92)
    at org.xerial.snappy.SnappyOutputStream.<init>(SnappyOutputStream.java:82)
    at org.apache.kafka.common.compress.SnappyFactory.wrapForOutput(SnappyFactory.java:36)
    ... 26 more
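The zstd case should reproduce the same way by swapping the topic config; I'd expect the same UnsatisfiedLinkError, just against zstd-jni's JNI entry points instead of snappy-java's:

    kafka-topics --create --topic zstd-compressed-topic --bootstrap-server localhost:9092 --config compression.type=zstd
    echo hello | kafka-console-producer --bootstrap-server localhost:9092 --topic zstd-compressed-topic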

I'm not sure of the right way to fix this. I wonder whether https://github.com/quarkiverse/quarkus-snappy/blob/main/deployment/src/main/java/io/quarkiverse/snappy/deployment/SnappyProcessor.java is the way to go?
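If it helps, my rough understanding of what that processor does is: bundle the platform-specific libsnappyjava binaries as native-image resources, keep the JNI-backed class reachable, and defer Snappy's static initialization to run time. A minimal sketch along those lines, assuming the standard Quarkus deployment build items (the class name and exact resource paths are my guesses and would need checking against snappy-java's actual layout):

    import io.quarkus.deployment.annotations.BuildStep;
    import io.quarkus.deployment.builditem.nativeimage.JniRuntimeAccessBuildItem;
    import io.quarkus.deployment.builditem.nativeimage.NativeImageResourceBuildItem;
    import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;

    // Hypothetical processor, sketched after quarkus-snappy's SnappyProcessor.
    class SnappyNativeSupportProcessor {

        // Bundle the platform-specific libsnappyjava binaries into the image so
        // snappy-java can extract and load them at run time; snappy-java resolves
        // them from org/xerial/snappy/native/<OS>/<arch>/ inside its jar.
        @BuildStep
        NativeImageResourceBuildItem snappyNativeLibs() {
            return new NativeImageResourceBuildItem(
                    "org/xerial/snappy/native/Linux/x86_64/libsnappyjava.so",   // assumed path
                    "org/xerial/snappy/native/Linux/aarch64/libsnappyjava.so"); // assumed path
        }

        // Register the JNI-backed class so symbols like
        // Java_org_xerial_snappy_SnappyNative_maxCompressedLength stay resolvable.
        @BuildStep
        JniRuntimeAccessBuildItem snappyJniAccess() {
            return new JniRuntimeAccessBuildItem(true, true, true,
                    "org.xerial.snappy.SnappyNative");
        }

        // Load the native library when the image starts rather than at build time.
        @BuildStep
        RuntimeInitializedClassBuildItem snappyRuntimeInit() {
            return new RuntimeInitializedClassBuildItem("org.xerial.snappy.Snappy");
        }
    }

Presumably zstd would need the same treatment for zstd-jni (its com.github.luben.zstd classes and the native library bundled in its jar).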