from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# NOTE(review): the import targets on the next two lines were missing
# (`from ... import` with nothing after it is a SyntaxError). A wildcard
# `*` was presumably stripped when the code was pasted -- confirm, and
# ideally replace with the explicit names the script needs.
from pyspark.sql.functions import *
from pyspark.sql.streaming import *

# Empty Spark configuration object; nothing visible here reads it.
conf = SparkConf()
This is the code I've developed so far.
When I run it using spark-submit, I get the log below.
It ends abruptly without printing any output.
Any help would be great!
# Structured Streaming job: read records from a Kafka topic and print the
# key/value pairs (cast to strings) to the console once per trigger.
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# NOTE(review): the import targets on the next two lines were missing in the
# original (a SyntaxError); a wildcard `*` was presumably stripped when the
# code was pasted -- confirm, and ideally import explicit names instead.
from pyspark.sql.functions import *
from pyspark.sql.streaming import *

# Empty Spark configuration object; nothing below reads it.
conf = SparkConf()

# Parenthesised chains replace the backslash continuations, which break as
# soon as anything (even a space) follows the backslash.
spark = (
    SparkSession.builder
    .appName("Get Country Traffic")
    .master("yarn")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Streaming DataFrame over the Kafka topic; key and value are cast to strings
# so the console sink shows them as text.
lines = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "wn01.itversity.com:6667, wn02.itversity.com:6667, wn03.itversity.com:6667")
    .option("subscribe", "retail-multis")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

# start() returns immediately with a StreamingQuery handle while the query
# runs in the background.
query = (
    lines.writeStream
    .queryName("country_traffic")
    .format("console")
    .trigger(processingTime='60 seconds')
    .start()
)

# Block the driver until the query stops or fails. Without this the script
# reaches its end right after start(), the application shuts down, and no
# micro-batch ever gets printed.
query.awaitTermination()
This is the code I've developed so far. When I run it using spark-submit, I get the log below. It ends abruptly without printing any output. Any help would be great!