You can test it with: (cat last_req.data; sleep 2) | nc broker_ip 6667 | hexdump -C
You shouldn't see ff ff ff ff in the response.
Do you have any stack trace or JVM crash log? I need to know where the exception is thrown. Also, could you explain how you created last_req.data.gz?
[2016-05-24 09:53:08,503] ERROR [Replica Manager on Broker 1001]: Error processing append operation on partition ss7,3
kafka.common.KafkaException:
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:94)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:64)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNextOuter(ByteBufferMessageSet.scala:207)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:219)
    at kafka.message.ByteBufferMessageSet$$anon$2.makeNext(ByteBufferMessageSet.scala:173)
    at kafka.utils.IteratorTemplate.maybeComputeNext(IteratorTemplate.scala:66)
    at kafka.utils.IteratorTemplate.hasNext(IteratorTemplate.scala:58)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at kafka.message.ByteBufferMessageSet.validateMessagesAndAssignOffsets(ByteBufferMessageSet.scala:267)
    at kafka.log.Log.liftedTree1$1(Log.scala:327)
    at kafka.log.Log.append(Log.scala:326)
    at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:442)
    at kafka.cluster.Partition$$anonfun$9.apply(Partition.scala:428)
    at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
    at kafka.utils.CoreUtils$.inReadLock(CoreUtils.scala:268)
    at kafka.cluster.Partition.appendMessagesToLeader(Partition.scala:428)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:401)
    at kafka.server.ReplicaManager$$anonfun$appendToLocalLog$2.apply(ReplicaManager.scala:386)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
    at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
    at scala.collection.AbstractTraversable.map(Traversable.scala:105)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:386)
    at kafka.server.ReplicaManager.appendMessages(ReplicaManager.scala:322)
    at kafka.server.KafkaApis.handleProducerRequest(KafkaApis.scala:366)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:68)
    at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:60)
    at java.lang.Thread.run(Thread.java:722)
Caused by: java.io.IOException: failed to read chunk
    at org.xerial.snappy.SnappyInputStream.hasNextChunk(SnappyInputStream.java:416)
    at org.xerial.snappy.SnappyInputStream.rawRead(SnappyInputStream.java:182)
    at org.xerial.snappy.SnappyInputStream.read(SnappyInputStream.java:163)
    at java.io.DataInputStream.readFully(DataInputStream.java:195)
    at java.io.DataInputStream.readLong(DataInputStream.java:416)
    at kafka.message.ByteBufferMessageSet$$anon$1.makeNext(ByteBufferMessageSet.scala:72)
    ... 43 more
The file last_req.data is recorded traffic coming from our Kafka producer (written in C++). The producer doesn't write the {0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0} header. In this file the Snappy-compressed data happens to start with the 0x82 byte, and that is what causes the trouble.
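For reference, raw Snappy output begins with the varint-encoded uncompressed length, so compressed data can legitimately start with 0x82 even without the stream header. A rough sketch of the collision (illustrative only, not our actual payload; assumes snappy-java is on the classpath):

```java
import org.xerial.snappy.Snappy;

public class MagicByteCollision {
    public static void main(String[] args) throws Exception {
        // Raw Snappy blocks start with the uncompressed length as a varint.
        // A length of 130 encodes as 0x82 0x01, so the first byte of the
        // compressed block equals the first byte of snappy-java's stream magic.
        byte[] payload = new byte[130];
        byte[] compressed = Snappy.compress(payload);
        System.out.printf("first byte = 0x%02x%n", compressed[0] & 0xFF); // prints 0x82
    }
}
```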
OK, I think we should perform the full header check here. I'll fix it.
Kafka 0.10.0 seems to be suffering from this: https://issues.apache.org/jira/browse/KAFKA-3764
Any progress on a possible fix?
Released snappy-java-1.1.2.6, which fixes this issue.
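One way to check the fix (a rough sketch, reusing the headerless Snappy block from the earlier example; the payload is hypothetical, not the original last_req.data traffic):

```java
import java.io.ByteArrayInputStream;

import org.xerial.snappy.Snappy;
import org.xerial.snappy.SnappyInputStream;

public class HeaderlessReadCheck {
    public static void main(String[] args) throws Exception {
        // A raw (headerless) Snappy block whose first byte happens to be 0x82.
        byte[] compressed = Snappy.compress(new byte[130]);

        // With the single-byte header check this read path failed with
        // "java.io.IOException: failed to read chunk"; with the fix the stream
        // should fall back to raw-block decompression instead.
        try (SnappyInputStream in = new SnappyInputStream(new ByteArrayInputStream(compressed))) {
            byte[] buf = new byte[256];
            int n = in.read(buf);
            System.out.println("decompressed " + n + " bytes");
        }
    }
}
```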
With a recent version we see occurrences of an exception that seem to be due to bad handling of the MAGIC_HEADER in SnappyInputStream.java readHeader():
if (readBytes < header.length || header[0] != SnappyCodec.MAGIC_HEADER[0])
This checks only the first byte, so if that byte happens to match by accident the exception occurs. Could you fix this or revert the latest changes? It was fine in 1.1.1.7.
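For illustration, a sketch of checking the full magic instead of only the first byte (a rough idea only, not a patch against the actual readHeader()): treat the stream as snappy-java framed output only when all eight magic bytes match, and otherwise fall back to raw-block handling.

```java
import java.util.Arrays;

final class HeaderCheckSketch {
    // Mirrors SnappyCodec.MAGIC_HEADER: {0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0}
    private static final byte[] MAGIC = {(byte) 0x82, 'S', 'N', 'A', 'P', 'P', 'Y', 0};

    // Returns true only when the complete 8-byte magic matches, instead of
    // deciding based on the first byte alone as in the line quoted above.
    static boolean hasFullMagicHeader(byte[] header, int readBytes) {
        return readBytes >= MAGIC.length
                && Arrays.equals(Arrays.copyOf(header, MAGIC.length), MAGIC);
    }
}
```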