joques opened 5 hours ago
As a general rule, you can follow the official documentation with a few context-specific modifications. I don't have a proper setup to test this myself, but I'd start with this doc. According to it, you first need to add the spark-sql-kafka library to the Spark session, something like this:
```python
from pyspark.sql import SparkSession

# Note: in PySpark, `builder` is a property, not a method call.
spark = SparkSession.builder \
    .appName("...") \
    .master("spark://ip:7077") \
    .config("spark.jars", "/path/,/path/to/another.jar") \
    .getOrCreate()
```
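If managing jar paths by hand is awkward, an alternative the Spark docs describe is letting Spark resolve the connector by its Maven coordinates via `--packages`. The exact artifact and version below are assumptions and must match your cluster's Spark and Scala builds:

```shell
# Assumed coordinates: spark-sql-kafka-0-10 built for Scala 2.12 / Spark 3.x.
# Adjust the suffix and version to match your installation.
spark-submit \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 \
  your_app.py
```

The same coordinates can also go in `.config("spark.jars.packages", ...)` on the session builder.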
Then just follow the examples on the linked page (Python or Scala API):
```python
df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load()

# Kafka delivers key/value as binary; cast them to strings to work with them.
# selectExpr returns a new DataFrame, so the result needs to be assigned.
df = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
```
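Since you also asked about writing to Kafka: the same integration guide covers the Kafka sink via `writeStream`. A minimal sketch, assuming a running cluster and placeholder topic/server/checkpoint names (the function names here are hypothetical helpers, not part of any API):

```python
def write_stream_to_kafka(df, bootstrap_servers, topic, checkpoint_dir):
    """Stream a DataFrame into a Kafka topic (hypothetical helper).

    The Kafka sink expects string/binary 'key' and 'value' columns and
    requires a checkpoint location for fault tolerance.
    """
    return (df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
              .writeStream
              .format("kafka")
              .option("kafka.bootstrap.servers", bootstrap_servers)
              .option("topic", topic)
              .option("checkpointLocation", checkpoint_dir)
              .start())


def bootstrap_option(hosts):
    """Build the comma-separated server list the Kafka option expects."""
    return ",".join(hosts)
```

You'd call it as e.g. `write_stream_to_kafka(df, bootstrap_option(["host1:port1", "host2:port2"]), "topic1", "/tmp/checkpoint")`, then `.awaitTermination()` on the returned query.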
Please let me know if it works for you.
Hi all, how do I get Spark.jl to read a stream from and write to Kafka? I need help finding documentation on that.