Closed harshini98 closed 5 years ago
# Structured Streaming job: subscribe to the "retail-multis" Kafka topic and
# print every micro-batch (key/value cast to strings) to the console.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
# NOTE(review): the original text had no import target on the next two lines
# (presumably a `*` stripped by markdown rendering); wildcards restored so the
# names in scope stay what the author most likely intended.
from pyspark.sql.functions import *  # noqa: F401,F403
from pyspark.sql.streaming import *  # noqa: F401,F403

# NOTE(review): this SparkConf is created but never handed to the builder.
conf = SparkConf()

spark = (
    SparkSession.builder
    .appName("Get Country Traffic")
    .master("yarn")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("DEBUG")
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Kafka delivers key/value as binary columns; cast both to strings so the
# console sink prints readable text.
lines = (
    spark.readStream
    .format("kafka")
    .option(
        "kafka.bootstrap.servers",
        "wn01.itversity.com:6667, wn02.itversity.com:6667, wn03.itversity.com:6667",
    )
    .option("subscribe", "retail-multis")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

query = (
    lines.writeStream
    .queryName("country_traffic")
    .format("console")
    .trigger(processingTime='60 seconds')
    .start()
)

# start() returns immediately; block the driver here so the application does
# not exit before any micro-batch has been processed.
query.awaitTermination()
# Structured Streaming job: subscribe to the "retail-multis" Kafka topic and
# print every micro-batch (key/value cast to strings) to the console.
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
# NOTE(review): the original text had no import target on the next two lines
# (presumably a `*` stripped by markdown rendering); wildcards restored so the
# names in scope stay what the author most likely intended.
from pyspark.sql.functions import *  # noqa: F401,F403
from pyspark.sql.streaming import *  # noqa: F401,F403

# NOTE(review): this SparkConf is created but never handed to the builder.
conf = SparkConf()

spark = (
    SparkSession.builder
    .appName("Get Country Traffic")
    .master("yarn")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("DEBUG")
spark.conf.set("spark.sql.shuffle.partitions", "2")

# Kafka delivers key/value as binary columns; cast both to strings so the
# console sink prints readable text.
lines = (
    spark.readStream
    .format("kafka")
    .option(
        "kafka.bootstrap.servers",
        "wn01.itversity.com:6667, wn02.itversity.com:6667, wn03.itversity.com:6667",
    )
    .option("subscribe", "retail-multis")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

query = (
    lines.writeStream
    .queryName("country_traffic")
    .format("console")
    .trigger(processingTime='60 seconds')
    .start()
)

# start() returns immediately; block the driver here so the application does
# not exit before any micro-batch has been processed.
query.awaitTermination()