airscholar / e2e-data-engineering

An end-to-end data engineering pipeline that orchestrates data ingestion, processing, and storage using Apache Airflow, Python, Apache Kafka, Apache Zookeeper, Apache Spark, and Cassandra. All components are containerized with Docker for easy deployment and scalability.
https://www.youtube.com/watch?v=GqAcTrqKcrY
177 stars 74 forks

Connect with Kafka not able to form #4

Open choonhongyeoh0241 opened 7 months ago

choonhongyeoh0241 commented 7 months ago

Whenever we try to connect to Kafka, we get this error:

```
WARNING:root:kafka dataframe could not be created because: An error occurred while calling o36.load.
: java.lang.NoClassDefFoundError: scala/$less$colon$less
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.org$apache$spark$sql$kafka010$KafkaSourceProvider$$validateStreamOptions(KafkaSourceProvider.scala:338)
	at org.apache.spark.sql.kafka010.KafkaSourceProvider.sourceSchema(KafkaSourceProvider.scala:71)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:233)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:118)
	at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:118)
	at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:35)
	at org.apache.spark.sql.streaming.DataStreamReader.loadInternal(DataStreamReader.scala:168)
	at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:144)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.ClassNotFoundException: scala.$less$colon$less
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 20 more
```

```
Traceback (most recent call last):
  File "C:\Users\User\Desktop\Data Engineering\Data Engineering\spark_stream.py", line 143, in
    selection_df = create_selection_df_from_kafka(spark_df)
  File "C:\Users\User\Desktop\Data Engineering\Data Engineering\spark_stream.py", line 129, in create_selection_df_from_kafka
    sel = spark_df.selectExpr("CAST(value AS STRING)") \
AttributeError: 'NoneType' object has no attribute 'selectExpr'
```
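For what it's worth, the `AttributeError` is a downstream symptom of the Kafka jar failure above. In a pattern like the following (a hypothetical sketch, not the repository's exact code), the Kafka read helper logs the `NoClassDefFoundError` and returns `None`, so the next step crashes on `.selectExpr`. Guarding for `None` makes the real failure explicit:

```python
import logging

def connect_to_kafka(spark_session):
    """Try to attach a streaming reader to the Kafka topic (sketch)."""
    try:
        return (spark_session.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "users_created")
                .load())
    except Exception as e:
        # The jar/classpath error above lands here and only gets logged,
        # so the function silently hands back None.
        logging.warning(f"kafka dataframe could not be created because: {e}")
        return None

def create_selection_df_from_kafka(spark_df):
    # Fail loudly instead of hitting AttributeError on NoneType.
    if spark_df is None:
        raise RuntimeError("Kafka dataframe is None; fix the jar versions first")
    return spark_df.selectExpr("CAST(value AS STRING)")
```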

VuKhoiGVM commented 6 months ago

Hello, I fixed that error after two days. Here are my steps:

  1. Check the Kafka version. The broker is based on the confluentinc/cp-server image, so the Kafka version is the one bundled with that Confluent Platform release.
  2. Find the matching versions of the jar packages referenced in the Spark session config on Maven, and add them to: venv/lib/python3.11/site-packages/pyspark/jars
  3. Update the config to point at the jars you just added:

```python
s_conn = SparkSession.builder \
    .appName('SparkDataStreaming') \
    .config('spark.jars.packages',
            "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0,"
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0") \
    .config('spark.cassandra.connection.host', 'localhost') \
    .getOrCreate()
```

  4. Run spark-submit again, and the keyspace and table will be created successfully.

Hope it helps!
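The version matching in step 2 can be sketched as a small helper (hypothetical, the function name is mine): the `_2.12`/`_2.13` suffix in each Maven artifact must match the Scala build your PySpark ships with, otherwise you get exactly the `NoClassDefFoundError: scala/$less$colon$less` shown above.

```python
def kafka_and_cassandra_packages(spark_version: str, scala_version: str) -> str:
    """Build the spark.jars.packages string for this project (sketch).

    The "_<scala>" suffix must match PySpark's bundled Scala build; a 2.13
    jar on a 2.12 runtime fails at class-loading time, not at submit time.
    The Cassandra connector version (3.4.0 here) follows the comment above.
    """
    scala = ".".join(scala_version.split(".")[:2])  # "2.12.17" -> "2.12"
    return ",".join([
        f"com.datastax.spark:spark-cassandra-connector_{scala}:3.4.0",
        f"org.apache.spark:spark-sql-kafka-0-10_{scala}:{spark_version}",
    ])

print(kafka_and_cassandra_packages("3.4.0", "2.12.17"))
```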

teddythinh commented 6 months ago

Hi there, the author has explained the missing packages in this part: https://youtu.be/GqAcTrqKcrY?si=QzgPjlC-RHMULUS0&t=4599

I believe it's still missing the kafka-clients package. You can add the jar file with two missing packages from @VuKhoiGVM's comment.

Ref: https://stackoverflow.com/a/71059689/17316050

Also, please double-check the Spark, Cassandra, and Scala versions so you download compatible versions of each package; otherwise it won't work.
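One way to supply all three packages, including kafka-clients, is to let Spark resolve them from Maven at submit time instead of copying jars by hand (a sketch assuming Spark 3.4.0 on Scala 2.12 with kafka-clients 3.4.0; adjust the versions to match your install):

```shell
# Resolve the Cassandra connector, the Kafka source, and kafka-clients
# from Maven Central when the job is submitted.
spark-submit \
  --packages "com.datastax.spark:spark-cassandra-connector_2.12:3.4.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,org.apache.kafka:kafka-clients:3.4.0" \
  spark_stream.py
```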

ElNino9495 commented 5 months ago

I get the error even after adding the jar files

omursnck commented 3 months ago

> I get the error even after adding the jar files

How did you manage to solve that issue?