Closed mehulsarang closed 4 years ago
Hello, where do you put this kafka_conf? It is not Kafka configuration; it is configuration for ABRiS and the Confluent Schema Registry client.
I pass that config to toConfluentAvro while writing Kafka messages:
someData
  .toConfluentAvro("d_sensor_normalized_test", "d_sensor_normalized_test", "sensor.machine.data")(kafka_conf)
  .write
  .format("kafka")
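For context, a hedged sketch of what the code around this snippet might look like with ABRiS 2.x. The registry URL, broker address, and the trailing write options are placeholders I am assuming, not values from this thread, and the exact SchemaManager constants should be checked against the ABRiS version in use:

```scala
import za.co.absa.abris.avro.read.confluent.SchemaManager
// (the import that brings toConfluentAvro into scope depends on the ABRiS 2.x package layout)

// Hypothetical ABRiS / Schema Registry settings; the URL is a placeholder,
// and additional keys (e.g. a naming strategy) may be required.
val kafka_conf: Map[String, String] = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL -> "http://schema-registry:8081"
)

someData
  .toConfluentAvro("d_sensor_normalized_test", "d_sensor_normalized_test", "sensor.machine.data")(kafka_conf)
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092") // placeholder, not from the thread
  .option("topic", "d_sensor_normalized_test")
  .save()
```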
OK, that seems to be correct. java.lang.NoClassDefFoundError usually means that you are missing some dependencies. Probably Kafka? So I would check that.
@cerveada thank you so much. I am looking forward to your response; I appreciate it.
One more piece of additional info: I tried to use the latest 3.1.1 version. It gives the same issue.
Hi guys, just to add: @mehulsarang, do you have Kafka Clients in your pom? This is the dependency containing the class org.apache.kafka.common.config.ConfigException.
@felipemmelo I do have the Kafka client below; I upgraded it to 2.4.0:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.4.0</version>
</dependency>

Same error.
Could you provide the stack trace around the exception?
Stack trace:
java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigException
at my.test.datasolutions.normalize.streaming.SurfaceSensorWrite$.wrtieKafkaForNormalized(SurfaceSensorWrite.scala:156)
at my.test.datasolutions.normalize.streaming.SurfaceSensorWrite$.writeSensorDeltaForNormalized(SurfaceSensorWrite.scala:219)
at my.test.datasolutions.normalize.streaming.EquipmentNormalizationStreamingJob$.processKafkaBatch(EquipmentNormalizationStreamingJob.scala:120)
at my.test.datasolutions.normalize.streaming.EquipmentNormalizationStreamingJob$$anonfun$1.apply(EquipmentNormalizationStreamingJob.scala:71)
at my.test.datasolutions.normalize.streaming.EquipmentNormalizationStreamingJob$$anonfun$1.apply(EquipmentNormalizationStreamingJob.scala:71)
at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:36)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:569)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withCustomExecutionEnv$1.apply(SQLExecution.scala:112)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:241)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:171)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:567)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:263)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:566)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:208)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:176)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:176)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:263)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:61)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:176)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:170)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:296)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:208)
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
at com.databricks.backend.daemon.driver.ClassLoaders$LibraryClassLoader.loadClass(ClassLoaders.scala:151)
at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
... 25 more
20/03/23 16:36:32 INFO DatabricksStreamingQueryListener: Query termination received for
20/03/23 16:36:32 INFO QueryListener: Query terminated - 6095248d-9ead-4796-ac23-d5e6e2abeae4 at 2020-03-23_16:36 with exception - java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigException
(same stack trace as above)
It is not related to ABRiS but rather to the connection between Kafka and Spark and/or Komatsu.
Do you get the same error if you do not use ABRiS?
@felipemmelo I do not get an error connecting to Kafka if I do not use ABRiS. I am reading messages from another topic on the same Kafka, processing them, and writing them out to another topic using ABRiS. I am able to write messages when I do not use ABRiS; I get the error only while using ABRiS. I am using ABRiS to decode the Avro messages via Schema Registry.
Which version of Spark are you using?
spark version is
@mehulsarang, two more questions then:
1. If you run the examples here, do you get the same exception?
2. Could you share your pom so that we can try to reproduce?
@felipemmelo I will try the sample and let you know soon.
Below is the POM file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>1.8</java.version>
<scala.majorVersion>2.11</scala.majorVersion>
<scala.minorVersion>8</scala.minorVersion>
<spark.version>2.4.0</spark.version>
</properties>
<modelVersion>4.0.0</modelVersion>
<groupId>kafkatest</groupId>
<artifactId>Kafka-test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.majorVersion}.${scala.minorVersion}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer -->
<dependency>
<groupId>com.github.benfradet</groupId>
<artifactId>spark-kafka-writer_2.11</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>za.co.absa</groupId>
<artifactId>abris_2.11</artifactId>
<version>2.2.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.majorVersion}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.majorVersion}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.majorVersion}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-5</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>test.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
Hi @mehulsarang, I tried to run using your pom and found 2 issues:
Hope it helps.
Now I moved to the updated version 3.1.1 and am using the correct Avro version 1.8.2.
Now the error is: java.lang.IllegalArgumentException: Invalid strategy: key.schema.naming.strategy
The schema is as below:

{
  "subject": "d_test-value",
  "version": 5,
  "id": 165,
  "schema": {
    "type": "record",
    "name": "sdata",
    "namespace": "test.solution",
    "fields": [
      { "name": "asset_id", "type": "string" },
      { "name": "datetime_utc", "type": "long" },
      { "name": "name", "type": "string" },
      { "name": "value", "type": "string" },
      { "name": "index", "type": "string" },
      { "name": "iana_timezone", "type": "string" },
      { "name": "file_name", "type": "string" },
      { "name": "data_source", "type": "string" },
      { "name": "datetime", "type": "long" },
      { "name": "sdatetime", "type": "long" },
      { "name": "mname", "type": "string" },
      { "name": "calculate_value", "type": "string" },
      { "name": "data_type", "type": "string" },
      { "name": "asset_type", "type": "string" }
    ]
  }
}
Hi @mehulsarang, as stated in the documentation, you have to provide a naming strategy, but you're passing the configuration key name as both the key and the value.
If you provide any of these it will work:
SchemaManager.SchemaStorageNamingStrategies.{TOPIC_NAME, RECORD_NAME, TOPIC_RECORD_NAME}
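For illustration, a hedged sketch of a config map using the topic-name strategy. The registry URL and topic are placeholders, and the exact parameter constants should be double-checked against the ABRiS 3.x sources:

```scala
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Placeholder values; the point is the strategy constant on the right-hand side.
val registryConfig: Map[String, String] = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL   -> "http://schema-registry:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "d_sensor_normalized_test",
  // Pass a strategy constant as the value -- not the literal key name,
  // which is what produces "Invalid strategy: key.schema.naming.strategy".
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY ->
    SchemaManager.SchemaStorageNamingStrategies.TOPIC_NAME
)
```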
@felipemmelo thank you so much, sir. You helped a lot.
@felipemmelo @cerveada
Going back to the original error "org/apache/kafka/common/config/ConfigException":
The error shows up when I run ABRiS 3.1.1 as provided. After that, I added the latest org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.5 to the Databricks cluster directly and the error got resolved.
ABRiS is using org.apache.spark:spark-streaming-kafka-0-10_2.11:2.4.4. I tried to override it by providing it as a dependency in the POM file while building the jar, but it is not overriding.
Not sure what the exact issue is between 2.4.5 and 2.4.4, but 2.4.4 is not working correctly on Databricks.
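Maven's dependencyManagement section is the usual way to pin a transitive version; a sketch, under the assumption that spark-streaming-kafka-0-10 is the artifact pulling in the conflicting Kafka classes:

```xml
<dependencyManagement>
  <dependencies>
    <!-- Pin the version so the transitive 2.4.4 cannot win over 2.4.5 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
      <version>2.4.5</version>
    </dependency>
  </dependencies>
</dependencyManagement>
```

Note that on Databricks, the cluster's own classpath can still shadow whatever a jar-with-dependencies bundles, which could explain why a POM override alone appears to have no effect.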
Hi @mehulsarang, I could not reproduce your error. I tried the version you mentioned and everything worked as expected. Here is my pom:
<dependencies>
<!-- Avro -->
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
<version>1.8.2</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.logging.log4j/log4j-core -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.11.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>com.databricks</groupId>
<artifactId>spark-avro_2.11</artifactId>
<version>4.0.0</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.majorVersion}.${scala.minorVersion}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.github.benfradet/spark-kafka-writer -->
<dependency>
<groupId>com.github.benfradet</groupId>
<artifactId>spark-kafka-writer_2.11</artifactId>
<version>0.4.0</version>
</dependency>
<dependency>
<groupId>za.co.absa</groupId>
<artifactId>abris_2.11</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.majorVersion}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.majorVersion}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.majorVersion}</artifactId>
<version>${spark.version}</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.7.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.0.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-5</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>test.App</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<!-- If you have classpath issue like NoDefClassError,... -->
<!-- useManifestOnlyJar>false</useManifestOnlyJar -->
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
Closing for now, feel free to reopen.
Getting the ConfigException error at the line using SchemaManager.PARAM_SCHEMA_REGISTRY_URL. The Kafka URL is correct; not sure what is going wrong.