dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No Data-loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

May I have a Scala sample of messageHandler to filter out payloads that include certain strings? #59

Closed yufuxin closed 5 years ago

yufuxin commented 6 years ago

When I did it like this:

class MyMessageHandler() extends KafkaMessageHandler() {
  override def process(payload: Array[Byte]): Array[Byte] = {
    val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(stream)
    oos.writeObject(payload)
    oos.close()
    stream.toByteArray
  }
}

Got Errors:

dibbhatt commented 6 years ago

Sorry for little delay. I will try this out tomorrow and let you know.

dibbhatt commented 6 years ago

Hi, this is what I tried and it worked

dibbhatt commented 6 years ago

This is the handler (it filters out any payload that contains the string "test"):

import consumer.kafka.KafkaMessageHandler

object MyMessageHandler extends KafkaMessageHandler[Array[Byte]] {
  override def process(payload: Array[Byte]): Array[Byte] = {
    val s = new String(payload)
    if (!s.contains("test")) s.getBytes else "NA".getBytes
  }
}

dibbhatt commented 6 years ago

val tmp_stream = ReceiverLauncher.launch(ssc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY, MyMessageHandler)

val stream = tmp_stream.map(x => {
  val s = new String(x.getPayload)
  s
})

stream.print()

yufuxin commented 6 years ago

Thanks for your response, I'll try it later. BTW, if I keep getting the messages below, does it mean the Executors don't have enough time to process the messages?

18/11/09 17:02:40 WARN PartitionManager: Offset reset to EarliestTime 321520763 for Topic tr69data24 partition 39
18/11/09 17:02:40 WARN PartitionManager: Offset reset to EarliestTime 326889487 for Topic tr69data24 partition 19
18/11/09 17:02:40 WARN PartitionManager: Offset reset to EarliestTime 388293805 for Topic tr69data24 partition 17
18/11/09 17:02:40 WARN PartitionManager: Offset reset to EarliestTime 322954893 for Topic tr69data24 partition 9
18/11/09 17:02:40 WARN PartitionManager: Offset reset to EarliestTime 697198469 for Topic tr69data24 partition 15
18/11/09 17:02:40 WARN PartitionManager: Offset reset to EarliestTime 212513821 for Topic tr69data24 partition 21
18/11/09 17:27:26 WARN PartitionManager: Offset reset to EarliestTime 402061188 for Topic tr69data24 partition 6
18/11/09 17:27:27 WARN PartitionManager: Offset reset to EarliestTime 424279888 for Topic tr69data24 partition 14

yufuxin commented 6 years ago

I have set up the following parameters: "max.poll.records" -> "5", "consumer.fillfreqms" -> "200", "consumer.num_fetch_to_buffer" -> "1", and I convert the Avro-format messages from the stream:

stream.foreachRDD((rdd, timeStamp) => {
 try {
   unioRddLock.lock()
   val rdd_data: RDD[(String, Row)] = rdd.map(avroRecord => {
     // Convert each payload from Avro format to a GenericRecord
    val parser: Schema.Parser = new Schema.Parser()
    val schema: Schema = parser.parse(USER_SCHEMA)
    val recordInjection = GenericAvroCodecs.toBinary(schema)
    val m = avroRecord.getPayload()
    val record: GenericRecord = recordInjection.invert(m).get

    (if (record.get("xxxx") != null) record.get("xxxxx").toString() else null, Row(
      if (record.get("xxxx") != null) (record.get("xxx")).toString().toString else null,
      if (record.get("xxxxx") != null) record.get("xxx").toString() else null,
      if (record.get("xxxxx") != null) record.get("xxx").toString() else null,
      if (record.get("xxxx") != null) record.get("xxxx").toString() else null,
      if (record.get("xxxx") != null) record.get("xxxx").toString() else null,
      if (record.get("xxxx") != null) record.get("xxx").toString() else null,
      if (record.get("xxxx") != null) record.get("xxx").toString() else null,

.....

     timeStamp.milliseconds.toString, ""))

  })
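One thing worth noting in the snippet above: the Schema.Parser and the injection are rebuilt for every record inside the map. A minimal sketch of moving that setup into mapPartitions (assuming the same USER_SCHEMA, GenericAvroCodecs and getPayload as above) would be:

val rdd_records = rdd.mapPartitions { iter =>
  // build the parser and the binary injection once per partition instead of once per record
  val schema: Schema = new Schema.Parser().parse(USER_SCHEMA)
  val recordInjection = GenericAvroCodecs.toBinary[GenericRecord](schema)
  iter.map(avroRecord => recordInjection.invert(avroRecord.getPayload()).get)
}
// the (String, Row) mapping built from each record stays the same, applied to rdd_records
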
dibbhatt commented 6 years ago

By the way, you can increase "consumer.num_fetch_to_buffer" to a much higher value, say 50, and see.

"max.poll.records" also can be much higher. Your settings mean, every 200 ms , it will pull 5 records. If your ingesting rate is higher than this, you may see Lag is increasing in Kafka. Set to the higher rate than your incoming. e.g. if you are getting 10,000 messages / seconds, you should keep max.poll.records as 2000 and fillFrequency can be 200 ms. That way it can match 10k/sec ingestion rate.

If your Batch Duration is, say, 30 seconds, you will have around 150 pulls per partition every batch (5 pulls per partition per second, as your fill frequency is 200 ms). So each Consumer thread will potentially contribute 150 RDD partitions (each pull creates one RDD partition). If you set num_fetch_to_buffer to, say, 50, you will have around 3 RDD partitions contributed by each consumer thread. The fetch_to_buffer number controls how many pulls the Consumer keeps in memory before writing to the Spark Block Manager.

That means, with fetch_to_buffer at 50 and Batch Duration at 30 sec, if your topic has, say, 5 partitions, each Streaming job will run on an RDD of around 5 x 3 = 15 partitions.

If you change fetch_to_buffer to 10, with Batch Duration still 30 sec and a 5-partition topic, each Streaming job will run on an RDD of around 5 x 15 = 75 partitions.

With fetch_to_buffer=1, Spark will adjust the RDD partition on its own. But you can control it with different values of fetch_to_buffer.
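To make the arithmetic above easy to replay, here is a small sketch (plain Scala, no connector API involved, just the numbers from this explanation):

// pulls per Kafka partition per batch = batch seconds * (1000 / fill frequency in ms)
// RDD partitions per batch = topic partitions * ceil(pulls per partition / fetch_to_buffer)
def expectedRddPartitions(batchSec: Int, fillFreqMs: Int, fetchToBuffer: Int, topicPartitions: Int): Int = {
  val pullsPerPartition = batchSec * (1000 / fillFreqMs)                        // e.g. 30 * 5 = 150
  val blocksPerThread = math.ceil(pullsPerPartition.toDouble / fetchToBuffer).toInt
  topicPartitions * blocksPerThread
}

expectedRddPartitions(30, 200, 50, 5)   // 15
expectedRddPartitions(30, 200, 10, 5)   // 75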

dibbhatt commented 6 years ago

You have an offset out of range and the offset was reset to the earliest time. What is happening is that, because your processing rate is very low, your Lag in Kafka keeps growing. If your Kafka retention is set to, say, X hours, then Kafka deletes any messages older than X hours. That is what causes the offset reset to the earliest time: when the Consumer tries to fetch messages from offset, say, 100, those messages have already been purged from Kafka, so the consumer resets to the earliest available offset, say 125.

You need to bump up your consumer rate. If you are in test mode, you can delete the ZK path where offsets are saved and start fresh with new fetch settings.

By the way, you can see the Lag of your consumer like this. It will show how far behind each consumer thread is, which will give an idea of the Lag.

kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --broker-info x.x.x.x:9092 --group --topic --zookeeper y.y.y.y:2181

yufuxin commented 6 years ago

Our Kafka topic has 48 partitions and the retention is set to one week. The offset-out-of-range warning said:

18/11/09 22:32:42 WARN PartitionManager: Got fetch request with offset out of range: 182828517 for Topic tr69data24 partition 25
18/11/09 22:32:42 WARN PartitionManager: Offset reset to EarliestTime 182828527 for Topic tr69data24 partition 25

Even after I set "consumer.num_fetch_to_buffer" -> "50", I still have the same problem. I suspect the Kafka broker has some problem, but this Kafka cluster is run by the maintenance team and I do not have any access to it .... I tried kafka-consumer-groups.sh and got an error like this:

/usr/hdp/current/kafka-broker/bin/kafka-consumer-groups.sh --bootstrap-server qcr-hadoop-g901.oss.ads:6667 --list
[2018-11-09 23:35:57,441] WARN Bootstrap broker qcr-hadoop-g901.oss.ads:6667 disconnected (org.apache.kafka.clients.NetworkClient)
Error while executing consumer group command Request METADATA failed on brokers List(qcr-hadoop-g901.oss.ads:6667 (id: -1 rack: null))
java.lang.RuntimeException: Request METADATA failed on brokers List(qcr-hadoop-g901.oss.ads:6667 (id: -1 rack: null))

dibbhatt commented 6 years ago

Try to see the Lag by executing the ConsumerOffsetChecker command I gave above. I think your consumer lag is too high. Let me know how much lag there is. Are you running this in production now? If you do not care about the lag, then deleting the Kafka offsets stored in ZK and restarting the consumer will help. But you first need to find your incoming rate and adjust the consumer rate accordingly. If the Lag is too high (I suspect your consumer is almost 7 days behind), then whatever rate you set, you will see this error. First run the ConsumerOffsetChecker and share the offset Lag.

yufuxin commented 6 years ago

When I submit the ConsumerOffsetChecker command, I get the error "Exiting due to: Authentication failure". I need support from the OPS team next Monday and will let you know the result, thanks! Have a good weekend.

dibbhatt commented 6 years ago

Is the Scala message handler working now?

yufuxin commented 6 years ago

Sorry, I am still waiting for the OPS team to fix the authentication issue. Right now I cannot delete the offsets stored in ZK and cannot see the Lag details.

yufuxin commented 6 years ago

The Scala message handler is OK now.

dibbhatt commented 6 years ago

ok, good to hear that.

To delete the offset details, stop the Spark Streaming job, log in to the ZK CLI, and delete this path:

rmr /consumers/[your consumer id]/offsets/

Then restart the consumer with larger max.poll.records and higher fetch_to_buffer.

But getting the current Lag is most important, and you need to have monitoring on the Lag as well.
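
If the CLI tools stay unavailable, here is a rough sketch of checking the lag programmatically. This is not the connector's built-in lag checker; it assumes you already have the committed offset per partition (e.g. copied from the ZK path above or from the offset checker output), it omits any security settings your cluster may need, and it only uses the plain kafka-clients consumer to read the log-end offsets:

import java.util.Properties
import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.ByteArrayDeserializer

// committedOffsets: partition number -> last committed offset for this consumer
def printLag(bootstrapServers: String, topic: String, committedOffsets: Map[Int, Long]): Unit = {
  val props = new Properties()
  props.put("bootstrap.servers", bootstrapServers)
  props.put("key.deserializer", classOf[ByteArrayDeserializer].getName)
  props.put("value.deserializer", classOf[ByteArrayDeserializer].getName)
  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
  try {
    val partitions = committedOffsets.keys.map(p => new TopicPartition(topic, p)).toList
    val logEnd = consumer.endOffsets(partitions.asJava).asScala   // latest offset per partition
    partitions.foreach { tp =>
      val lag = logEnd(tp).longValue() - committedOffsets(tp.partition())
      println(s"partition ${tp.partition()} lag = $lag")
    }
  } finally {
    consumer.close()
  }
}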

dibbhatt commented 5 years ago

@yufuxin any luck getting the Lag details? Do you want me to keep this issue open?

yufuxin commented 5 years ago

Hi Dibbhatt, after deleting the ZooKeeper offsets and resetting fetch_to_buffer = 50 and max.poll.records = 500, the Lag is less than 500 for each partition, better than before, thanks! I'll continue to monitor it and close this issue.

dibbhatt commented 5 years ago

@yufuxin thanks. You need to keep monitoring the lag. If it increases, you may need to bump max.poll.records up to a higher value.

yufuxin commented 5 years ago

ok, Thanks!