RedisLabs / spark-redis

A connector for Spark that allows reading from and writing to a Redis cluster
BSD 3-Clause "New" or "Revised" License

Read timeout for getHash function #98

Open beyhangl opened 6 years ago

beyhangl commented 6 years ago

Hello,

While reading a table from Redis, I get the error below.

The code below normally works well.

val readDF= spark.sparkContext.fromRedisKeyPattern(tableName,5).getHash().toDS()

It works for fewer than 2 million rows, but reading a bigger table produces this error.

18/10/11 17:08:25 ERROR Executor: Exception in task 37.0 in stage 3.0 (TID 338) redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)

I also tried increasing the number of partitions:

val redis = spark.sparkContext.fromRedisKeyPattern(tableName,100).getHash().toDS()

I also changed some Redis settings, but I don't think that's the cause. Do you know how I can solve this problem?
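
For context, a minimal end-to-end version of this read might look like the sketch below. This is not code from the issue: the connection keys follow the 0.3.x convention ("redis.host"/"redis.port"; newer releases use the "spark.redis.*" prefix), and the key pattern is a placeholder.

```scala
import org.apache.spark.sql.SparkSession
import com.redislabs.provider.redis._

// Minimal sketch of the read path discussed above (not from the issue itself).
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("redis-hash-read")
  .config("redis.host", "127.0.0.1")   // assumption: single local Redis node
  .config("redis.port", "6379")
  .getOrCreate()

import spark.implicits._

val tableName = "customer:*"           // hypothetical key pattern
val readDF = spark.sparkContext
  .fromRedisKeyPattern(tableName, 5)   // 5 partitions, as in the snippet above
  .getHash()                           // RDD[(String, String)] of hash field/value pairs
  .toDS()

println(readDF.count())
```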

fe2s commented 6 years ago

Hello,

Could you please provide some details about your setup: how many Spark executors and Redis nodes are you using?

beyhangl commented 6 years ago

Hello

Redis is installed on a single node and Spark is running in standalone mode.

Is it because of that? 2 million rows come back in seconds, but 7 million rows give me this error. Could it be the memory size of the Redis server or the driver?

itamarhaber commented 6 years ago

xref: https://stackoverflow.com/questions/52762354/spark-streaming-redis-read-time-out-with-scala

fe2s commented 6 years ago
val redis = spark.sparkContext.fromRedisKeyPattern(tableName,100).getHash().toDS()

I don't think 100 partitions make sense if you are running a single Spark executor. Reading keys doesn't scale; see this comment for an explanation: https://github.com/RedisLabs/spark-redis/issues/86#issuecomment-422512175. Try changing to 1 partition: spark.sparkContext.fromRedisKeyPattern(tableName,1).getHash()

I inserted 9M hashes into a local Redis with default settings (Redis memory: used_memory_human:1.21G), running Spark in local mode with -Xmx8G. Reading the hashes took 410 seconds for me:

val rdd = spark.sparkContext.fromRedisKeyPattern("person:*", 1).getHash()
println(rdd.count())
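
For reference, test data on that scale could be populated with a plain Jedis loop along the lines of the sketch below. This loader is hypothetical (not part of the issue); the key pattern, field names, and counts are illustrative.

```scala
import redis.clients.jedis.Jedis

// Hypothetical loader for "person:*" test hashes, using a Jedis pipeline
// to batch HSET commands against a local Redis instance.
val jedis = new Jedis("localhost", 6379)
val pipeline = jedis.pipelined()
val total = 9000000
for (i <- 1 to total) {
  pipeline.hset(s"person:$i", "name", s"name-$i")
  pipeline.hset(s"person:$i", "age", (i % 100).toString)
  if (i % 10000 == 0) pipeline.sync()  // periodically flush queued commands
}
pipeline.sync()
jedis.close()
```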
fe2s commented 6 years ago

@beyhangl, could you please also include a full stacktrace of the exception?

beyhangl commented 6 years ago

@fe2s Hello, here is the full log

`/usr/lib/jvm/java-8-oracle/bin/java -javaagent:/home/kahin/Downloads/idea-IC-182.3684.101/lib/idea_rt.jar=40953:/home/kahin/Downloads/idea-IC-182.3684.101/bin -Dfile.encoding=UTF-8 -classpath /usr/lib/jvm/java-8-oracle/jre/lib/charsets.jar:/usr/lib/jvm/java-8-oracle/jre/lib/deploy.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/cldrdata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/dnsns.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jaccess.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/jfxrt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/localedata.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/nashorn.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunec.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunjce_provider.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/sunpkcs11.jar:/usr/lib/jvm/java-8-oracle/jre/lib/ext/zipfs.jar:/usr/lib/jvm/java-8-oracle/jre/lib/javaws.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jce.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfr.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jfxswt.jar:/usr/lib/jvm/java-8-oracle/jre/lib/jsse.jar:/usr/lib/jvm/java-8-oracle/jre/lib/management-agent.jar:/usr/lib/jvm/java-8-oracle/jre/lib/plugin.jar:/usr/lib/jvm/java-8-oracle/jre/lib/resources.jar:/usr/lib/jvm/java-8-oracle/jre/lib/rt.jar:/home/kahin/Downloads/SparkStKafka/target/scala-2.11/classes:/home/kahin/.ivy2/cache/org.scala-lang/scala-library/jars/scala-library-2.11.8.jar:/home/kahin/.ivy2/cache/redis.clients/jedis/jars/jedis-2.9.0.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-pool2/jars/commons-pool2-2.5.0.jar:/home/kahin/.ivy2/cache/org.slf4j/slf4j-log4j12/jars/slf4j-log4j12-1.7.16.jar:/home/kahin/.ivy2/cache/org.slf4j/slf4j-api/jars/slf4j-api-1.7.25.jar:/home/kahin/.ivy2/cache/org.lz4/lz4-java/jars/lz4-java-1.4.0.jar:/home/kahin/.ivy2/cache/org.codehaus.janino/janino/jars/janino-3.0.8.jar:/home/kahin/.ivy2/cache/org.codehaus.janino/commons-compiler/jars/commons-compiler-3.0.8.jar:/home/kahin/.ivy2/cache/org.bouncycastle/bcprov-jdk15on/jars/bcprov-jdk15on-1.52.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-unsafe_2.11/jars/spark-unsafe_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-tags_2.11/jars/spark-tags_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-streaming_2.11/jars/spark-streaming_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-sql_2.11/jars/spark-sql_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-sketch_2.11/jars/spark-sketch_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-network-shuffle_2.11/jars/spark-network-shuffle_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-network-common_2.11/jars/spark-network-common_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-launcher_2.11/jars/spark-launcher_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-kvstore_2.11/jars/spark-kvstore_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-core_2.11/jars/spark-core_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-catalyst_2.11/jars/spark-catalyst_2.11-2.3.1.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-jackson/jars/parquet-jackson-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-hadoop/jars/parquet-hadoop-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-encoding/jars/parquet-encoding-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-common/jars/parquet-common-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-column/jars/parquet-column-1.8.3.jar:/home/kahin/.ivy2/cache/org.apache.orc/orc-mapreduce/jars/orc-mapreduce-1.4.4-nohive
.jar:/home/kahin/.ivy2/cache/org.apache.orc/orc-core/jars/orc-core-1.4.4-nohive.jar:/home/kahin/.ivy2/cache/org.apache.httpcomponents/httpcore/jars/httpcore-4.4.1.jar:/home/kahin/.ivy2/cache/org.apache.httpcomponents/httpclient/jars/httpclient-4.5.jar:/home/kahin/.ivy2/cache/org.apache.arrow/arrow-vector/jars/arrow-vector-0.8.0.jar:/home/kahin/.ivy2/cache/org.apache.arrow/arrow-memory/jars/arrow-memory-0.8.0.jar:/home/kahin/.ivy2/cache/org.apache.arrow/arrow-format/jars/arrow-format-0.8.0.jar:/home/kahin/.ivy2/cache/org.antlr/antlr4-runtime/jars/antlr4-runtime-4.7.jar:/home/kahin/.ivy2/cache/net.sf.py4j/py4j/jars/py4j-0.10.7.jar:/home/kahin/.ivy2/cache/net.java.dev.jets3t/jets3t/jars/jets3t-0.9.4.jar:/home/kahin/.ivy2/cache/joda-time/joda-time/jars/joda-time-2.9.9.jar:/home/kahin/.ivy2/cache/io.netty/netty-all/jars/netty-all-4.1.17.Final.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-jvm/bundles/metrics-jvm-3.1.5.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-json/bundles/metrics-json-3.1.5.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-graphite/bundles/metrics-graphite-3.1.5.jar:/home/kahin/.ivy2/cache/io.dropwizard.metrics/metrics-core/bundles/metrics-core-3.1.5.jar:/home/kahin/.ivy2/cache/io.airlift/aircompressor/jars/aircompressor-0.8.jar:/home/kahin/.ivy2/cache/commons-codec/commons-codec/jars/commons-codec-1.11.jar:/home/kahin/.ivy2/cache/com.vlkan/flatbuffers/jars/flatbuffers-1.2.0-3f79e055.jar:/home/kahin/.ivy2/cache/com.univocity/univocity-parsers/jars/univocity-parsers-2.5.9.jar:/home/kahin/.ivy2/cache/com.typesafe.scala-logging/scala-logging_2.11/bundles/scala-logging_2.11-3.8.0.jar:/home/kahin/.ivy2/cache/com.twitter/chill_2.11/jars/chill_2.11-0.8.4.jar:/home/kahin/.ivy2/cache/com.twitter/chill-java/jars/chill-java-0.8.4.jar:/home/kahin/.ivy2/cache/com.thoughtworks.paranamer/paranamer/bundles/paranamer-2.8.jar:/home/kahin/.ivy2/cache/com.jamesmurty.utils/java-xmlbuilder/jars/java-xmlbuilder-1.1.jar:/home/kahin/.ivy2/cache/com.google.code.findbugs/jsr305/jars/jsr305-3.0.2.jar:/home/kahin/.ivy2/cache/com.github.luben/zstd-jni/bundles/zstd-jni-1.3.2-2.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.module/jackson-module-scala_2.11/bundles/jackson-module-scala_2.11-2.6.7.1.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.module/jackson-module-paranamer/bundles/jackson-module-paranamer-2.7.9.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.core/jackson-databind/bundles/jackson-databind-2.6.7.1.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.core/jackson-core/bundles/jackson-core-2.7.9.jar:/home/kahin/.ivy2/cache/com.fasterxml.jackson.core/jackson-annotations/bundles/jackson-annotations-2.6.7.jar:/home/kahin/.ivy2/cache/com.carrotsearch/hppc/bundles/hppc-0.7.2.jar:/home/kahin/.ivy2/cache/aopalliance/aopalliance/jars/aopalliance-1.0.jar:/home/kahin/.ivy2/cache/xmlenc/xmlenc/jars/xmlenc-0.52.jar:/home/kahin/.ivy2/cache/oro/oro/jars/oro-2.0.8.jar:/home/kahin/.ivy2/cache/org.xerial.snappy/snappy-java/bundles/snappy-java-1.1.2.6.jar:/home/kahin/.ivy2/cache/org.tukaani/xz/jars/xz-1.0.jar:/home/kahin/.ivy2/cache/org.spark-project.spark/unused/jars/unused-1.0.0.jar:/home/kahin/.ivy2/cache/org.sonatype.sisu.inject/cglib/jars/cglib-2.2.1-v20090111.jar:/home/kahin/.ivy2/cache/org.slf4j/jul-to-slf4j/jars/jul-to-slf4j-1.7.16.jar:/home/kahin/.ivy2/cache/org.slf4j/jcl-over-slf4j/jars/jcl-over-slf4j-1.7.16.jar:/home/kahin/.ivy2/cache/org.scala-lang.modules/scala-xml_2.11/bundles/scala-xml_2.11-1.0.4.jar:/home/kahin/.ivy2/cache/org.scala-lang.modules/scala-parser
-combinators_2.11/bundles/scala-parser-combinators_2.11-1.0.4.jar:/home/kahin/.ivy2/cache/org.scala-lang/scalap/jars/scalap-2.11.8.jar:/home/kahin/.ivy2/cache/org.scala-lang/scala-reflect/jars/scala-reflect-2.11.8.jar:/home/kahin/.ivy2/cache/org.scala-lang/scala-compiler/jars/scala-compiler-2.11.8.jar:/home/kahin/.ivy2/cache/org.roaringbitmap/RoaringBitmap/bundles/RoaringBitmap-0.5.11.jar:/home/kahin/.ivy2/cache/org.objenesis/objenesis/jars/objenesis-2.1.jar:/home/kahin/.ivy2/cache/org.mortbay.jetty/jetty-util/jars/jetty-util-6.1.26.jar:/home/kahin/.ivy2/cache/org.json4s/json4s-jackson_2.11/jars/json4s-jackson_2.11-3.2.11.jar:/home/kahin/.ivy2/cache/org.json4s/json4s-core_2.11/jars/json4s-core_2.11-3.2.11.jar:/home/kahin/.ivy2/cache/org.json4s/json4s-ast_2.11/jars/json4s-ast_2.11-3.2.11.jar:/home/kahin/.ivy2/cache/org.javassist/javassist/bundles/javassist-3.18.1-GA.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.media/jersey-media-jaxb/jars/jersey-media-jaxb-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.core/jersey-server/jars/jersey-server-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.core/jersey-common/jars/jersey-common-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.core/jersey-client/jars/jersey-client-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.containers/jersey-container-servlet-core/jars/jersey-container-servlet-core-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.containers/jersey-container-servlet/jars/jersey-container-servlet-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.jersey.bundles.repackaged/jersey-guava/bundles/jersey-guava-2.22.2.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2.external/javax.inject/jars/javax.inject-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2.external/aopalliance-repackaged/jars/aopalliance-repackaged-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/osgi-resource-locator/jars/osgi-resource-locator-1.0.1.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/hk2-utils/jars/hk2-utils-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/hk2-locator/jars/hk2-locator-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.glassfish.hk2/hk2-api/jars/hk2-api-2.4.0-b34.jar:/home/kahin/.ivy2/cache/org.fusesource.leveldbjni/leveldbjni-all/bundles/leveldbjni-all-1.8.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-mapper-asl/jars/jackson-mapper-asl-1.9.13.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-core-asl/jars/jackson-core-asl-1.9.13.jar:/home/kahin/.ivy2/cache/org.apache.xbean/xbean-asm5-shaded/bundles/xbean-asm5-shaded-4.4.jar:/home/kahin/.ivy2/cache/org.apache.ivy/ivy/jars/ivy-2.4.0.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-math3/jars/commons-math3-3.4.1.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-compress/jars/commons-compress-1.4.1.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro-mapred/jars/avro-mapred-1.7.7-hadoop2.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro-ipc/jars/avro-ipc-1.7.7-tests.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro-ipc/jars/avro-ipc-1.7.7.jar:/home/kahin/.ivy2/cache/org.apache.avro/avro/jars/avro-1.7.7.jar:/home/kahin/.ivy2/cache/net.razorvine/pyrolite/jars/pyrolite-4.13.jar:/home/kahin/.ivy2/cache/log4j/log4j/bundles/log4j-1.2.17.jar:/home/kahin/.ivy2/cache/javax.ws.rs/javax.ws.rs-api/jars/javax.ws.rs-api-2.0.1.jar:/home/kahin/.ivy2/cache/javax.validation/validation-api/jars/validation-api-1.1.0.Final.jar:/home/kahin/.ivy2/cache/javax.servlet/javax.servlet-api/jars/javax.servlet-api-3.1.0.jar:/home/kahin/.ivy2/cache/javax.inject/javax.inje
ct/jars/javax.inject-1.jar:/home/kahin/.ivy2/cache/javax.annotation/javax.annotation-api/jars/javax.annotation-api-1.2.jar:/home/kahin/.ivy2/cache/commons-net/commons-net/jars/commons-net-2.2.jar:/home/kahin/.ivy2/cache/commons-httpclient/commons-httpclient/jars/commons-httpclient-3.1.jar:/home/kahin/.ivy2/cache/commons-digester/commons-digester/jars/commons-digester-1.8.jar:/home/kahin/.ivy2/cache/commons-configuration/commons-configuration/jars/commons-configuration-1.6.jar:/home/kahin/.ivy2/cache/commons-cli/commons-cli/jars/commons-cli-1.2.jar:/home/kahin/.ivy2/cache/commons-beanutils/commons-beanutils-core/jars/commons-beanutils-core-1.8.0.jar:/home/kahin/.ivy2/cache/commons-beanutils/commons-beanutils/jars/commons-beanutils-1.7.0.jar:/home/kahin/.ivy2/cache/com.ning/compress-lzf/bundles/compress-lzf-1.0.3.jar:/home/kahin/.ivy2/cache/com.google.protobuf/protobuf-java/bundles/protobuf-java-2.5.0.jar:/home/kahin/.ivy2/cache/com.google.inject/guice/jars/guice-3.0.jar:/home/kahin/.ivy2/cache/com.esotericsoftware/minlog/bundles/minlog-1.3.0.jar:/home/kahin/.ivy2/cache/com.esotericsoftware/kryo-shaded/bundles/kryo-shaded-3.0.3.jar:/home/kahin/.ivy2/cache/com.clearspring.analytics/stream/jars/stream-2.7.0.jar:/home/kahin/.ivy2/cache/com.google.code.gson/gson/jars/gson-2.2.4.jar:/home/kahin/.ivy2/cache/com.google.guava/guava/jars/guava-11.0.2.jar:/home/kahin/.ivy2/cache/commons-collections/commons-collections/jars/commons-collections-3.2.2.jar:/home/kahin/.ivy2/cache/commons-io/commons-io/jars/commons-io-2.4.jar:/home/kahin/.ivy2/cache/commons-lang/commons-lang/jars/commons-lang-2.6.jar:/home/kahin/.ivy2/cache/io.netty/netty/bundles/netty-3.9.9.Final.jar:/home/kahin/.ivy2/cache/javax.activation/activation/jars/activation-1.1.1.jar:/home/kahin/.ivy2/cache/javax.xml.bind/jaxb-api/jars/jaxb-api-2.2.2.jar:/home/kahin/.ivy2/cache/javax.xml.stream/stax-api/jars/stax-api-1.0-2.jar:/home/kahin/.ivy2/cache/jline/jline/jars/jline-0.9.94.jar:/home/kahin/.ivy2/cache/net.iharder/base64/jars/base64-2.3.8.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-crypto/jars/commons-crypto-1.0.0.jar:/home/kahin/.ivy2/cache/org.apache.commons/commons-lang3/jars/commons-lang3-3.5.jar:/home/kahin/.ivy2/cache/org.apache.curator/curator-client/bundles/curator-client-2.6.0.jar:/home/kahin/.ivy2/cache/org.apache.curator/curator-framework/bundles/curator-framework-2.6.0.jar:/home/kahin/.ivy2/cache/org.apache.curator/curator-recipes/bundles/curator-recipes-2.6.0.jar:/home/kahin/.ivy2/cache/org.apache.directory.api/api-asn1-api/bundles/api-asn1-api-1.0.0-M20.jar:/home/kahin/.ivy2/cache/org.apache.directory.api/api-util/bundles/api-util-1.0.0-M20.jar:/home/kahin/.ivy2/cache/org.apache.directory.server/apacheds-i18n/bundles/apacheds-i18n-2.0.0-M15.jar:/home/kahin/.ivy2/cache/org.apache.directory.server/apacheds-kerberos-codec/bundles/apacheds-kerberos-codec-2.0.0-M15.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-annotations/jars/hadoop-annotations-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-auth/jars/hadoop-auth-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-client/jars/hadoop-client-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-common/jars/hadoop-common-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-hdfs/jars/hadoop-hdfs-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-app/jars/hadoop-mapreduce-client-app-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-common/jars/hadoop-mapreduce-client-common-2.6.5.jar:/home/
kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-core/jars/hadoop-mapreduce-client-core-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-jobclient/jars/hadoop-mapreduce-client-jobclient-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-mapreduce-client-shuffle/jars/hadoop-mapreduce-client-shuffle-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-api/jars/hadoop-yarn-api-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-client/jars/hadoop-yarn-client-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-common/jars/hadoop-yarn-common-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.hadoop/hadoop-yarn-server-common/jars/hadoop-yarn-server-common-2.6.5.jar:/home/kahin/.ivy2/cache/org.apache.zookeeper/zookeeper/jars/zookeeper-3.4.6.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-jaxrs/jars/jackson-jaxrs-1.9.13.jar:/home/kahin/.ivy2/cache/org.codehaus.jackson/jackson-xc/jars/jackson-xc-1.9.13.jar:/home/kahin/.ivy2/cache/org.codehaus.jettison/jettison/bundles/jettison-1.1.jar:/home/kahin/.ivy2/cache/org.htrace/htrace-core/jars/htrace-core-3.0.4.jar:/home/kahin/.ivy2/cache/xerces/xercesImpl/jars/xercesImpl-2.9.1.jar:/home/kahin/.ivy2/cache/xml-apis/xml-apis/jars/xml-apis-1.3.04.jar:/home/kahin/.ivy2/cache/org.apache.parquet/parquet-format/jars/parquet-format-2.3.1.jar:/home/kahin/.ivy2/cache/RedisLabs/spark-redis/jars/spark-redis-0.3.2.jar:/home/kahin/.ivy2/cache/net.jpountz.lz4/lz4/jars/lz4-1.3.0.jar:/home/kahin/.ivy2/cache/org.apache.kafka/kafka-clients/jars/kafka-clients-0.10.0.1.jar:/home/kahin/.ivy2/cache/org.apache.spark/spark-sql-kafka-0-10_2.11/jars/spark-sql-kafka-0-10_2.11-2.3.0.jar:/home/kahin/Downloads/SparkStKafka/ojdbc6.jar MainStreaming Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 18/10/15 17:29:27 WARN Utils: Your hostname, kahin resolves to a loopback address: 127.0.1.1; using 192.168.34.216 instead (on interface enp33s0) 18/10/15 17:29:27 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address 18/10/15 17:29:27 INFO SparkContext: Running Spark version 2.3.1 18/10/15 17:29:28 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 18/10/15 17:29:28 INFO SparkContext: Submitted application: MainStreaming 18/10/15 17:29:28 INFO SecurityManager: Changing view acls to: kahin 18/10/15 17:29:28 INFO SecurityManager: Changing modify acls to: kahin 18/10/15 17:29:28 INFO SecurityManager: Changing view acls groups to: 18/10/15 17:29:28 INFO SecurityManager: Changing modify acls groups to: 18/10/15 17:29:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(kahin); groups with view permissions: Set(); users with modify permissions: Set(kahin); groups with modify permissions: Set() 18/10/15 17:29:28 INFO Utils: Successfully started service 'sparkDriver' on port 37343. 
18/10/15 17:29:28 INFO SparkEnv: Registering MapOutputTracker 18/10/15 17:29:28 INFO SparkEnv: Registering BlockManagerMaster 18/10/15 17:29:28 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information 18/10/15 17:29:28 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up 18/10/15 17:29:28 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-ff18929a-4ff2-40db-894c-86f937840dca 18/10/15 17:29:28 INFO MemoryStore: MemoryStore started with capacity 1962.0 MB 18/10/15 17:29:28 INFO SparkEnv: Registering OutputCommitCoordinator 18/10/15 17:29:28 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 18/10/15 17:29:28 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042. 18/10/15 17:29:28 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043. 18/10/15 17:29:28 INFO Utils: Successfully started service 'SparkUI' on port 4043. 18/10/15 17:29:28 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.34.216:4043 18/10/15 17:29:29 INFO Executor: Starting executor ID driver on host localhost 18/10/15 17:29:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 40639. 18/10/15 17:29:29 INFO NettyBlockTransferService: Server created on 192.168.34.216:40639 18/10/15 17:29:29 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy 18/10/15 17:29:29 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.34.216, 40639, None) 18/10/15 17:29:29 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.34.216:40639 with 1962.0 MB RAM, BlockManagerId(driver, 192.168.34.216, 40639, None) 18/10/15 17:29:29 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.34.216, 40639, None) 18/10/15 17:29:29 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, 192.168.34.216, 40639, None) 18/10/15 17:29:29 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/home/kahin/Downloads/SparkStKafka/spark-warehouse/'). 18/10/15 17:29:29 INFO SharedState: Warehouse path is 'file:/home/kahin/Downloads/SparkStKafka/spark-warehouse/'. 
18/10/15 17:29:29 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint 18/10/15 17:29:30 INFO ConsumerConfig: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [192.168.34.216:9092] ssl.keystore.type = JKS enable.auto.commit = false sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = ssl.endpoint.identification.algorithm = null max.poll.records = 1 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = spark-kafka-source-1f8c3bd1-53fd-4eca-bcde-6179f50082cb-111795803-driver-0 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = earliest

18/10/15 17:29:30 INFO ConsumerConfig: ConsumerConfig values: metric.reporters = [] metadata.max.age.ms = 300000 partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] reconnect.backoff.ms = 50 sasl.kerberos.ticket.renew.window.factor = 0.8 max.partition.fetch.bytes = 1048576 bootstrap.servers = [192.168.34.216:9092] ssl.keystore.type = JKS enable.auto.commit = false sasl.mechanism = GSSAPI interceptor.classes = null exclude.internal.topics = true ssl.truststore.password = null client.id = consumer-1 ssl.endpoint.identification.algorithm = null max.poll.records = 1 check.crcs = true request.timeout.ms = 40000 heartbeat.interval.ms = 3000 auto.commit.interval.ms = 5000 receive.buffer.bytes = 65536 ssl.truststore.type = JKS ssl.truststore.location = null ssl.keystore.password = null fetch.min.bytes = 1 send.buffer.bytes = 131072 value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer group.id = spark-kafka-source-1f8c3bd1-53fd-4eca-bcde-6179f50082cb-111795803-driver-0 retry.backoff.ms = 100 sasl.kerberos.kinit.cmd = /usr/bin/kinit sasl.kerberos.service.name = null sasl.kerberos.ticket.renew.jitter = 0.05 ssl.trustmanager.algorithm = PKIX ssl.key.password = null fetch.max.wait.ms = 500 sasl.kerberos.min.time.before.relogin = 60000 connections.max.idle.ms = 540000 session.timeout.ms = 30000 metrics.num.samples = 2 key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer ssl.protocol = TLS ssl.provider = null ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] ssl.keystore.location = null ssl.cipher.suites = null security.protocol = PLAINTEXT ssl.keymanager.algorithm = SunX509 metrics.sample.window.ms = 30000 auto.offset.reset = earliest

18/10/15 17:29:30 INFO AppInfoParser: Kafka version : 0.10.0.1 18/10/15 17:29:30 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5 Replace view ==campaignNoneWLView 18/10/15 17:29:32 INFO SparkContext: Starting job: count at MainStreaming.scala:43 18/10/15 17:29:32 INFO DAGScheduler: Got job 0 (count at MainStreaming.scala:43) with 1 output partitions 18/10/15 17:29:32 INFO DAGScheduler: Final stage: ResultStage 0 (count at MainStreaming.scala:43) 18/10/15 17:29:32 INFO DAGScheduler: Parents of final stage: List() 18/10/15 17:29:32 INFO DAGScheduler: Missing parents: List() 18/10/15 17:29:32 INFO DAGScheduler: Submitting ResultStage 0 (RedisKVRDD[1] at RDD at RedisRDD.scala:19), which has no missing parents 18/10/15 17:29:32 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.1 KB, free 1962.0 MB) 18/10/15 17:29:32 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1438.0 B, free 1962.0 MB) 18/10/15 17:29:32 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.34.216:40639 (size: 1438.0 B, free: 1962.0 MB) 18/10/15 17:29:32 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1039 18/10/15 17:29:32 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (RedisKVRDD[1] at RDD at RedisRDD.scala:19) (first 15 tasks are for partitions Vector(0)) 18/10/15 17:29:32 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 18/10/15 17:29:32 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, ANY, 8460 bytes) 18/10/15 17:29:32 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 18/10/15 17:29:32 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 832 bytes result sent to driver 18/10/15 17:29:32 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 257 ms on localhost (executor driver) (1/1) 18/10/15 17:29:32 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 18/10/15 17:29:32 INFO DAGScheduler: ResultStage 0 (count at MainStreaming.scala:43) finished in 0.397 s 18/10/15 17:29:32 INFO DAGScheduler: Job 0 finished: count at MainStreaming.scala:43, took 0.446641 s 018/10/15 17:29:33 INFO CodeGenerator: Code generated in 255.077498 ms 18/10/15 17:29:33 INFO MapPartitionsRDD: Removing RDD 6 from persistence list 18/10/15 17:29:33 INFO BlockManager: Removing RDD 6 Replace view ==campaignWLView 18/10/15 17:29:33 INFO SparkContext: Starting job: count at MainStreaming.scala:43 18/10/15 17:29:33 INFO DAGScheduler: Got job 1 (count at MainStreaming.scala:43) with 1 output partitions 18/10/15 17:29:33 INFO DAGScheduler: Final stage: ResultStage 1 (count at MainStreaming.scala:43) 18/10/15 17:29:33 INFO DAGScheduler: Parents of final stage: List() 18/10/15 17:29:33 INFO DAGScheduler: Missing parents: List() 18/10/15 17:29:33 INFO DAGScheduler: Submitting ResultStage 1 (RedisKVRDD[11] at RDD at RedisRDD.scala:19), which has no missing parents 18/10/15 17:29:33 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.1 KB, free 1962.0 MB) 18/10/15 17:29:33 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1436.0 B, free 1962.0 MB) 18/10/15 17:29:33 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.34.216:40639 (size: 1436.0 B, free: 1962.0 MB) 18/10/15 17:29:33 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:1039 18/10/15 17:29:33 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 
(RedisKVRDD[11] at RDD at RedisRDD.scala:19) (first 15 tasks are for partitions Vector(0)) 18/10/15 17:29:33 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks 18/10/15 17:29:33 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, executor driver, partition 0, ANY, 8460 bytes) 18/10/15 17:29:33 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 18/10/15 17:29:33 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 746 bytes result sent to driver 18/10/15 17:29:33 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 9 ms on localhost (executor driver) (1/1) 18/10/15 17:29:33 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 18/10/15 17:29:33 INFO DAGScheduler: ResultStage 1 (count at MainStreaming.scala:43) finished in 0.019 s 18/10/15 17:29:33 INFO DAGScheduler: Job 1 finished: count at MainStreaming.scala:43, took 0.023213 s 018/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 38 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 43 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 40 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 41 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 42 18/10/15 17:29:34 INFO BlockManager: Removing RDD 6 18/10/15 17:29:34 INFO ContextCleaner: Cleaned RDD 6 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 46 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 54 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 52 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 33 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 49 18/10/15 17:29:34 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 192.168.34.216:40639 in memory (size: 1436.0 B, free: 1962.0 MB) 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 48 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 31 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 37 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 34 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 36 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 25 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 53 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 47 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 32 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 55 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 26 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 27 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 51 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 35 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 45 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 39 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 44 18/10/15 17:29:34 INFO ContextCleaner: Cleaned accumulator 50 18/10/15 17:29:34 INFO MapPartitionsRDD: Removing RDD 16 from persistence list 18/10/15 17:29:34 INFO BlockManager: Removing RDD 16 Replace view ==customerView 18/10/15 17:29:34 INFO SparkContext: Starting job: count at MainStreaming.scala:43 18/10/15 17:29:34 INFO DAGScheduler: Got job 2 (count at MainStreaming.scala:43) with 1 output partitions 18/10/15 17:29:34 INFO DAGScheduler: Final stage: ResultStage 2 (count at MainStreaming.scala:43) 18/10/15 17:29:34 INFO DAGScheduler: Parents of final stage: List() 18/10/15 17:29:34 INFO DAGScheduler: Missing parents: List() 18/10/15 17:29:34 INFO DAGScheduler: Submitting ResultStage 2 (RedisKVRDD[21] at RDD at 
RedisRDD.scala:19), which has no missing parents 18/10/15 17:29:34 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.1 KB, free 1962.0 MB) 18/10/15 17:29:34 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1441.0 B, free 1962.0 MB) 18/10/15 17:29:34 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on 192.168.34.216:40639 (size: 1441.0 B, free: 1962.0 MB) 18/10/15 17:29:34 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:1039 18/10/15 17:29:34 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (RedisKVRDD[21] at RDD at RedisRDD.scala:19) (first 15 tasks are for partitions Vector(0)) 18/10/15 17:29:34 INFO TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 18/10/15 17:29:34 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 2, localhost, executor driver, partition 0, ANY, 8460 bytes) 18/10/15 17:29:34 INFO Executor: Running task 0.0 in stage 2.0 (TID 2) 18/10/15 17:29:36 ERROR Executor: Exception in task 0.0 in stage 2.0 (TID 2) redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) at redis.clients.jedis.Protocol.process(Protocol.java:151) at redis.clients.jedis.Protocol.read(Protocol.java:215) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276) at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55) at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:127) at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196) ... 32 more 18/10/15 17:29:36 WARN TaskSetManager: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) at redis.clients.jedis.Protocol.process(Protocol.java:151) at redis.clients.jedis.Protocol.read(Protocol.java:215) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276) at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55) at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at 
java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:127) at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196) ... 32 more

18/10/15 17:29:36 ERROR TaskSetManager: Task 0 in stage 2.0 failed 1 times; aborting job 18/10/15 17:29:36 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool 18/10/15 17:29:36 INFO TaskSchedulerImpl: Cancelling stage 2 18/10/15 17:29:36 INFO DAGScheduler: ResultStage 2 (count at MainStreaming.scala:43) failed in 2.048 s due to Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) at redis.clients.jedis.Protocol.process(Protocol.java:151) at redis.clients.jedis.Protocol.read(Protocol.java:215) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276) at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55) at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:127) at 
redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196) ... 32 more

Driver stacktrace: 18/10/15 17:29:36 INFO DAGScheduler: Job 2 failed: count at MainStreaming.scala:43, took 2.052060 s Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2.0 (TID 2, localhost, executor driver): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) at redis.clients.jedis.Protocol.process(Protocol.java:151) at redis.clients.jedis.Protocol.read(Protocol.java:215) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276) at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55) at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:127) at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196) ... 32 more

Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1602) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1590) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1589) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1589) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1823) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1772) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1761) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2034) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2055) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2074) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2099) at org.apache.spark.rdd.RDD.count(RDD.scala:1162) at MainStreaming$.createReplaceRedisView(MainStreaming.scala:43) at MainStreaming$.customerUpdates(MainStreaming.scala:165) at MainStreaming$.main(MainStreaming.scala:240) at MainStreaming.main(MainStreaming.scala) Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:202) at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40) at redis.clients.jedis.Protocol.process(Protocol.java:151) at redis.clients.jedis.Protocol.read(Protocol.java:215) at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:340) at redis.clients.jedis.Connection.getBinaryMultiBulkReply(Connection.java:276) at redis.clients.jedis.Jedis.hgetAll(Jedis.java:848) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1$$anonfun$1.apply(RedisRDD.scala:60) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:60) at com.redislabs.provider.redis.rdd.RedisKVRDD$$anonfun$getHASH$1.apply(RedisRDD.scala:56) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at 
scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) at com.redislabs.provider.redis.rdd.RedisKVRDD.getHASH(RedisRDD.scala:55) at com.redislabs.provider.redis.rdd.RedisKVRDD.compute(RedisRDD.scala:35) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) at org.apache.spark.scheduler.Task.run(Task.scala:109) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.net.SocketTimeoutException: Read timed out at java.net.SocketInputStream.socketRead0(Native Method) at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) at java.net.SocketInputStream.read(SocketInputStream.java:171) at java.net.SocketInputStream.read(SocketInputStream.java:141) at java.net.SocketInputStream.read(SocketInputStream.java:127) at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:196) ... 32 more 18/10/15 17:29:45 INFO SparkContext: Invoking stop() from shutdown hook 18/10/15 17:29:45 INFO SparkUI: Stopped Spark web UI at http://192.168.34.216:4043 18/10/15 17:29:45 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped! 18/10/15 17:29:45 INFO MemoryStore: MemoryStore cleared 18/10/15 17:29:45 INFO BlockManager: BlockManager stopped 18/10/15 17:29:45 INFO BlockManagerMaster: BlockManagerMaster stopped 18/10/15 17:29:45 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped! 18/10/15 17:29:45 INFO SparkContext: Successfully stopped SparkContext 18/10/15 17:29:45 INFO ShutdownHookManager: Shutdown hook called 18/10/15 17:29:45 INFO ShutdownHookManager: Deleting directory /tmp/spark-11ef82ef-a4b2-49ea-9e65-66955bc948cb 18/10/15 17:29:45 INFO ShutdownHookManager: Deleting directory /tmp/temporaryReader-c274d20d-87f4-4c51-963e-bd7699233ab0

Process finished with exit code 130 (interrupted by signal 2: SIGINT) `

itamarhaber commented 6 years ago

@beyhangl Is it also possible to look at the redis.log file from around that time? Also, does running SLOWLOG GET return anything?

beyhangl commented 6 years ago

Redis log

16184:C 11 Oct 16:53:01.976 # oO0OoO0OoO0Oo Redis is starting oO0OoO0OoO0Oo
16184:C 11 Oct 16:53:01.976 # Redis version=4.0.11, bits=64, commit=00000000, modified=0, pid=16184, just started
16184:C 11 Oct 16:53:01.976 # Configuration loaded
16184:M 11 Oct 16:53:01.977 * Increased maximum number of open files to 10032 (it was originally set to 1024).

[Redis 4.0.11 ASCII-art startup banner omitted: running in standalone mode, port 6379, PID 16184, http://redis.io]

16184:M 11 Oct 16:53:01.978 # WARNING: The TCP backlog setting of 511 cannot be enforced because /proc/sys/net/core/somaxconn is set to the lower value of 128.
16184:M 11 Oct 16:53:01.978 # Server initialized
16184:M 11 Oct 16:53:01.978 # WARNING overcommit_memory is set to 0! Background save may fail under low memory condition. To fix this issue add 'vm.overcommit_memory = 1' to /etc/sysctl.conf and then reboot or run the command 'sysctl vm.overcommit_memory=1' for this to take effect.
16184:M 11 Oct 16:53:01.978 # WARNING you have Transparent Huge Pages (THP) support enabled in your kernel. This will create latency and memory usage issues with Redis. To fix this issue run the command 'echo never > /sys/kernel/mm/transparent_hugepage/enabled' as root, and add it to your /etc/rc.local in order to retain the setting after a reboot. Redis must be restarted after THP is disabled.
16184:M 11 Oct 16:53:11.189 DB loaded from disk: 9.210 seconds
16184:M 11 Oct 16:53:11.189 Ready to accept connections

SLOWLOG GET

127.0.0.1:6379> SLOWLOG GET
1) 1) (integer) 0
   2) (integer) 1539615703
   3) (integer) 4652680
   4) 1) "HGETALL"
      2) "customerTesttim"
   5) "127.0.0.1:48558"
   6) ""

fe2s commented 6 years ago

As a workaround, you may try increasing the timeout by setting .set("redis.timeout", "10000") or .set("spark.redis.timeout", "10000"), depending on which spark-redis version you use. The default timeout is 2000 ms (2 seconds).
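
For example, the timeout could be wired into the Spark configuration roughly as in the sketch below (assumptions: a local single-node Redis and the 0.3.x-style "redis.*" keys; use the "spark.redis.*" keys on newer versions).

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Sketch: raise the Redis read timeout to 10 seconds (default is 2000 ms).
val conf = new SparkConf()
  .setAppName("redis-hash-read")
  .setMaster("local[*]")
  .set("redis.host", "127.0.0.1")
  .set("redis.port", "6379")
  .set("redis.timeout", "10000")          // 0.3.x-style key
  // .set("spark.redis.timeout", "10000") // equivalent key on newer versions

val spark = SparkSession.builder().config(conf).getOrCreate()
```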