dibbhatt / kafka-spark-consumer

High Performance Kafka Connector for Spark Streaming. Supports Multi Topic Fetch, Kafka Security. Reliable offset management in Zookeeper. No Data-loss. No dependency on HDFS and WAL. In-built PID rate controller. Supports Message Handler. Offset Lag checker.
Apache License 2.0

app client consumer doesn't consume offsets from kafka topic #39

Closed: jorgeakanieves closed this issue 7 years ago

jorgeakanieves commented 7 years ago

I'm submitting Java jobs to YARN using yarn-client mode on a single node.

The job has a simple logic:

I send the jar file from the console to jobserver (which works as a proxy with YARN) and jobserver submits the job to YARN, acting as yarn-client.

Everything is on the same host but running as Docker containers: YARN is inside one docker (the cloudera docker) and jobserver is inside another.

But I see weird behaviour with the Kafka consumer: messages are sent and committed but never processed.

I tried with KafkaUtils createStream and got the same issue, but not with KafkaUtils createDirectStream, which runs fine.

I need to control the offsets that are read, to know from which one to start reading, so I have to use this kafka consumer...

I found the issue when I tried to check the spark jobserver context... Inside the job logic, if I stop the current context and create a new one in Spark local mode, it works fine, but that's not the desired behaviour because two contexts are created and the second one doesn't run in YARN...

I know it's a specific case, but perhaps you have seen a similar issue and could help me with it. I checked the log trace and there's no error consuming offsets; it shows that messages are committed but never read...

The current framework versions are:

  - kafka spark consumer 1.0.8
  - spark streaming 1.6.0
  - kafka 0.10.0 (also tried 0.8.2.1)
  - jobserver 0.6.1
  - jdk 1.8

Let me know if you need any other info.

Thanks

dibbhatt commented 7 years ago

Can you please share some more details: how many executors are you running, with how much capacity, and how many receivers? I have tested this consumer in YARN mode and it works fine. Here is the YARN spark-submit (just for reference):

    sudo /usr/share/spark/bin/spark-submit --queue default \
      --num-executors 10 --executor-cores 3 --executor-memory 3g \
      --class x.y.z --master yarn --deploy-mode cluster \
      --driver-memory 2g --driver-cores 2 myjar.jar

With this, YARN will launch 10 containers (executors), each with 3 cores and 3g memory.

Also, there are a few YARN-related properties I changed to limit the maximum number of cores/memory any container can request (yarn.scheduler.maximum-allocation-mb and yarn.scheduler.maximum-allocation-vcores).

I also changed settings related to the YARN CapacityScheduler, to use org.apache.hadoop.yarn.util.resource.DominantResourceCalculator.

You can find some details on this DominantResourceCalculator here:

http://hortonworks.com/blog/managing-cpu-resources-in-your-hadoop-yarn-clusters/
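
For reference, a sketch of the settings involved, with illustrative values that are not from this thread. The two allocation limits live in yarn-site.xml, and the calculator switch lives in capacity-scheduler.xml:

    <!-- yarn-site.xml: cap what any single container may request (values illustrative) -->
    <property>
      <name>yarn.scheduler.maximum-allocation-mb</name>
      <value>8192</value>
    </property>
    <property>
      <name>yarn.scheduler.maximum-allocation-vcores</name>
      <value>4</value>
    </property>

    <!-- capacity-scheduler.xml: make the scheduler account for vcores as well as memory -->
    <property>
      <name>yarn.scheduler.capacity.resource-calculator</name>
      <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>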

dibbhatt commented 7 years ago

Any luck?

jorgeakanieves commented 7 years ago

Not really :(

I've tried a lot of combinations but with no success. I think it's something related to the jobserver context and the client-server mode (yarn-client) execution. It's pretty hard to trace the log, and the only thing that I can see during execution when I store a new offset in the input topic is:

    2016-12-22 17:16:00 INFO BlockManagerInfo:58 - Added input-0-1482426864608 in memory on quickstart-cloudera:39525 (size: 400.0 B, free: 318.4 MB)

I also added a log trace from the YARN execution showing how the offset is committed but not consumed.

And I pushed the whole project to git: https://github.com/jorgeakanieves/kafka-consumer

KafkaConsumerNoSjs runs ok but KafkaConsumerSjs does not read offsets...

I run the velvia jobserver docker: https://hub.docker.com/r/velvia/spark-jobserver/ and the cloudera yarn docker: https://hub.docker.com/r/cloudera/quickstart/

Thanks dibbhatt.

dibbhatt commented 7 years ago

Hi, sorry to ask, but can you tell me the difference between KafkaConsumerNoSjs and KafkaConsumerSjs?

jorgeakanieves commented 7 years ago

KafkaConsumerNoSjs is built to work as a standalone Spark job, while KafkaConsumerSjs is created to run the job with Spark JobServer (https://github.com/spark-jobserver/spark-jobserver), which works as a service with its own REST API. In that case the context is passed in the runJob method, as you can see in the class. I can use it to submit jobs from a web client without implementing management logic, and it's easy to use.

jorgeakanieves commented 7 years ago

I added the first one to demonstrate that it works without jobserver; the same source with jobserver doesn't.

dibbhatt commented 7 years ago

I see. I have not tried this with Job-Server. But just looking at Job-Server, it seems that to run a streaming job you need to extend SparkStreamingJob, and you need to specify streaming.batch_interval in job-server's .conf file.

https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-extras/src/main/scala/spark/jobserver/SparkStreamingJob.scala
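
For reference, a hedged sketch of what that might look like in job-server's .conf file; streaming.batch_interval comes from the comment above, while the context name, factory class, and millisecond unit are assumptions:

    spark {
      contexts {
        streaming-context {      # hypothetical context name
          # assumed factory class from job-server-extras
          context-factory = spark.jobserver.context.StreamingContextFactory
          # batch duration for the StreamingContext (assumed to be milliseconds)
          streaming.batch_interval = 5000
        }
      }
    }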

dibbhatt commented 7 years ago

Some examples are here: https://github.com/spark-jobserver/spark-jobserver/blob/master/job-server-extras/src/main/scala/spark/jobserver/StreamingTestJob.scala

jorgeakanieves commented 7 years ago

Hi dibbhatt,

I just tried to run the main class as a streaming job (added KafkaConsumerSjsStreaming.java to the repo) but with no success. The class cannot implement SparkStreamingJob because, in the Java dependency, there is no "SparkStreamingJob" class as there is in Scala. In fact, in the 0.6.2 dependency there is only the SparkJob trait...

optisoft-consulting commented 7 years ago

I have a similar (maybe simpler) problem. I have a very simple driver/consumer that works in master = local[*] mode but does not consume messages properly under master = yarn-client. The job consumes some outstanding messages when it starts and then, when signaled to shut down, consumes some more. In between it appears to be waiting. All of this is running inside the HDP 2.5 Sandbox.

For example, this works:

    ./bin/spark-submit \
      --class com.xxx.spark.driver.usersegment.UserSegmentConsumerDriver \
      --master local[*] \
      --deploy-mode client \
      --num-executors 8 \
      --driver-memory 512m \
      --executor-memory 2G \
      --executor-cores 4 \
      --verbose \
      --driver-java-options -DSPARK_LOGS=${SPARK_LOGS} \
      /opt/usersegment/com.xxx.spark-1.0-SNAPSHOT.jar

but this does not:

    ./bin/spark-submit --class com.xxx.spark.driver.usersegment.UserSegmentConsumerDriver \
      --master yarn-client \
      --num-executors 3 \
      --driver-memory 512m \
      --executor-memory 512m \
      --executor-cores 1 \
      --driver-java-options -DSPARK_LOGS=${SPARK_LOGS} \
      --conf spark.executor.extraJavaOptions=-Dlog4j.configuration="file:log4j-worker.xml -DSPARK_LOGS=${SPARK_LOGS}" \
      --files /opt/usersegment/log4j-worker.xml \
      /opt/usersegment/com.xxx.spark-1.0-SNAPSHOT.jar

optisoft-consulting commented 7 years ago

I've also tried with yarn-cluster; it seems like the same issue.

    ./bin/spark-submit --class com.xxx.spark.driver.usersegment.UserSegmentConsumerDriver \
      --master yarn-cluster \
      --num-executors 3 \
      --driver-memory 512m \
      --executor-memory 512m \
      --executor-cores 1 \
      --driver-java-options -DSPARK_LOGS=${SPARK_LOGS} \
      --conf spark.driver.extraJavaOptions=-Dlog4j.configuration="file:log4j.xml -DSPARK_LOGS=${SPARK_LOGS}" \
      --conf spark.executor.extraJavaOptions=-Dlog4j.configuration="file:log4j-worker.xml -DSPARK_LOGS=${SPARK_LOGS}" \
      --files /opt/usersegment/log4j-worker.xml,/opt/usersegment/log4j.xml,/opt/usersegment/spark.properties \
      --driver-class-path /opt/usersegment/ \
      /opt/usersegment/com.xxx.spark-1.0-SNAPSHOT.jar

dibbhatt commented 7 years ago

@optisoftjh What's the issue you see with yarn-cluster mode? Can you check the log and share more details?

In yarn-cluster mode, the path for the properties file needs to be a shared location (HDFS) which all containers can access. I guess in your case it is not able to load the property file. You may need to modify your driver class to do that.
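
As a hedged sketch of one common alternative (not from this thread): ship the file with --files so YARN distributes it to every container, then resolve it through SparkFiles; "app.properties" is a placeholder name:

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.Properties;

    import org.apache.spark.SparkFiles;

    public class PropsLoader {
        // Loads a properties file that was shipped with
        // "spark-submit --files /path/app.properties".
        // SparkFiles.get() resolves its local path inside each container.
        public static Properties load() throws IOException {
            Properties props = new Properties();
            try (FileInputStream in =
                    new FileInputStream(SparkFiles.get("app.properties"))) {
                props.load(in);
            }
            return props;
        }
    }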

dibbhatt commented 7 years ago

@jorgeakanieves I guess the version of job-server you are using (0.6.2) does not have support for Spark Streaming. Can you use the latest job-server from master and try?

jorgeakanieves commented 7 years ago

But the master version does not have any class for streaming jobs; I think it's the same SparkJob class. For Java the latest dependency version is 0.6.2; I didn't see any newer ones. Perhaps it's something related to yarn-client mode, as in @optisoftjh's issue...

optisoft-consulting commented 7 years ago

Sorry, I was away for a bit. There is no issue with loading property files; I send the properties to the executors in code via a Spark broadcast. All logging also works as configured (see above) from the command line. The consumer just doesn't seem to consistently consume messages, i.e. I don't see output in my logs. I will try to post some output in a bit.

optisoft-consulting commented 7 years ago

Assume the following code (I want to process each binary record on the topic). The unionStreams is created as per your Consumer example code.

    // Start application logic
    unionStreams.foreachRDD(new VoidFunction<JavaRDD<MessageAndMetadata>>() {
        @Override
        public void call(JavaRDD<MessageAndMetadata> rdd) throws Exception {
            // Runs on the driver once per batch; rdd.count() forces the RDD to be evaluated
            log("--- New RDD with " + rdd.partitions().size()
                    + " partitions and " + rdd.count() + " records");
            rdd.foreach(new VoidFunction<MessageAndMetadata>() {
                @Override
                public void call(MessageAndMetadata messageAndMetadata) throws Exception {
                    // Runs on the executors; this output goes to executor logs, not the driver log
                    // log(new String(messageAndMetadata.getPayload()));
                }
            });
        }
    });
    // End application logic

In master = local[*] mode this code "ticks" every 5 seconds (the Spark batch duration) and prints. In yarn-cluster mode it only "ticks" once, then does not consume. I am using the same HDP sandbox with Kafka etc., either from my local desktop or deploying within the sandbox ("cluster" mode).

I wrote equivalent code (not shown here) using the plain Spark streaming KafkaUtils and it works in yarn-cluster.

dibbhatt commented 7 years ago

Can you remove the below log statement and try?

log("--- New RDD with " + rdd.partitions().size() + " partitions and " + rdd.count() + " records");

optisoft-consulting commented 7 years ago

Hi - I can, but then how will I know it is being called?

optisoft-consulting commented 7 years ago

I deployed your SampleConsumer to the sandbox using yarn-cluster. I only see the message -

'Number of records in this batch 0'

printed once. I sent 50000 messages to the topic and it does not consume them.

It works fine in local[*] mode. The output looks like this:

    Number of records in this batch 0
    Number of records in this batch 7525
    Number of records in this batch 37608
    Number of records in this batch 37540
    Number of records in this batch 37562
    Number of records in this batch 37567
    Number of records in this batch 37585
    Number of records in this batch 37575
    Number of records in this batch 37577
    Number of records in this batch 37624
    Number of records in this batch 37574
    Number of records in this batch 37566
    Number of records in this batch 37619
    Number of records in this batch 37523
    Number of records in this batch 37463
    Number of records in this batch 4092
    Number of records in this batch 0

optisoft-consulting commented 7 years ago

Additional comment - offset saving seems to work: the locally deployed consumer cleaned up the messages that were pending.

dibbhatt commented 7 years ago

Hi, can you please share the executor log from YARN mode? If you give more executors (and hence more cores), does it work?

jorgeakanieves commented 7 years ago

Hi @dibbhatt, did you finally try your library with Spark Job Server?

dibbhatt commented 7 years ago

Hi, sorry, no I have not tried with Spark Job Server. I will take a look over this weekend .

jorgeakanieves commented 7 years ago

@dibbhatt I tried to deploy jobserver outside the docker container and I got the same issue... The offset is committed but not consumed :(

dibbhatt commented 7 years ago

I think the issue is with the cores available to the containers. How many cores are given when you launch the Spark job? As each receiver task needs a dedicated core, you need to have enough cores left for processing.

jorgeakanieves commented 7 years ago

ummmmm really?

Here are the params for job execution set on jobserver:

    num-cpu-cores = 1          # Number of cores to allocate. Required. (default)
    memory-per-node = 1024m    # Executor memory per node, -Xmx style, e.g. 512m, 1G, etc.
    spark.executor.instances = 1
    spark.executor.memory = 1024m
    spark.driver.memory = 756m
    -XX:MaxPermSize=512m

dibbhatt commented 7 years ago

Set the CPU cores to 3 and try. Set the receiver count to 1, so it will use 1 core and the remaining 2 will be for RDD processing.

dibbhatt commented 7 years ago

@jorgeakanieves Any luck with more cores?

jorgeakanieves commented 7 years ago

Hi @dibbhatt, it didn't work. I changed num-cpu-cores = 3 in docker.conf for jobserver as you suggested, but with no success... It still doesn't consume offsets. Did you try this and it worked?

Anyway, I see that there is only 1 core at the YARN execution:

    {{JAVA_HOME}}/bin/java -server -XX:OnOutOfMemoryError='kill %p' -Xms1024m -Xmx1024m '-Dlog4j.configuration=file:/app/log4j-server.properties' '-DLOG_DIR=/tmp/job-server/jobserver-a1d2a862-com.mypackage.KafkaConsumerSjs615586817627942750' -Djava.io.tmpdir={{PWD}}/tmp '-Dspark.driver.port=42425' '-Dspark.ui.port=0' '-Dspark.akka.threads=12' -Dspark.yarn.app.container.log.dir= org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@172.18.0.7:42425 --executor-id 1 --hostname stm-master.gft.com --cores 1 --app-id application_1486646486022_0005 --user-class-path file:$PWD/app.jar 1> /stdout 2> /stderr

dibbhatt commented 7 years ago

We are running with YARN and it works fine. I never tried Spark JobServer though. I think the JobServer Docker container comes with a single core, so num-cpu-cores = 3 does not have any effect.

This is the kind of spark-submit we use for YARN mode...

    sudo /usr/share/spark/bin/spark-submit --queue default \
      --num-executors 10 --executor-cores 3 --executor-memory 3g \
      --class x.y.z.Main --master yarn --deploy-mode cluster \
      --driver-memory 2g --driver-cores 2 X.jar -f Y.properties

Can you try installing JobServer in cluster mode?

jorgeakanieves commented 7 years ago

@dibbhatt as I keep saying, running directly on YARN or in local mode works fine; if I run the same test through jobserver, it does not work. If we don't test it in the same environment we'll never know... Jobserver doesn't allow yarn-cluster mode.

jorgeakanieves commented 7 years ago

@dibbhatt I managed to get further with this. There was a parameter spark.executor.instances set to 1, so dynamic allocation for jobs was disabled and there was a memory limit on job execution... Now jobs run with 4096mb of allocated memory and 4 vcores, but...

...the behaviour is pretty weird: I run my job, but when I produce some offsets from a kafka client and check the topic in the kafka manager frontend, I see that the job's consumer has an offset lag. If I kill the job, the offset lag is set to 0... So during the execution the consumer doesn't consume anything, and at the end (or even at job start) the lag is set to 0...

I use CDH 5.8, Spark 1.6.0, kafka server 2.0, java client 0.10.0, and 3 hosts with zookeeper instances on all of them, plus 3 brokers, one per host. Each topic is created with partitions: 1 and replication: 3. The job is executed as yarn-client on the master (jobserver), so the driver is on the master node and the job is sent to YARN, which sends it to an executor on one of the slaves...

Do you know what could be the reason for this issue?

dibbhatt commented 7 years ago

I did not fully get it. You mean when you stopped the job, Kafka Manager shows the offset lag as zero? How do you measure the offset lag of the consumer? When you look in the Spark UI, do you see that no batch has any records?

jorgeakanieves commented 7 years ago

I follow these steps:

  1. Run the job
  2. Add two messages to the topic
  3. The consumer shows a lag of two messages and never consumes them
  4. Stop the job
  5. The lag shows 0 messages

I see the consumer and offset lag in kafka manager: https://github.com/sheepkiller/kafka-manager-docker

dibbhatt commented 7 years ago

Are you using the latest version? Would you mind sharing your driver code (which calls ReceiverLauncher.launch)?

jorgeakanieves commented 7 years ago

I think so... The driver is: https://github.com/jorgeakanieves/kafka-consumer/blob/master/src/main/java/com/mypackage/KafkaConsumerSjs.java

dibbhatt commented 7 years ago

You are using consumer version 1.0.8. It does not support consumer offset lag out of the box. Did you modify the Kafka Manager code to read the offsets for the lag check?

Nevertheless, I see one issue with the Driver code.

Your ProcessedOffsetManager.persists(partitionOffset, props); call should come after the DStream is processed, i.e. after line 128.

Can you make that change and let me know if it works.
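
For illustration, a minimal sketch of that ordering, following the consumer's README example; the package and class names are as in this project, but the exact generics and signatures in 1.0.8 are an assumption:

    import java.util.Properties;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    import consumer.kafka.MessageAndMetadata;
    import consumer.kafka.ProcessedOffsetManager;
    import consumer.kafka.ReceiverLauncher;

    public class DriverSketch {
        public static void wire(JavaStreamingContext jsc, Properties props) {
            int numberOfReceivers = 1; // 1 core for the receiver, the rest for processing

            JavaDStream<MessageAndMetadata> unionStreams =
                    ReceiverLauncher.launch(jsc, props, numberOfReceivers, StorageLevel.MEMORY_ONLY());

            // Capture per-partition offsets of everything the receivers pull in.
            JavaPairDStream<Integer, Iterable<Long>> partitionOffset =
                    ProcessedOffsetManager.getPartitionOffset(unionStreams, props);

            // ... application logic on unionStreams goes here (foreachRDD etc.) ...

            // Persist offsets to ZooKeeper only after the processing logic is wired up.
            ProcessedOffsetManager.persists(partitionOffset, props);
        }
    }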

jorgeakanieves commented 7 years ago

Yes, for sure. Anyway, it works without jobserver, so...? I'll get back to you with feedback.

dibbhatt commented 7 years ago

@jorgeakanieves any luck?

jorgeakanieves commented 7 years ago

hi @dibbhatt, after several tests I'm sure that most of the issues come from Java 8, with a particular impact on the driver (spark job server) due to garbage collector heaps... I guess I'll resolve it in the next few days...

dibbhatt commented 7 years ago

Nice. Thanks for finding the root cause of this.

dibbhatt commented 7 years ago

Hi @jorgeakanieves, is there any update on this? If the issue is resolved, can you please close it.

dibbhatt commented 7 years ago

thanks @jorgeakanieves