Hi @alonshoshani, can you please share a code snippet showing how you are using this?
Hi @dibbhatt, yes, sure :) If you're available, could you please tell me how I can extract the min/max offsets of each JavaRDD? I want each micro batch to create its own file in S3 (the offsets will be in the name of the file). In case of failure it will overwrite the file with the same name, and this is how I prevent duplicates in my data.
```java
import java.io.IOException;
import java.util.List;
import java.util.Properties;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import consumer.kafka.MessageAndMetadata;
import consumer.kafka.PartitionOffsetPair;
import consumer.kafka.ProcessedOffsetManager;
import consumer.kafka.ReceiverLauncher;

public class SparkStreamExample {

    public static JavaSparkContext sc;

    public static void main(String[] args) throws IOException {
        // Consumer / receiver configuration
        Properties props = new Properties();
        props.put("zookeeper.hosts", "172.12.1.2");
        props.put("zookeeper.port", "2181");
        props.put("kafka.topic", "my-events");
        props.put("kafka.consumer.id", "my-events-consumer");
        props.put("bootstrap.servers", "ec2-1-2-3-4.us-west-2.compute.amazonaws.com:9092");
        props.put("max.poll.records", "250");
        props.put("consumer.fillfreqms", "1000");

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName("ss");
        sparkConf.setMaster("local[2]");
        JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(30));

        int numberOfReceivers = 3;
        JavaDStream<MessageAndMetadata<byte[]>> unionStreams = ReceiverLauncher.launch(
                jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());
        JavaPairDStream<Integer, Iterable<Long>> partitonOffset = ProcessedOffsetManager
                .getPartitionOffset(unionStreams, props);

        final SparkSession sparkSession = SparkSession.builder()
                .config(sparkConf).getOrCreate();

        unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata<byte[]>>>() {
            @Override
            public void call(JavaRDD<MessageAndMetadata<byte[]>> rdd) throws Exception {
                // Trying to get the min/max offsets of the RDD
                JavaPairRDD<Integer, Long> integerLongJavaPairRDD =
                        rdd.mapPartitionsToPair(new PartitionOffsetPair<>());
                JavaPairRDD<Integer, Iterable<Long>> integerIterableJavaPairRDD =
                        integerLongJavaPairRDD.groupByKey(1);

                List<MessageAndMetadata<byte[]>> rddList = rdd.collect();
                System.out.println(" Number of records in this batch " + rddList.size());

                // NOTE: createDataFrame needs a bean class as its second argument;
                // passing null here will fail at runtime.
                Dataset<Row> dataFrame = sparkSession.createDataFrame(rdd, null);
                // dataFrame.write().partitionBy("tim").parquet();
            }
        });

        ProcessedOffsetManager.persists(partitonOffset, props);

        try {
            jsc.start();
            jsc.awaitTermination();
        } catch (Exception ex) {
            jsc.ssc().sc().cancelAllJobs();
            jsc.stop(true, false);
            System.exit(-1);
        }
    }
}
```
Hi @alonshoshani, the MessageAndMetadata object has the offset details of the message. You can call getOffset() to get the corresponding offset of the message. The RDD has many partitions, and each RDD partition can have data from multiple Kafka partitions. So if you need to know the offset ranges (min/max) for each Kafka partition, you need to write some custom code. You can see a reference for how it is done in the ProcessedOffsetManager.persists call: it basically gets the max offset for each Kafka partition and commits it to ZK. You can refer to that code.
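For reference, here is a minimal sketch (my own illustration, not library code) of computing the min/max offset per Kafka partition inside foreachRDD. It assumes, as PartitionOffsetPair does, that MessageAndMetadata exposes getPartition().partition and getOffset():

```java
// Assumes imports: scala.Tuple2, java.util.Map
// {kafkaPartition -> offset} pair for every message in the batch
JavaPairRDD<Integer, Long> partitionToOffset = rdd.mapToPair(
        mmeta -> new Tuple2<>(mmeta.getPartition().partition, mmeta.getOffset()));

// Min and max offset per Kafka partition
Map<Integer, Long> minOffsets = partitionToOffset.reduceByKey(Math::min).collectAsMap();
Map<Integer, Long> maxOffsets = partitionToOffset.reduceByKey(Math::max).collectAsMap();

for (Map.Entry<Integer, Long> e : maxOffsets.entrySet()) {
    int partition = e.getKey();
    // e.g. derive a deterministic S3 object name such as
    // <topic>/partition-<p>_<minOffset>-<maxOffset>, so a retried batch
    // overwrites the same file and no duplicates are produced
    System.out.println("partition " + partition + " offsets ["
            + minOffsets.get(partition) + ", " + e.getValue() + "]");
}
```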
Thank you for your reply, I'll try it. I still get the java.lang.NoClassDefFoundError: kafka/api/OffsetRequest exception... any idea?
Can you share the full stack trace, please?
```
objc[85483]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java (0x10b6144c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10b6dc4e0). One of the two will be used. Which one is undefined.
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
	at consumer.kafka.KafkaConfig.<init>(KafkaConfig.java:50)
Process finished with exit code 1
```
About the offsets part: in the comments it says "For Each RDD partition belongs to one Kafka Partition".
So in the code below,

`JavaPairRDD<Integer, Long> integerLongJavaPairRDD = rdd.mapPartitionsToPair(new PartitionOffsetPair<>());`

this line will map the same partition number to the offset, no? Let's say partition number 1 has the first 10 messages, so the map will look like this:

{1, 0}
{1, 1}
{1, 2}
...
{1, 9}

no?
```
objc[85483]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java (0x10b6144c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10b6dc4e0). One of the two will be used. Which one is undefined.
Exception in thread "main" java.lang.NoClassDefFoundError: kafka/api/OffsetRequest
	at consumer.kafka.KafkaConfig.<init>(KafkaConfig.java:50)
	at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:81)
	at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:69)
	at io.oribi.SparkStreamExample.main(SparkStreamExample.java:53)
Caused by: java.lang.ClassNotFoundException: kafka.api.OffsetRequest
	at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
	... 4 more
Process finished with exit code 1
```
I see. I guess Kafka removed the OffsetRequest API in 0.10.2.0. Can you please use the latest kafka-spark-consumer version (1.0.17)?
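For reference, the upgrade would look roughly like this in a Maven build (the coordinates below are my assumption based on the project's published artifact; verify them against the README):

```xml
<!-- Assumed coordinates for dibbhatt/kafka-spark-consumer; check the README -->
<dependency>
    <groupId>dibbhatt</groupId>
    <artifactId>kafka-spark-consumer</artifactId>
    <version>1.0.17</version>
</dependency>
```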
Now I get this exception even though the partition exists. Maybe it's because consumer 1.0.17 is not compatible with Kafka 0.10.2 (my Kafka)?

```
objc[85552]: Class JavaLaunchHelper is implemented in both /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/bin/java (0x1055704c0) and /Library/Java/JavaVirtualMachines/jdk1.8.0_91.jdk/Contents/Home/jre/lib/libinstrument.dylib (0x10761d4e0). One of the two will be used. Which one is undefined.
Connected to the target VM, address: '127.0.0.1:57343', transport: 'socket'
Exception in thread "main" java.lang.RuntimeException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/processed-events/partitions
	at consumer.kafka.ReceiverLauncher.getNumPartitions(ReceiverLauncher.java:147)
	at consumer.kafka.ReceiverLauncher.createStream(ReceiverLauncher.java:90)
	at consumer.kafka.ReceiverLauncher.launch(ReceiverLauncher.java:69)
	at io.oribi.SparkStreamExample.main(SparkStreamExample.java:53)
Caused by: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /brokers/topics/processed-events/partitions
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:111)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1590)
	at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:214)
	at org.apache.curator.framework.imps.GetChildrenBuilderImpl$3.call(GetChildrenBuilderImpl.java:203)
	at org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107)
	at org.apache.curator.framework.imps.GetChildrenBuilderImpl.pathInForeground(GetChildrenBuilderImpl.java:199)
	at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:191)
	at org.apache.curator.framework.imps.GetChildrenBuilderImpl.forPath(GetChildrenBuilderImpl.java:38)
	at consumer.kafka.ReceiverLauncher.getNumPartitions(ReceiverLauncher.java:144)
	... 3 more
Disconnected from the target VM, address: '127.0.0.1:57343', transport: 'socket'
Process finished with exit code 1
```
> About the offsets part: in the comments it says "For Each RDD partition belongs to one Kafka Partition". So in the code below, `JavaPairRDD<Integer, Long> integerLongJavaPairRDD = rdd.mapPartitionsToPair(new PartitionOffsetPair<>());` this line will map the same partition number to the offset, no? Let's say partition number 1 has the first 10 messages, so the map will look like this: {1, 0} {1, 1} {1, 2} ... {1, 9}, no?
So this line will get the pair of {partition, offset} for every RDD partition. Let's say one RDD partition has a message from Kafka partition 1 at offset 100, and RDD partition 2 has other messages from the same Kafka partition 1, but at offset 150; this call will return {1, 150}, and the same for all Kafka partitions. You can see PartitionOffsetPair.java.
Ok, but are the messages in each RDD from the same Kafka partition? Because I want to create, for every micro batch (which has data from only one Kafka partition), a file with the min/max offsets that were already processed.
You can very well do that if you do mapPartitionsToPair with {partition_id, MessageAndMetadata} (the partition id you can get from the MessageAndMetadata object). Then you do a groupBy on partition_id, so now all your records are in groups that each belong to the same Kafka partition. On this grouped RDD, you apply your logic of writing a file for every group, as in the sketch below. If you wish, I can provide a snippet of code for your reference.
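For example, a rough, untested sketch of that approach (writeFileToS3 is a hypothetical helper you would implement; names and paths are illustrative only):

```java
// Assumes import: scala.Tuple2
// Key every message by its Kafka partition id
JavaPairRDD<Integer, MessageAndMetadata<byte[]>> byKafkaPartition =
        rdd.mapToPair(mmeta -> new Tuple2<>(mmeta.getPartition().partition, mmeta));

// Group the messages so each group holds exactly one Kafka partition's data
byKafkaPartition.groupByKey().foreach(group -> {
    int kafkaPartition = group._1();
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    for (MessageAndMetadata<byte[]> m : group._2()) {
        min = Math.min(min, m.getOffset());
        max = Math.max(max, m.getOffset());
    }
    // One file per Kafka partition per batch. Re-running a failed batch
    // produces the same name, so the write simply overwrites (idempotent).
    String fileName = "my-events/partition-" + kafkaPartition + "_" + min + "-" + max;
    writeFileToS3(fileName, group._2());   // hypothetical helper
});
```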
An example would be great, thanks a lot! And what do you think about this issue? https://github.com/dibbhatt/kafka-spark-consumer/issues/63#issuecomment-500751525
I guess Kafka removed the OffsetRequest API in 0.10.2.0. Can you please use the latest kafka-spark-consumer version (1.0.17)? I will share the code snippet in a while.
I already did that, but then I get this exception even though the partition exists (the same NoNodeException stack trace as above: KeeperErrorCode = NoNode for /brokers/topics/processed-events/partitions). Maybe it's because consumer 1.0.17 is not compatible with Kafka 0.10.2 (my Kafka)?
It should not fail. By default, the Kafka topics path is under /brokers.
In your case, did you create the topic under a different path in ZK?
If yes, you need to specify that path in the `zookeeper.broker.path` property.
In your Kafka server.properties you will see an entry like this:

`zookeeper.connect=`

If a path is appended there, all your topic details go under that path, so in the consumer you specify:

`zookeeper.broker.path=path_name/brokers`
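For example (the /kafka chroot below is purely hypothetical, just to illustrate the mapping):

```java
// If server.properties contains e.g.
//   zookeeper.connect=zk-host:2181/kafka
// then the topics live under the /kafka chroot, and the consumer must be
// pointed at the same prefix:
props.put("zookeeper.broker.path", "/kafka/brokers");
```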
Hi,
Ok, so I managed to connect to ZooKeeper (found the right path) and the batch is running, BUT the batch is empty (the topic has a lot of events), and for some reason it writes warnings.
Any idea why?

```
Connected to the target VM, address: '127.0.0.1:60819', transport: 'socket'
Number of records in this batch 0
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 4 greater than configured limit 3
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 5 greater than configured limit 3
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 6 greater than configured limit 3
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 7 greater than configured limit 3
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 8 greater than configured limit 3
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 9 greater than configured limit 3
WARN : consumer.kafka.ReceiverStreamListener - stop consumer as pending queue 10 greater than configured limit 3
```
Ok, so this is happening due to back pressure. I believe your streaming batches are taking longer than the batch duration, and hence the consumer is throttling. There is a setting, consumer.queue.to.throttle, with a default value of 3, i.e. if 3 batches are queued up, the consumer will throttle. You can set this property to a higher number, but the actual issue is that your batches are running slowly.
You may need to bump up resources for the job, i.e. give it more executors, executor cores, and executor memory.
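For example, to raise the limit while you tune the job (the value below is illustrative):

```java
// Allow more batches to queue up before the receiver throttles (default: 3).
// Raising this only hides slow batches; reducing batch runtime is the real fix.
props.put("consumer.queue.to.throttle", "10");
```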
Hi @alonshoshani, is your problem resolved, or do you still have any open issues?
Resolved for now, thanks a lot!
Ok cool. Then I am closing this issue. If you face any other issues, please open a new ticket.
Hi, I'm trying to use the consumer with