linkedin / li-apache-kafka-clients

li-apache-kafka-clients is a wrapper library for the Apache Kafka vanilla clients. It provides additional features such as large message support and auditing to the Java producer and consumer in the open source Apache Kafka.

Consumer can't retrieve large message from Kafka #95


JsonFn commented 6 years ago

Kafka version: 1.1.1
Scala version: 2.11.12
sbt version: 0.13.16
li-apache-kafka-clients version: 0.0.15

It seems that I can produce a large String message to the Kafka broker, but the consumer (a LiKafkaConsumerImpl) fails to retrieve it. There is no error; the returned ConsumerRecords.count() is always 0, so the code keeps looping, polling the broker for messages.

However, I notice that when consuming with a plain KafkaConsumer instance, ConsumerRecords.count() reports a non-zero count such as 500.

Is the following configuration correct? (A sketch of how these properties might be assembled follows the list.)

* LiKafkaProducerImpl
    * Config
        * "large.message.enabled" -> "true"
        * "segment.serializer" -> classOf[DefaultSegmentSerializer].getName
        * "auditor.class" -> classOf[LoggingAuditor[_,_]].getName
        * "max.message.segment.size" -> 200 (or "max.message.segment.bytes" -> 200)

* LiKafkaConsumerImpl
    * Config
        * "message.assembler.buffer.capacity" -> "20971520"
        * "message.assembler.expiration.offset.gap" -> "10000"
        * "exception.on.message.dropped" -> "true"
        * "max.tracked.messages.per.partition" -> "10000"

There are several warnings for both the producer and the consumer, but they look harmless (presumably just the li-specific keys being unknown to the vanilla ProducerConfig/ConsumerConfig). Here is one of them:

2018-07-26 14:13:59 WARN  ConsumerConfig:287 - The configuration 'exception.on.message.dropped' was supplied but isn't a known config.

The log messages seem to show that the producer works fine:

2018-07-26 14:13:59 INFO  LoggingAuditor:46 - [Thu Jul 26 14:10:00 CEST 2018 - Thu Jul 26 14:20:00 CEST 2018] : (testLargeMessageTopic,2554345,ATTEMPT), (1 messages, 2097172 bytes)
2018-07-26 14:13:59 INFO  LoggingAuditor:46 - [Thu Jul 26 14:10:00 CEST 2018 - Thu Jul 26 14:20:00 CEST 2018] : (testLargeMessageTopic,2554345,SUCCESS), (1 messages, 2097172 bytes)
2018-07-26 14:13:59 INFO  KafkaProducer:1054 - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-07-26 14:13:59 INFO  LiKafkaProducerImpl:301 - Shutdown complete in 5 millis

Code that generates the large string:

  // Builds a string of repeating digits. Note that `0 to (2 * 1024 * 1024)`
  // is inclusive, so the string is 2,097,153 characters; together with the
  // 19-byte key "testLargeMessageKey" that matches the 2,097,172 bytes
  // reported by the auditor above.
  def msg(): String = {
    val builder = new StringBuilder()
    for (idx <- 0 to (2 * 1024 * 1024)) builder.append(idx % 10)
    builder.toString
  }

Code that produces and consumes the large message:

  import java.util.Collections
  import scala.collection.JavaConverters._
  import scala.util.control.Breaks._
  import org.apache.kafka.clients.producer.ProducerRecord
  import com.linkedin.kafka.clients.consumer.LiKafkaConsumerImpl
  import com.linkedin.kafka.clients.producer.LiKafkaProducerImpl

  val prodProps = ... // producer properties described above
  val producer = new LiKafkaProducerImpl[String, String](prodProps)
  producer.send(new ProducerRecord("testLargeMessageTopic", "testLargeMessageKey", largeString))
  try { producer.flush() } finally { producer.close() }

  val conProps = ... // consumer properties described above
  val consumer = new LiKafkaConsumerImpl[String, String](conProps)
  consumer.subscribe(Collections.singletonList("testLargeMessageTopic"))
  var flag = true
  breakable {
    while (flag) {
      val records = consumer.poll(100L)
      // Always prints "0 records ...", so the foreach below never runs
      // and we never break out of the loop.
      println(s"${records.count} records ...")
      records.asScala.foreach { record => flag = op(record); if (!flag) break }
    }
  }

But when initialized with a plain KafkaConsumer, it does return records, except the values are just raw segment data, e.g.:

topic: testLargeMessageTopic, key: testLargeMessageKey, value: mM�_P�H��/�4Z��^(� 01234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789
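For comparison, here is a minimal sketch of that plain-consumer check (assumed setup: the same conProps with plain String deserializers). The unreadable prefix is presumably the binary segment header, which a vanilla consumer decodes as garbage and neither strips nor reassembles:

  import java.util.Collections
  import scala.collection.JavaConverters._
  import org.apache.kafka.clients.consumer.KafkaConsumer

  // A vanilla consumer returns each segment as its own record; the value
  // starts with the segment header bytes decoded as UTF-8 garbage.
  val plainConsumer = new KafkaConsumer[String, String](conProps)
  plainConsumer.subscribe(Collections.singletonList("testLargeMessageTopic"))
  val segmentRecords = plainConsumer.poll(1000L)
  segmentRecords.asScala.foreach { r =>
    println(s"topic: ${r.topic}, key: ${r.key}, value: ${r.value}")
  }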

What could be missing when creating the LiKafkaConsumer for consuming large messages?

Thanks