dotnet / spark

.NET for Apache® Spark™ makes Apache Spark™ easily accessible to .NET developers.
https://dot.net/spark

Failed to connect Spark to Kafka #634

Closed: m-evazzadeh closed this issue 4 years ago

m-evazzadeh commented 4 years ago

I am using Apache Spark in .NET Core.

I'm trying to connect Spark Structured Streaming with Kafka. When I run my application, I get the errors below.

My source code:

// At the top of the file:
// using Microsoft.Spark.Sql;
// using Microsoft.Spark.Sql.Streaming;
// using static Microsoft.Spark.Sql.Functions; // Explode, Split

string bootstrapServers = Helper.Instance.KafkaUri;
string subscribeType = "subscribe";
string topics = Helper.Instance.KafkaTopic;

// The "l1".."l6" progress markers below correspond to the l1..l6 lines in the log.
System.Console.WriteLine("l1");
System.Console.WriteLine(bootstrapServers);
System.Console.WriteLine(subscribeType);
System.Console.WriteLine(topics);
System.Console.WriteLine("l2");

SparkSession spark = SparkSession
    .Builder()
    .AppName("StructuredKafkaWordCount")
    .GetOrCreate();
System.Console.WriteLine("l3");

// Subscribe to the Kafka topic and cast the raw message value to a string.
DataFrame lines = spark
    .ReadStream()
    .Format("kafka")
    .Option("kafka.bootstrap.servers", bootstrapServers)
    .Option(subscribeType, topics)
    .Load()
    .SelectExpr("CAST(value AS STRING)");
System.Console.WriteLine("l4");

// Split each message into words and count occurrences per word.
DataFrame words = lines
    .Select(Explode(Split(lines["value"], " "))
        .Alias("word"));
DataFrame wordCounts = words.GroupBy("word").Count();
System.Console.WriteLine("l5");

// Stream the running counts to the console.
StreamingQuery query = wordCounts
    .WriteStream()
    .OutputMode("complete")
    .Format("console")
    .Start();
System.Console.WriteLine("l6");

query.AwaitTermination();

Log:

Ivy Default Cache set to: C:\Users\MyUserAccount\.ivy2\cache
The jars for the packages stored in: C:\Users\MyUserAccount\.ivy2\jars
:: loading settings :: url = jar:file:/C:/bin/spark-2.4.1-bin-hadoop2.7/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.11 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-. . .

        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   6   |   0   |   0   |   0   ||   6   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-83ef8d1b-5e0e-420d-af99-18bdeeed2bc7
        confs: [default]
        0 artifacts copied, 6 already retrieved (0kB/15ms)
20/08/15 15:32:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
20/08/15 15:32:07 INFO DotnetRunner: Starting DotnetBackend with dotnet.
20/08/15 15:32:08 INFO DotnetRunner: Port number used by DotnetBackend is 57067
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.jars and value=file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar,file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.app.name and value=org.apache.spark.deploy.dotnet.DotnetRunner to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.submit.deployMode and value=client to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.master and value=local to environment
20/08/15 15:32:08 INFO DotnetRunner: Adding key=spark.repl.local.jars and value=file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar,file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar to environment
l1
http://localhost:9092
subscribe
chat-message
l2
[2020-08-15T11:02:08.9293042Z] [MyUserAccount] [Info] [ConfigurationService] Using port 57067 for connection.
[2020-08-15T11:02:08.9303643Z] [MyUserAccount] [Info] [JvmBridge] JvMBridge port is 57067
20/08/15 15:32:09 INFO SparkContext: Running Spark version 2.4.1
20/08/15 15:32:09 INFO SparkContext: Submitted application: StructuredKafkaWordCount
20/08/15 15:32:09 INFO SecurityManager: Changing view acls to: MyUserAccount
20/08/15 15:32:09 INFO SecurityManager: Changing modify acls to: MyUserAccount
20/08/15 15:32:09 INFO SecurityManager: Changing view acls groups to:
20/08/15 15:32:09 INFO SecurityManager: Changing modify acls groups to:
20/08/15 15:32:09 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(MyUserAccount); groups with view permissions: Set(); users  with modify permissions: Set(MyUserAccount); groups with modify permissions: Set()
20/08/15 15:32:09 INFO Utils: Successfully started service 'sparkDriver' on port 57073.
20/08/15 15:32:09 INFO SparkEnv: Registering MapOutputTracker
20/08/15 15:32:09 INFO SparkEnv: Registering BlockManagerMaster
20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
20/08/15 15:32:09 INFO DiskBlockManager: Created local directory at C:\Users\MyUserAccount\AppData\Local\Temp\blockmgr-4276b71f-affa-46bd-8a0a-fac55f0f825f
20/08/15 15:32:09 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
20/08/15 15:32:09 INFO SparkEnv: Registering OutputCommitCoordinator
20/08/15 15:32:09 INFO Utils: Successfully started service 'SparkUI' on port 4040.
20/08/15 15:32:09 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://MyUserAccount.asax.local:4040
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar at spark://MyUserAccount.asax.local:57073/jars/org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar with timestamp 1597489329693
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar at spark://MyUserAccount.asax.local:57073/jars/org.apache.kafka_kafka-clients-0.10.0.1.jar with timestamp 1597489329694
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar at spark://MyUserAccount.asax.local:57073/jars/org.spark-project.spark_unused-1.0.0.jar with timestamp 1597489329696
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/net.jpountz.lz4_lz4-1.3.0.jar at spark://MyUserAccount.asax.local:57073/jars/net.jpountz.lz4_lz4-1.3.0.jar with timestamp 1597489329697
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar at spark://MyUserAccount.asax.local:57073/jars/org.xerial.snappy_snappy-java-1.1.2.6.jar with timestamp 1597489329698
20/08/15 15:32:09 INFO SparkContext: Added JAR file:///C:/Users/MyUserAccount/.ivy2/jars/org.slf4j_slf4j-api-1.7.16.jar at spark://MyUserAccount.asax.local:57073/jars/org.slf4j_slf4j-api-1.7.16.jar with timestamp 1597489329699
20/08/15 15:32:09 INFO SparkContext: Added JAR file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar at spark://MyUserAccount.asax.local:57073/jars/microsoft-spark-2.4.x-0.10.0.jar with timestamp 1597489329699
20/08/15 15:32:09 INFO SparkContext: Added JAR file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/bin/Debug/netcoreapp3.1/microsoft-spark-2.4.x-0.10.0.jar at spark://MyUserAccount.asax.local:57073/jars/microsoft-spark-2.4.x-0.10.0.jar with timestamp 1597489329699
20/08/15 15:32:09 INFO Executor: Starting executor ID driver on host localhost
20/08/15 15:32:09 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 57082.
20/08/15 15:32:09 INFO NettyBlockTransferService: Server created on MyUserAccount.asax.local:57082
20/08/15 15:32:09 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
20/08/15 15:32:09 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
20/08/15 15:32:09 INFO BlockManagerMasterEndpoint: Registering block manager MyUserAccount.asax.local:57082 with 366.3 MB RAM, BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
20/08/15 15:32:09 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
20/08/15 15:32:09 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, MyUserAccount.asax.local, 57082, None)
l3
20/08/15 15:32:10 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/spark-warehouse').
20/08/15 15:32:10 INFO SharedState: Warehouse path is 'file:/C:/MyUserAccount/Projects/CEP/source/TradeValue/Asa.Cep.TradeValue/Asa.Cep.TradeValue.Processing/spark-warehouse'.
20/08/15 15:32:10 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint
20/08/15 15:32:10 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [http://localhost:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id =
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = spark-kafka-source-75008566-d5fc-4022-859c-d957a6590559--1597538808-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

20/08/15 15:32:10 INFO ConsumerConfig: ConsumerConfig values:
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        bootstrap.servers = [http://localhost:9092]
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = consumer-1
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 1
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        group.id = spark-kafka-source-75008566-d5fc-4022-859c-d957a6590559--1597538808-driver-0
        retry.backoff.ms = 100
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = earliest

20/08/15 15:32:10 INFO AppInfoParser: Kafka version : 0.10.0.1
20/08/15 15:32:10 INFO AppInfoParser: Kafka commitId : a7a17cdec9eaa6c5
20/08/15 15:32:10 ERROR DotnetBackendHandler: methods:
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.slf4j.Logger org.apache.spark.sql.streaming.DataStreamReader.log()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.format(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.load(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.load()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logDebug(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logDebug(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logWarning(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logWarning(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public java.lang.String org.apache.spark.sql.streaming.DataStreamReader.logName()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logTrace(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logTrace(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logInfo(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logInfo(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logError(scala.Function0)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.logError(scala.Function0,java.lang.Throwable)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.isTraceEnabled()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.slf4j.Logger org.apache.spark.sql.streaming.DataStreamReader.org$apache$spark$internal$Logging$$log_()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary$default$2()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.org$apache$spark$internal$Logging$$log__$eq(org.slf4j.Logger)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public void org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary(boolean)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean org.apache.spark.sql.streaming.DataStreamReader.initializeLogIfNecessary(boolean,boolean)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.options(scala.collection.Map)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.options(java.util.Map)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,boolean)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,long)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.option(java.lang.String,double)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.text(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.schema(org.apache.spark.sql.types.StructType)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.streaming.DataStreamReader org.apache.spark.sql.streaming.DataStreamReader.schema(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.json(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.textFile(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.parquet(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.orc(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public org.apache.spark.sql.Dataset org.apache.spark.sql.streaming.DataStreamReader.csv(java.lang.String)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final void java.lang.Object.wait() throws java.lang.InterruptedException
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final void java.lang.Object.wait(long,int) throws java.lang.InterruptedException
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.wait(long) throws java.lang.InterruptedException
20/08/15 15:32:10 ERROR DotnetBackendHandler: public boolean java.lang.Object.equals(java.lang.Object)
20/08/15 15:32:10 ERROR DotnetBackendHandler: public java.lang.String java.lang.Object.toString()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public native int java.lang.Object.hashCode()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native java.lang.Class java.lang.Object.getClass()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.notify()
20/08/15 15:32:10 ERROR DotnetBackendHandler: public final native void java.lang.Object.notifyAll()
20/08/15 15:32:10 ERROR DotnetBackendHandler: args:
[2020-08-15T11:02:10.9187218Z] [MyUserAccount] [Error] [JvmBridge] JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
[2020-08-15T11:02:10.9188973Z] [MyUserAccount] [Error] [JvmBridge] java.lang.NoClassDefFoundError: org/apache/spark/sql/sources/v2/reader/SupportsScanUnsafeRow
        at java.lang.ClassLoader.defineClass1(Native Method)
        at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
        at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
        at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
        at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:136)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createContinuousReader(KafkaSourceProvider.scala:43)
        at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:204)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.api.dotnet.DotnetBackendHandler.handleMethodCall(DotnetBackendHandler.scala:162)
        at org.apache.spark.api.dotnet.DotnetBackendHandler.handleBackendRequest(DotnetBackendHandler.scala:102)
        at org.apache.spark.api.dotnet.DotnetBackendHandler.channelRead0(DotnetBackendHandler.scala:29)
        at org.apache.spark.api.dotnet.DotnetBackendHandler.channelRead0(DotnetBackendHandler.scala:24)
        at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow
        at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
        ... 47 more

[2020-08-15T11:02:10.9333693Z] [MyUserAccount] [Exception] [JvmBridge] JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
Unhandled exception. System.Exception: JVM method execution failed: Nonstatic method load failed for class 7 when called with no arguments
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallJavaMethod(Boolean isStatic, Object classNameOrJvmObjectReference, String methodName, Object[] args)
   at Microsoft.Spark.Interop.Ipc.JvmBridge.CallNonStaticJavaMethod(JvmObjectReference objectId, String methodName, Object[] args)
   at Microsoft.Spark.Interop.Ipc.JvmObjectReference.Invoke(String methodName, Object[] args)
   at Microsoft.Spark.Sql.Streaming.DataStreamReader.Load()
   at Asa.Cep.TradeValue.Processing.Program.Main(String[] args) in 
suhsteve commented 4 years ago

java.lang.ClassNotFoundException: org.apache.spark.sql.sources.v2.reader.SupportsScanUnsafeRow looks like you may be missing jars or have incompatible ones.

Looking at your log, you are using Spark 2.4.1. However, the Kafka jar you are using is org.apache.spark_spark-sql-kafka-0-10_2.11-2.3.2.jar, which I think is meant to be used with Spark 2.3.2. What was the spark-submit command you used? If you aren't using it already, I would advise using the --packages option with spark-submit, as sketched below. Please try the suggestions in #341 as well.
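
For example, something like this (a sketch only: the DotnetRunner class, local master, and microsoft-spark jar name are taken from your log; the app dll is a placeholder; and the spark-sql-kafka version must match your Spark runtime):

    spark-submit ^
      --class org.apache.spark.deploy.dotnet.DotnetRunner ^
      --master local ^
      --packages org.apache.spark:spark-sql-kafka-0-10_2.11:<version-matching-your-Spark> ^
      microsoft-spark-2.4.x-0.10.0.jar ^
      dotnet YourApp.dll

(^ is the cmd.exe line continuation; on Linux/macOS use \ instead.)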

suhsteve commented 4 years ago

@m-evazzadeh were you able to solve your issue?

m-evazzadeh commented 4 years ago

Hi,

It worked after changing --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.2 to --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 in the spark-submit command, as shown below.
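
For anyone hitting the same error, the full working submit would have looked roughly like this (runner class, master, and jar name as in the log above; the app dll is a placeholder):

    spark-submit --class org.apache.spark.deploy.dotnet.DotnetRunner --master local --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 microsoft-spark-2.4.x-0.10.0.jar dotnet YourApp.dll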

Thanks @suhsteve!