boycode opened this issue 6 years ago (status: Open)
Hi Vishnu, it would be good if you could share more details. 1) How is Spark Streaming started? I am assuming it is a Spark job. 2) I could not understand the SnappyData setup. What do you mean by concurrency? Are you using explicit threads? As a general practice, each thread should create its own SnappySession.
Some more details would be helpful.
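For example, something along these lines (a rough, untested sketch; the table name is just a placeholder and sc is the already-created SparkContext):

# Sketch only: each worker thread builds its own SnappySession on top of
# the shared SparkContext instead of sharing a single session object.
from threading import Thread
from pyspark.sql.snappy import SnappySession

def worker(sc, table_name):
    snappy = SnappySession(sc)            # one SnappySession per thread
    lookup_df = snappy.table(table_name)  # e.g. the lookup table kept in SnappyData
    print(lookup_df.count())

threads = [Thread(target=worker, args=(sc, "LOOKUP_TBL")) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()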
On Tue, May 8, 2018, 7:27 PM Vishnu Sankar notifications@github.com wrote:
We are running Spark Streaming in distributed mode with multiple threads against a SnappyData instance running in local mode, using the smart connector.
The idea is to keep some lookup data in Snappy memory for the Spark Streaming job. It runs well in Spark local mode, but when the concurrency and distribution increase in smart connector mode there is a lot of inconsistent behavior: sometimes it saves the data, sometimes it fails with a SQL exception.
We use the Spark context; all operations between Snappy and Spark are done using DataFrames.
Do we have any best practices for a similar scenario?
Please help.
Hi Rishi,
Please see the details:
1) how spark streaming is started? I am assuming it's a spark job.
/opt/spark/spark-2.1.1-bin-hadoop2.7/bin/spark-submit --master local[*] --conf spark.snappydata.connection=10.76.99.41:1527 --jars /opt/snappyData/snappydata-1.0.0-bin/snappydata-1.0.0-s_2.11.jar,/home/snappydata/lkupsample/spark-streaming-kafka-0-8-assembly_2.11-2.1.1.jar,/home/snappydata/lkupsample/spark-cassandra-connector_2.11-2.0.7.jar,/home/snappydata/lkupsample/jsr166e-1.1.0.jar /home/snappydata/lkupsample/sample.py > lkupdata.log 2>&1
2) Could not understand Snappydata setup
Single node setup - local mode.
3) By threads I mean Spark threads using the Snappy context. Here is the sample code:

#!/usr/bin/python
from __future__ import print_function
import sys
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from pyspark.streaming.flume import FlumeUtils
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit,concat,concat_ws
import time as tm
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql import Row
import uuid
import pyspark.sql.functions
import random
import logging
import requests
from requests.auth import HTTPBasicAuth
import json
import traceback
from pyspark.sql import SparkSession

log = logging.getLogger()
log.setLevel('DEBUG')
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
log.addHandler(handler)

prototype = StructType([StructField('con_id',StringType(), True),StructField('pce_nr',StringType(), True),StructField('xsf_id',StringType(), True),StructField('xss_id',StringType(), True),StructField('xsg_id',StringType(), True),StructField('xsd_id',StringType(), True),StructField('pcs_event_lt',StringType(), True),StructField('bul_csys_id_occ',StringType(), True),StructField('bul_csys_id_orig',StringType(), True),StructField('bul_csys_id_dest',StringType(), True),StructField('statuscd',StringType(), True),StructField('uid',StringType(), True)])
sourceschema = StructType([StructField('raw_group_id',StringType(),True),StructField('raw_created_time',StringType(),True),StructField('source_data',StringType(),True)])
errorschema = StructType([StructField('er_group_id',StringType(),True),StructField('er_time',StringType(),True),StructField('er_window',StringType(),True),StructField('er_rule',StringType(),True),StructField('error_message',StringType(),True),StructField('source_data',StringType(),True)])
def getSqlContextInstance(sparkContext):
    if ('sparkSessionSingletonInstance' not in globals()):
        sqlContext = SparkSession.builder.config(conf=sparkContext.getConf()).getOrCreate()
        globals()['sparkSessionSingletonInstance'] = sqlContext
    return globals()['sparkSessionSingletonInstance']
def createAuditTables():
    cluster_Cassandra = Cluster(['xx41','xx42','xx43'])
    session_Cassandra = cluster_Cassandra.connect()
    session_Cassandra.execute(" CREATE KEYSPACE IF NOT EXISTS audit_consignementstatuspipeline WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '3' } AND durable_writes = true " )
    session_Cassandra.execute(" CREATE TABLE IF NOT EXISTS audit_consignementstatuspipeline.tbl_raw_source (raw_group_id text,raw_created_time timestamp,source_data text,PRIMARY KEY(raw_created_time,source_data))" )
    session_Cassandra.execute("CREATE INDEX IF NOT EXISTS ON audit_consignementstatuspipeline.tbl_raw_source(raw_group_id)" )
    session_Cassandra.execute("CREATE TABLE IF NOT EXISTS audit_consignementstatuspipeline.tbl_error_info (er_group_id text,er_time timestamp,er_window text,er_rule text,error_message text,source_data text,PRIMARY KEY(er_time,source_data))" )
    session_Cassandra.execute("CREATE INDEX IF NOT EXISTS ON audit_consignementstatuspipeline.tbl_error_info(er_group_id)" )

def createTables():
    cluster_Elassandra = Cluster(['XX.1','XX42'])
    session_Elassandra = cluster_Elassandra.connect()
    session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS consignment_status_ks WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
    session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS consignment_status_ks.consignment_status_tbl (con_id text,pce_nr text,xsf_id text,xss_id text,xsg_id text,xsd_id text,pcs_event_lt timestamp,bul_csys_id_occ text,bul_csys_id_orig text,bul_csys_id_dest text,statuscd text, PRIMARY KEY(con_id) ) with default_time_to_live =86400" )
    session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON consignment_status_ks.consignment_status_tbl ( statuscd ) ")
def savesourcedata(rdd,sourcetable):
    if(rdd.count() > 0):
        try:
            sqlContext = getSqlContextInstance(rdd.context)
            raw_created_time = tm.strftime("%Y-%m-%d %H:%M:%S", tm.localtime())
            raw_group_id = str(uuid.uuid4())
            rawRDD = rdd.map(lambda value: Row(raw_group_id,raw_created_time,value))
            sourcedf = sqlContext.createDataFrame(rawRDD,sourceschema)
            sourcedf.write.format("org.apache.spark.sql.cassandra").mode('append').options(table=sourcetable, keyspace="audit_consignementstatuspipeline").save()
        except Exception as e:
            print("Exception occurred while fetching raw data ")
            print(e.message)
        finally:
            print("Final loop")
def saveerrordata(rdd,group_id,error_message,pr_window,pr_rule):
    if(rdd.count() > 0):
        try:
            sqlContext = getSqlContextInstance(rdd.context)
            er_time = tm.strftime("%Y-%m-%d %H:%M:%S", tm.localtime())
            errorRDD = rdd.map(lambda dt: Row(group_id,er_time,pr_window,pr_rule,error_message,str(dt[0])+ ',' +str(dt[1])+ ',' +str(dt[2])+ ',' +str(dt[3])+ ',' +str(dt[4])+ ',' +str(dt[5])+ ',' +str(dt[6])+ ',' +str(dt[7])+ ',' +str(dt[8])+ ',' +str(dt[9])+ ',' +str(dt[10])))
            errordf = sqlContext.createDataFrame(errorRDD, errorschema)
            errordf.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="tbl_error_info", keyspace="audit_consignementstatuspipeline").save()
        except Exception as e:
            print("Exception occurred while saving error source data")
            print(e.message)
        finally:
            print("Final loop")
def processfuncNoWindow(time,rdd):
    if(rdd.count() > 0):
        try:
            pr_group_id = str(uuid.uuid4())[:5]
            sqlContext = getSqlContextInstance(rdd.context)
            df0 = sqlContext.createDataFrame(rdd,prototype)
            df0.registerTempTable("consignment_status_inital_tbl")
            df1 = sqlContext.sql("select con_id,pce_nr,xsf_id,xss_id,xsg_id,xsd_id,pcs_event_lt,bul_csys_id_occ,bul_csys_id_orig,bul_csys_id_dest,statuscd from consignment_status_inital_tbl")
            df1.registerTempTable("consignment_status_tbl")
            df1.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="consignment_status_tbl", keyspace="consignment_status_ks").save()
        except Exception as e:
            print("Exception occurred while processing")
            print(e.message)
        finally:
            print("Final loop")
if __name__ == "__main__":
    log.info("Start Application...")
    sinkNode = "xx41,xx42,xx43,xx44,xx45"
    execMode = "yarn-cluster"
    appName = "ConsignementStatusPipeline"
    zkQuorum = "xx41:2181,xx42:2181,xx43:2181"
    kafkaTopic = "sdf"
    kafkaGroup = "asdf"
    numThread = 1
    conf = SparkConf()
    conf.setMaster(execMode)
    conf.setAppName(appName)
    conf.set("spark.cassandra.connection.host", sinkNode)
    conf.set("spark.driver.memory", "4g")
    conf.set("spark.executor.memory", "8g")
    conf.set("spark.cores.max", "10")
    conf.set("spark.executor.cores", "5")
    conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
    conf.set("spark.cassandra.output.concurrent.writes", "200")
    conf.set("output.batch.grouping.buffer.size", "2000")
    conf.set("spark.streaming.concurrentJobs", "20")
    conf.set("spark.streaming.receiver.maxRate", "500")
    conf.set("spark.scheduler.mode", "FAIR")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    conf.set("spark.streaming.backpressure.enabled", "true")
    conf.set("spark.logConf", "true")
    conf.set("spark.streaming.backpressure.initialRate", "500")
    conf.set("spark.streaming.kafka.maxRatePerPartition", "25")
    conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
    conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
    conf.set("spark.executor.logs.rolling.strategy", "time")
    conf.set("spark.executor.logs.rolling.time.interval", "daily")
    conf.set("spark.executor.logs.rolling.maxRetainedFiles", "10")
    conf.set("spark.streaming.unpersist", "true")
    conf.set("spark.cleaner.ttl", "3600")
    conf.set("spark.executor.instances", "5")
    conf.set("spark.yarn.executor.memoryOverhead", "8192")
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 5)
    ssc.checkpoint("/rtdest/checkpoint/ConsignementStatusPipeline")
    log.info("Creating tables...")
    createAuditTables()
    createTables()
    log.info("Creating elastic search indices...")
    createelasticindex('consignementstatuspipeline_rawindex','audit_consignementstatuspipeline','tbl_raw_source','raw_')
    createelasticindex('consignementstatuspipeline_erindex','audit_consignementstatuspipeline','tbl_error_info','er_')
    log.info("Creating Data Sources...")
    creategrafanadatasource('consignementstatuspipeline_raw_source','consignementstatuspipeline_rawindex','raw_created_time')
    creategrafanadatasource('consignementstatuspipeline_er_source','consignementstatuspipeline_erindex','er_time')
    log.info("Creating Summary Dashboards...")
    creategrsummaryboard('consignementstatuspipeline-transmitted1','Transmitted','consignementstatuspipeline_raw_source','raw_group_id','raw_created_time','','')
    log.info("Creating Throughput Dashboards...")
    log.info("Start Streaming...")
    kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, kafkaGroup, {kafkaTopic: numThread})
    auditStream = kafkaStream.map(lambda value: value[1])
    auditStream.foreachRDD(lambda rdd:savesourcedata(rdd,'tbl_raw_source'))
    parsedStream = kafkaStream.map(lambda csv : csv[1]).map(lambda row:row.split(",")).map(lambda dt : Row(dt[0],dt[1],dt[2],dt[3],dt[4],dt[5],dt[6],dt[7],dt[8],dt[9],dt[10],str(uuid.uuid4())))
    parsedStream.foreachRDD(processfuncNoWindow)
    ssc.start()
    ssc.awaitTermination()
To be more clear, this is in Smart Connector mode.
Here we have set a window period of 5 seconds. This works for a smaller data stream over a short interval, but when the volume of the data stream increases, jobs start to pile up in the Spark queue. Once jobs are queued, the Snappy session becomes inconsistent in processing and saving the data to Cassandra.
I will upload a simpler piece of code; sorry for uploading the huge code.
Thanks, Vishnu
Hi Vishnu,
Let me try to understand.
1.) You have a Spark cluster running streaming jobs.
2.) You have an independent SnappyData cluster that the Spark cluster connects to using the Smart Connector. The streaming jobs persist temporary data to SnappyData.
3.) You have an independent Cassandra cluster that SnappyData writes the temporary data out to.
And the problem you're encountering is that when the volume of data on the streaming job increases, SnappyData starts to have problems persisting to Cassandra (that is, it works at low volumes)?
Hi Pierce,
Yes, absolutely right. On point 2, a Snappy table is kept as a lookup information table for the streaming job; say, it holds some dimension information for the transaction data in the Spark streaming job.
Hi Vishnu, maybe I am still missing something, but yours is a classic SnappyData use case. Let me put down my understanding; correct me if I am wrong.
a) The SnappyData cluster is running in local mode. Some tables are created in SD. Row or column?
b) A streaming app written in Python is using the SnappyData store (launched in step a) for lookup. The Python code shared by you does not create a SnappySession. You can check http://snappydatainc.github.io/snappydata/howto/use_python_to_create_tables_and_run_queries/ for how to use SnappySession in Python.
c) Your streaming app can refer to SD data in parallel and concurrently (I am assuming you mean multiple tasks spawned for the streaming job), e.g. snappyDF = snappySession.table("t1") and then kafkaDF.join(snappyDF, "col1 = col2", "Inner/SemiJoin"), etc., roughly as sketched below.
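Roughly like this (an untested sketch; the schema, column names, and the Cassandra table/keyspace are placeholders):

from pyspark.sql.snappy import SnappySession
from pyspark.sql.types import StructType, StructField, StringType

# Placeholder schema for the streamed records (adjust to the real columns).
stream_schema = StructType([StructField("col1", StringType(), True),
                            StructField("payload", StringType(), True)])

def process_batch(time, rdd):
    if rdd.isEmpty():
        return
    snappy = SnappySession(rdd.context)        # SnappySession built on the SparkContext
    batch_df = snappy.createDataFrame(rdd, stream_schema)
    lookup_df = snappy.table("t1")             # lookup table living in SnappyData
    joined = batch_df.join(lookup_df,
                           batch_df["col1"] == lookup_df["col2"],
                           "inner")
    joined.write.format("org.apache.spark.sql.cassandra") \
          .mode("append").options(table="out_tbl", keyspace="out_ks").save()

# usage on the DStream from the streaming job:
# parsedStream.foreachRDD(process_batch)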
Hi Rishi/Pierce
Please see the sample steps we followed to build this application. Basically, we are using the smart connector mode of Snappy with Spark Streaming Python code to save data into Cassandra after doing a join operation with lookup data preloaded into Snappy.

#Steps followed:

#Step 1 - Create the SnappySession and the lookup table
snappysc = SnappySession(sc)
snappysc.sql("CREATE ... TableLKUP USING ROW OPTIONS ( PARTITION_BY 'Col1', PERSISTENCE 'ASYNC' )")

#Step 2 - Read data from Kafka via Spark Streaming; it will be saved into a Snappy table in Step 3
dfStmData = sqlContext.sql("Select * from TableSTRMDATA ")

#Step 3 - Create the Snappy table that will hold the streaming data from the step above
snappysc.sql("CREATE ... TableSTRMDATA USING ROW OPTIONS ( PARTITION_BY 'Col1', PERSISTENCE 'ASYNC' )")

#Step 4 - Write the streaming data to the Snappy table and join it with the lookup table
snappydf = snappysc.createDataFrame(dfStmData.rdd, dfStmData.schema)
snappydf.write.format("row").mode('append').saveAsTable("TableSTRMSNAPPYDATA")
snappydfOut = snappysc.sql("Select * from TableSTRMSNAPPYDATA JOIN TableLKUP")
finalDF = snappydfOut.toDF(*[c.lower() for c in snappydfOut.columns])

#Step 5 - Save the data to Cassandra
finalDF.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="FINAL_TABLE", keyspace="KEY_FINAL").save()

The issue is happening on the save side.
Vishnu, I think the code you shared earlier is not what you described here. It would be good if you could share the code where SnappyData is started in connector mode.
Dear Rishi,
The above code is a minimized version of the exact logic I am trying to build. Please see the full code.
#!/usr/bin/python
from __future__ import print_function
import sys
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.conf import SparkConf
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement
from pyspark.streaming.flume import FlumeUtils
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lit,concat,concat_ws
import time as tm
from pyspark.sql.types import StringType
from pyspark.sql.types import IntegerType
from pyspark.sql.types import LongType
from pyspark.sql.types import DoubleType
from pyspark.sql.types import FloatType
from pyspark.sql.types import StructType
from pyspark.sql.types import StructField
from pyspark.sql import Row
import uuid
import pyspark.sql.functions
import random
import logging
import requests
from requests.auth import HTTPBasicAuth
import json
import traceback
from pyspark.sql import SparkSession
from pyspark.sql.snappy import *
from pyspark.sql.snappy import SnappySession
log = logging.getLogger()
log.setLevel('DEBUG')
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
log.addHandler(handler)
prototype = StructType([StructField('service_provider_name',StringType(), True),StructField('service_provider_employee_id',StringType(), True),StructField('service_provider_employee_lastName',StringType(), True),StructField('service_provider_employee_firstname',StringType(), True),StructField('order_id',LongType(), True),StructField('route_id',StringType(), True),StructField('from_location',StringType(), True),StructField('current_location',StringType(), True),StructField('next_stop',StringType(), True),StructField('city',StringType(), True),StructField('zipcode',LongType(), True),StructField('stop_id',LongType(), True),StructField('stoptype',StringType(), True),StructField('stop_sequence',LongType(), True),StructField('profit_center_id',LongType(), True),StructField('profit_center_desc',StringType(), True),StructField('route_start_date',StringType(), True),StructField('route_end_date',StringType(), True),StructField('stop_miles',FloatType(), True),StructField('calculated_departure_time',StringType(), True),StructField('calculated_arrival_time',StringType(), True),StructField('scheduled_delivery_time',StringType(), True),StructField('actual_delivery_time',StringType(), True),StructField('job_delivery_status_eta',StringType(), True),StructField('job_delivery_status_actual',StringType(), True),StructField('uid1',StringType(), True),StructField('time_stamp',StringType(), True),StructField('uid',StringType(), True)])
sourceschema = StructType([StructField('raw_group_id',StringType(),True),StructField('raw_created_time',StringType(),True),StructField('source_data',StringType(),True)])
errorschema = StructType([StructField('er_group_id',StringType(),True),StructField('er_time',StringType(),True),StructField('er_window',StringType(),True),StructField('er_rule',StringType(),True),StructField('error_message',StringType(),True),StructField('source_data',StringType(),True)])
processschema = StructType([StructField('pr_group_id',StringType(),True),StructField('pr_time',StringType(),True),StructField('pr_window',StringType(),True),StructField('pr_rule',StringType(),True),StructField('pr_count',IntegerType(),True)])
def getSqlContextInstance(sparkContext):
if ('sparkSessionSingletonInstance' not in globals()):
sqlContext = SparkSession.builder.config(conf=sparkContext.getConf()).getOrCreate()
globals()['sparkSessionSingletonInstance'] = sqlContext
return globals()['sparkSessionSingletonInstance']
def createAuditTables():
cluster_Cassandra = Cluster(['X.X.X.41','X.X.X.42'])
session_Cassandra = cluster_Cassandra.connect()
session_Cassandra.execute(" CREATE KEYSPACE IF NOT EXISTS audit_nrt_realtime_pipeline_final WITH replication = {'class': 'NetworkTopologyStrategy', 'DC1': '3' } AND durable_writes = true " )
session_Cassandra.execute(" CREATE TABLE IF NOT EXISTS audit_nrt_realtime_pipeline_final.tbl_raw_source (raw_group_id text,raw_created_time timestamp,source_data text,PRIMARY KEY(raw_created_time,source_data))" )
session_Cassandra.execute("CREATE INDEX IF NOT EXISTS ON audit_nrt_realtime_pipeline_final.tbl_raw_source(raw_group_id)" )
session_Cassandra.execute("CREATE TABLE IF NOT EXISTS audit_nrt_realtime_pipeline_final.tbl_error_info (er_group_id text,er_time timestamp,er_window text,er_rule text,error_message text,source_data text,PRIMARY KEY(er_time,source_data))" )
session_Cassandra.execute("CREATE INDEX IF NOT EXISTS ON audit_nrt_realtime_pipeline_final.tbl_error_info(er_group_id)" )
session_Cassandra.execute("CREATE TABLE IF NOT EXISTS audit_nrt_realtime_pipeline_final.tbl_process_info (pr_group_id text,pr_time timestamp,pr_window text,pr_rule text,pr_count bigint,PRIMARY KEY(pr_group_id,pr_rule))" )
session_Cassandra.execute("CREATE INDEX IF NOT EXISTS ON audit_nrt_realtime_pipeline_final.tbl_process_info(pr_time)" )
def createTables():
cluster_Elassandra = Cluster(['X.X.X.41','X.X.X.42'])
session_Elassandra = cluster_Elassandra.connect()
session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS nrt_initial_raw WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS nrt_initial_raw.nrt_initial_raw_tbl (time_stamp timestamp,uid text,service_provider_name text,service_provider_employee_id text,service_provider_employee_lastName text,service_provider_employee_firstname text,order_id bigint,route_id text,from_location text,current_location text,next_stop text,city text,zipcode bigint,stop_id bigint,stoptype text,stop_sequence bigint,profit_center_id bigint,profit_center_desc text,route_start_date text,route_end_date text,stop_miles float,calculated_departure_time timestamp,calculated_arrival_time timestamp,scheduled_delivery_time timestamp,actual_delivery_time timestamp,job_delivery_status_eta text,job_delivery_status_actual text, PRIMARY KEY(uid) ) with default_time_to_live =86400" )
session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON nrt_initial_raw.nrt_initial_raw_tbl ( time_stamp ) ")
session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS nrt_raw_route_final WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS nrt_raw_route_final.nrt_raw_route_final_tbl (status text,pickup_complete bigint,delivery_complete bigint,no_of_stop_complete bigint,driver text,time_stamp timestamp,uid text,route_id text,current_location text,next_stop text,profit_center_id bigint,profit_center_desc text,stop_miles float, PRIMARY KEY(uid) ) with default_time_to_live =86400" )
session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON nrt_raw_route_final.nrt_raw_route_final_tbl ( time_stamp ) ")
session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS nrt_todays_route WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS nrt_todays_route.nrt_todays_route_tbl (no_of_stop_completed bigint,delivery_completed bigint,pickup_completed bigint,status text,driver text,time_stamp timestamp,uid text,route_id text,current_location text,next_stop text,stop_miles float, PRIMARY KEY(uid) ) with default_time_to_live =86400" )
session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON nrt_todays_route.nrt_todays_route_tbl ( time_stamp ) ")
session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS nrt_today_route_latest WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS nrt_today_route_latest.nrt_today_route_latest_tbl (profit_center_desc text,profit_center_id bigint,status text,driver text,time_stamp timestamp,route_id text,current_location text,next_stop text, PRIMARY KEY(driver,route_id) ) with default_time_to_live =86400" )
session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON nrt_today_route_latest.nrt_today_route_latest_tbl ( time_stamp ) ")
session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS nrt_today_route_status WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS nrt_today_route_status.nrt_today_route_status_tbl (stop_miles float,profit_center_desc text,profit_center_id bigint,no_of_stop_completed bigint,delivery_completed bigint,pickup_completed bigint,status text,driver text,time_stamp timestamp,route_id text,current_location text,next_stop text, PRIMARY KEY(driver,route_id) ) with default_time_to_live =86400" )
session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON nrt_today_route_status.nrt_today_route_status_tbl ( time_stamp ) ")
session_Elassandra.execute(" CREATE KEYSPACE IF NOT EXISTS nrt_todays_route_final WITH replication = { 'class': 'NetworkTopologyStrategy', 'DC1': '2' } AND durable_writes = true " )
session_Elassandra.execute(" CREATE TABLE IF NOT EXISTS nrt_todays_route_final.nrt_todays_route_final_tbl (stop_miles float,profit_center_desc text,profit_center_id bigint,no_of_stop_completed bigint,delivery_completed bigint,pickup_completed bigint,status text,driver text,time_stamp timestamp,route_id text,current_location text,next_stop text,total_delivery_count bigint,total_no_of_stop bigint,total_pickup_count bigint,total_projected_mile double, PRIMARY KEY(driver,route_id) ) with default_time_to_live =86400" )
session_Elassandra.execute(" CREATE INDEX IF NOT EXISTS ON nrt_todays_route_final.nrt_todays_route_final_tbl ( status ) ")
def createelasticindex(elindexname,keyspace,tablename,fieldflag):
rqurl = 'http://X.X.X.41.44:9200/'+elindexname
rqsettings = '''{"settings":{"keyspace":"'''+keyspace+'''"},"mappings":{"'''+tablename+'''":{"discover":"'''+fieldflag+'''.*"}}}'''
rqjson = rqsettings.replace("\n", "")
createstatus = requests.post(rqurl, data=rqjson)
def savesourcedata(rdd,sourcetable):
if(rdd.count() > 0):
try:
sqlContext = getSqlContextInstance(rdd.context)
raw_created_time = tm.strftime("%Y-%m-%d %H:%M:%S", tm.localtime())
raw_group_id = str(uuid.uuid4())
rawRDD = rdd.map(lambda value: Row(raw_group_id,raw_created_time,value))
sourcedf = sqlContext.createDataFrame(rawRDD,sourceschema)
sourcedf.write.format("org.apache.spark.sql.cassandra").mode('append').options(table=sourcetable, keyspace="audit_nrt_realtime_pipeline_final").save()
except Exception as e:
print("Exception occurred while fetching raw data %s"%e)
print(e.message)
finally:
print("Final loop")
def saveprocessinfo(rdd,pr_group_id,pr_window,pr_rule,pr_count):
if(rdd.count() > 0):
try:
sqlContext = getSqlContextInstance(rdd.context)
pr_time = tm.strftime("%Y-%m-%d %H:%M:%S", tm.localtime())
pr_info = [(pr_group_id, pr_time,pr_window,pr_rule, pr_count)]
processinfo = rdd.context.parallelize(pr_info)
processRDD = processinfo.map(lambda pr: Row(pr[0], pr[1], pr[2],pr[3],int(pr[4])))
processDF = sqlContext.createDataFrame(processRDD, processschema)
processDF.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="tbl_process_info", keyspace="audit_nrt_realtime_pipeline_final").save()
except Exception as e:
print("Exception occurred while fetching process info %s"%e)
print(e.message)
finally:
print("Final loop")
def saveerrordata(rdd,group_id,error_message,pr_window,pr_rule):
if(rdd.count() > 0):
try:
sqlContext = getSqlContextInstance(rdd.context)
er_time = tm.strftime("%Y-%m-%d %H:%M:%S", tm.localtime())
errorRDD = rdd.map(lambda dt: Row(group_id,er_time,pr_window,pr_rule,error_message,str(dt[0])+ ',' +str(dt[1])+ ',' +str(dt[2])+ ',' +str(dt[3])+ ',' +str(dt[4])+ ',' +str(dt[5])+ ',' +str(dt[6])+ ',' +str(dt[7])+ ',' +str(dt[8])+ ',' +str(dt[9])+ ',' +str(dt[10])+ ',' +str(dt[11])+ ',' +str(dt[12])+ ',' +str(dt[13])+ ',' +str(dt[14])+ ',' +str(dt[15])+ ',' +str(dt[16])+ ',' +str(dt[17])+ ',' +str(dt[18])+ ',' +str(dt[19])+ ',' +str(dt[20])+ ',' +str(dt[21])+ ',' +str(dt[22])+ ',' +str(dt[23])+ ',' +str(dt[24])+ ',' +str(dt[25])+ ',' +str(dt[26])))
errordf = sqlContext.createDataFrame(errorRDD, errorschema)
errordf.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="tbl_error_info", keyspace="audit_nrt_realtime_pipeline_final").save()
except Exception as e:
print("Exception occurred while saving error source data %s"%e)
print(e.message)
finally:
print("Final loop")
def creategrafanadatasource(dsrcname,elindexname,indexTimeField):
rqUrl ='http://X.X.X.41:3000/api/datasources'
elindexurl ='http://X.X.X.41:9200'
rqheader = {'Content-Type': 'application/json'}
datasourcetempplate = '''{"id":null,"orgId":1,"name": "'''+dsrcname+'''","type":"elasticsearch","typeLogoUrl":"public/app/plugins/datasource/elasticsearch/img/elasticsearch.svg","access":"proxy","url": "'''+elindexurl+'''","password":"","user":"","database": "'''+elindexname+'''","basicAuth":false,"isDefault":false,"jsonData":{"esVersion":2,"timeField": "'''+indexTimeField+'''"}}'''
sourcejson = datasourcetempplate.replace("\n", "")
sourcestatus = requests.post(rqUrl, data=sourcejson, auth=HTTPBasicAuth('audituser', 'audituser'), headers=rqheader)
def creategrsummaryboard(db_name,db_title,grdatasource,indexField,indexTimeField,ruleField,ruleName):
grafanaUrl ='http://X.X.X.41:3000/api/dashboards/db'
rqheader = {'Content-Type': 'application/json'}
if db_title == 'Processed' and ruleName!='':
summarytemplate ='''{"dashboard":{"__inputs":[{"name":"'''+db_title+''''","label":"'''+db_title+'''","description":"","type":"datasource","pluginId":"elasticsearch","pluginName":"Elasticsearch"}],"__requires":[{"type":"datasource","id":"elasticsearch","name":"Elasticsearch","version":"3.0.0"},{"type":"grafana","id":"grafana","name":"Grafana","version":"4.5.2"},{"type":"panel","id":"singlestat","name":"Singlestat","version":""}],"annotations":{"list":[]},"editable":true,"gnetId":null,"graphTooltip":0,"hideControls":false,"id":null,"links":[],"refresh":false,"rows":[{"collapse":false,"height":250,"panels":[{"cacheTimeout":null,"colorBackground":false,"colorValue":false,"colors":["rgba(50, 172, 45, 0.97)","rgb(41, 164, 48)","rgb(27, 180, 43)"],"datasource":"'''+grdatasource+'''","format":"none","gauge":{"maxValue":10000000,"minValue":0,"show":true,"thresholdLabels":false,"thresholdMarkers":true},"id":1,"interval":null,"links":[],"mappingType":1,"mappingTypes":[{"name":"value to text","value":1},{"name":"range to text","value":2}],"maxDataPoints":100,"nullPointMode":"connected","nullText":null,"postfix":"","postfixFontSize":"50%","prefix":"","prefixFontSize":"30%","rangeMaps":[{"from":"null","text":"N/A","to":"null"}],"span":12,"sparkline":{"fillColor":"rgba(31, 118, 189, 0.18)","full":false,"lineColor":"rgb(31, 120, 193)","show":false},"tableColumn":"","targets":[{"bucketAggs":[{"field":"'''+indexField+'''","id":"2","settings":{"interval":"auto","min_doc_count":0,"trimEdges":0},"type":"date_histogram"}],"dsType":"elasticsearch","metrics":[{"field":"'''+indexField+'''","id":"1","meta":{},"settings":{},"type":"sum"}],"query":"'''+ruleField+'''=\\"'''+ruleName+'''\\"","refId":"A","timeField":"'''+indexTimeField+'''"}],"thresholds":"10,20,30","title":"'''+db_title+'''''''","type":"singlestat","valueFontSize":"80%","valueMaps":[{"op":"=","text":"N/A","value":"null"}],"valueName":"total"}],"repeat":null,"repeatIteration":null,"repeatRowId":null,"showTitle":false,"title":"Dashboard Row","titleSize":"h6"}],"schemaVersion":14,"style":"dark","tags":[],"templating":{"list":[]},"time":{"from":"2018-01-15T09:32:26.137Z","to":"2018-01-19T09:32:26.763Z"},"timepicker":{"refresh_intervals":["5s","10s","30s","1m","5m","15m","30m","1h","2h","1d"],"time_options":["5m","15m","1h","6h","12h","24h","2d","7d","30d"]},"timezone":"","title":"'''+db_name+'''","version":4},"overwrite": true}'''
elif db_title == 'Rejected' and ruleName!='':
summarytemplate ='''{"dashboard":{"__inputs":[{"name":"'''+db_title+''''","label":"'''+db_title+'''","description":"","type":"datasource","pluginId":"elasticsearch","pluginName":"Elasticsearch"}],"__requires":[{"type":"datasource","id":"elasticsearch","name":"Elasticsearch","version":"3.0.0"},{"type":"grafana","id":"grafana","name":"Grafana","version":"4.5.2"},{"type":"panel","id":"singlestat","name":"Singlestat","version":""}],"annotations":{"list":[]},"editable":true,"gnetId":null,"graphTooltip":0,"hideControls":false,"id":null,"links":[],"refresh":false,"rows":[{"collapse":false,"height":250,"panels":[{"cacheTimeout":null,"colorBackground":false,"colorValue":false,"colors":["rgba(50, 172, 45, 0.97)","rgb(41, 164, 48)","rgb(27, 180, 43)"],"datasource":"'''+grdatasource+'''","format":"none","gauge":{"maxValue":10000000,"minValue":0,"show":true,"thresholdLabels":false,"thresholdMarkers":true},"id":1,"interval":null,"links":[],"mappingType":1,"mappingTypes":[{"name":"value to text","value":1},{"name":"range to text","value":2}],"maxDataPoints":100,"nullPointMode":"connected","nullText":null,"postfix":"","postfixFontSize":"50%","prefix":"","prefixFontSize":"30%","rangeMaps":[{"from":"null","text":"N/A","to":"null"}],"span":12,"sparkline":{"fillColor":"rgba(31, 118, 189, 0.18)","full":false,"lineColor":"rgb(31, 120, 193)","show":false},"tableColumn":"","targets":[{"bucketAggs":[{"field":"'''+indexField+'''","id":"2","settings":{"interval":"auto","min_doc_count":0,"trimEdges":0},"type":"date_histogram"}],"dsType":"elasticsearch","metrics":[{"field":"'''+indexField+'''","id":"1","meta":{},"settings":{},"type":"count"}],"query":"'''+ruleField+'''=\\"'''+ruleName+'''\\"","refId":"A","timeField":"'''+indexTimeField+'''"}],"thresholds":"10,20,30","title":"'''+db_title+'''''''","type":"singlestat","valueFontSize":"80%","valueMaps":[{"op":"=","text":"N/A","value":"null"}],"valueName":"total"}],"repeat":null,"repeatIteration":null,"repeatRowId":null,"showTitle":false,"title":"Dashboard Row","titleSize":"h6"}],"schemaVersion":14,"style":"dark","tags":[],"templating":{"list":[]},"time":{"from":"2018-01-15T09:32:26.137Z","to":"2018-01-19T09:32:26.763Z"},"timepicker":{"refresh_intervals":["5s","10s","30s","1m","5m","15m","30m","1h","2h","1d"],"time_options":["5m","15m","1h","6h","12h","24h","2d","7d","30d"]},"timezone":"","title":"'''+db_name+'''","version":4},"overwrite": true}'''
else:
summarytemplate ='''{"dashboard":{"__inputs":[{"name":"'''+db_title+''''","label":"'''+db_title+'''","description":"","type":"datasource","pluginId":"elasticsearch","pluginName":"Elasticsearch"}],"__requires":[{"type":"datasource","id":"elasticsearch","name":"Elasticsearch","version":"3.0.0"},{"type":"grafana","id":"grafana","name":"Grafana","version":"4.5.2"},{"type":"panel","id":"singlestat","name":"Singlestat","version":""}],"annotations":{"list":[]},"editable":true,"gnetId":null,"graphTooltip":0,"hideControls":false,"id":null,"links":[],"refresh":false,"rows":[{"collapse":false,"height":250,"panels":[{"cacheTimeout":null,"colorBackground":false,"colorValue":false,"colors":["rgba(50, 172, 45, 0.97)","rgb(41, 164, 48)","rgb(27, 180, 43)"],"datasource":"'''+grdatasource+'''","format":"none","gauge":{"maxValue":10000000,"minValue":0,"show":true,"thresholdLabels":false,"thresholdMarkers":true},"id":1,"interval":null,"links":[],"mappingType":1,"mappingTypes":[{"name":"value to text","value":1},{"name":"range to text","value":2}],"maxDataPoints":100,"nullPointMode":"connected","nullText":null,"postfix":"","postfixFontSize":"50%","prefix":"","prefixFontSize":"30%","rangeMaps":[{"from":"null","text":"N/A","to":"null"}],"span":12,"sparkline":{"fillColor":"rgba(31, 118, 189, 0.18)","full":false,"lineColor":"rgb(31, 120, 193)","show":false},"tableColumn":"","targets":[{"bucketAggs":[{"field":"'''+indexField+'''","id":"2","settings":{"interval":"auto","min_doc_count":0,"trimEdges":0},"type":"date_histogram"}],"dsType":"elasticsearch","metrics":[{"field":"'''+indexField+'''","id":"1","meta":{},"settings":{},"type":"count"}],"refId":"A","timeField":"'''+indexTimeField+'''"}],"thresholds":"10,20,30","title":"'''+db_title+'''''''","type":"singlestat","valueFontSize":"80%","valueMaps":[{"op":"=","text":"N/A","value":"null"}],"valueName":"total"}],"repeat":null,"repeatIteration":null,"repeatRowId":null,"showTitle":false,"title":"Dashboard Row","titleSize":"h6"}],"schemaVersion":14,"style":"dark","tags":[],"templating":{"list":[]},"time":{"from":"2018-01-15T09:32:26.137Z","to":"2018-01-19T09:32:26.763Z"},"timepicker":{"refresh_intervals":["5s","10s","30s","1m","5m","15m","30m","1h","2h","1d"],"time_options":["5m","15m","1h","6h","12h","24h","2d","7d","30d"]},"timezone":"","title":"'''+db_name+'''","version":4},"overwrite": true}'''
summaryjson = summarytemplate.replace("\n", "")
sbdstatus = requests.post(grafanaUrl, data=summaryjson, auth=HTTPBasicAuth('audituser', 'audituser'), headers=rqheader)
def creategrthroughput(db_name,db_title,grdatasource,indexTimeField,ruleField,ruleName):
grafanaUrl ='http://X.X.X.41:3000/api/dashboards/db'
rqheader = {'Content-Type': 'application/json'}
if ruleName == '':
throughputtempplate = '''{"dashboard":{"__inputs":[{"name":"'''+db_title+'''","label":"'''+db_title+'''","description":"","type":"datasource","pluginId":"elasticsearch","pluginName":"Elasticsearch"}],"__requires":[{"type":"datasource","id":"elasticsearch","name":"Elasticsearch","version":"3.0.0"},{"type":"grafana","id":"grafana","name":"Grafana","version":"4.5.2"},{"type":"panel","id":"graph","name":"Graph","version":""}],"annotations":{"list":[]},"editable":true,"gnetId":null,"graphTooltip":0,"hideControls":false,"id":null,"links":[],"rows":[{"collapse":false,"height":"250px","panels":[{"aliasColors":{},"bars":false,"dashLength":10,"dashes":false,"datasource":"'''+grdatasource+'''","fill":1,"id":1,"legend":{"avg":false,"current":false,"max":false,"min":false,"show":true,"total":false,"values":false},"lines":true,"linewidth":1,"links":[],"nullPointMode":"null","percentage":false,"pointradius":5,"points":false,"renderer":"flot","seriesOverrides":[],"spaceLength":10,"span":12,"stack":false,"steppedLine":false,"targets":[{"bucketAggs":[{"field":"'''+indexTimeField+'''","id":"2","settings":{"interval":"auto","min_doc_count":0,"trimEdges":0},"type":"date_histogram"}],"dsType":"elasticsearch","metrics":[{"field":"select field","id":"1","type":"count"}],"refId":"A","timeField":"'''+indexTimeField+''''"}],"thresholds":[],"timeFrom":null,"timeShift":null,"title":"'''+db_title+'''","tooltip":{"shared":true,"sort":0,"value_type":"individual"},"type":"graph","xaxis":{"buckets":null,"mode":"time","name":null,"show":true,"values":[]},"yaxes":[{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true},{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true}]}],"repeat":null,"repeatIteration":null,"repeatRowId":null,"showTitle":false,"title":"Dashboard Row","titleSize":"h6"}],"schemaVersion":14,"style":"dark","tags":[],"templating":{"list":[]},"time":{"from":"now-6h","to":"now"},"timepicker":{"refresh_intervals":["5s","10s","30s","1m","5m","15m","30m","1h","2h","1d"],"time_options":["5m","15m","1h","6h","12h","24h","2d","7d","30d"]},"timezone":"","title":"'''+db_name+'''","version":1},"overwrite": true}'''
else:
throughputtempplate = '''{"dashboard":{"__inputs":[{"name":"'''+db_title+'''","label":"'''+db_title+'''","description":"","type":"datasource","pluginId":"elasticsearch","pluginName":"Elasticsearch"}],"__requires":[{"type":"datasource","id":"elasticsearch","name":"Elasticsearch","version":"3.0.0"},{"type":"grafana","id":"grafana","name":"Grafana","version":"4.5.2"},{"type":"panel","id":"graph","name":"Graph","version":""}],"annotations":{"list":[]},"editable":true,"gnetId":null,"graphTooltip":0,"hideControls":false,"id":null,"links":[],"rows":[{"collapse":false,"height":"250px","panels":[{"aliasColors":{},"bars":false,"dashLength":10,"dashes":false,"datasource":"'''+grdatasource+'''","fill":1,"id":1,"legend":{"avg":false,"current":false,"max":false,"min":false,"show":true,"total":false,"values":false},"lines":true,"linewidth":1,"links":[],"nullPointMode":"null","percentage":false,"pointradius":5,"points":false,"renderer":"flot","seriesOverrides":[],"spaceLength":10,"span":12,"stack":false,"steppedLine":false,"targets":[{"bucketAggs":[{"field":"'''+indexTimeField+'''","id":"2","settings":{"interval":"auto","min_doc_count":0,"trimEdges":0},"type":"date_histogram"}],"dsType":"elasticsearch","metrics":[{"field":"select field","id":"1","type":"count"}],"query":"'''+ruleField+'''=\\"'''+ruleName+'''\\"","refId":"A","timeField":"'''+indexTimeField+''''"}],"thresholds":[],"timeFrom":null,"timeShift":null,"title":"'''+db_title+'''","tooltip":{"shared":true,"sort":0,"value_type":"individual"},"type":"graph","xaxis":{"buckets":null,"mode":"time","name":null,"show":true,"values":[]},"yaxes":[{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true},{"format":"short","label":null,"logBase":1,"max":null,"min":null,"show":true}]}],"repeat":null,"repeatIteration":null,"repeatRowId":null,"showTitle":false,"title":"Dashboard Row","titleSize":"h6"}],"schemaVersion":14,"style":"dark","tags":[],"templating":{"list":[]},"time":{"from":"now-6h","to":"now"},"timepicker":{"refresh_intervals":["5s","10s","30s","1m","5m","15m","30m","1h","2h","1d"],"time_options":["5m","15m","1h","6h","12h","24h","2d","7d","30d"]},"timezone":"","title":"'''+db_name+'''","version":1},"overwrite": true}'''
throughputjson = throughputtempplate.replace("\n", "")
stpstatus = requests.post(grafanaUrl, data=throughputjson, auth=HTTPBasicAuth('audituser', 'audituser'), headers=rqheader)
def createLkupTbl(param_sc):
snappysn = SnappySession(param_sc)
snappysn.sql("DROP TABLE IF EXISTS nrt_lookup_tbl1")
snappysn.sql("CREATE TABLE IF NOT EXISTS nrt_lookup_tbl1(route_id VARCHAR(128) NOT NULL PRIMARY KEY,driver VARCHAR(128),time_stamp TIMESTAMP,total_delivery_count BIGINT,total_no_of_stop BIGINT,total_pickup_count BIGINT,total_projected_mile DOUBLE,uid STRING) USING ROW OPTIONS ( PARTITION_BY 'driver', PERSISTENCE 'ASYNC' )")
dflkup0 = snappysn.read.format("org.apache.spark.sql.cassandra").options(table="nrt_route_plan_aggr_tbl1", keyspace="nrt_route_plan_aggr1").load()
dflkup0.write.format("row").mode('append').saveAsTable("nrt_lookup_tbl1")
def processfuncNRTRouteAggrigateWindow(time,rdd):
if(rdd.count() > 0):
try:
pr_group_id = str(uuid.uuid4())[:5]
sqlContext = getSqlContextInstance(rdd.context)
sqlContext.sql("set spark.sql.caseSensitive=false")
df0 = sqlContext.createDataFrame(rdd,prototype)
df0.registerTempTable("nrt_initial_raw_tbl")
df1 = sqlContext.sql("select uid1 as uid,service_provider_name,service_provider_employee_id,service_provider_employee_lastName as service_provider_employee_lastname,service_provider_employee_firstname,order_id,route_id,from_location,current_location,next_stop,city,zipcode,stop_id,stoptype,stop_sequence,profit_center_id,profit_center_desc,route_start_date,route_end_date,stop_miles, calculated_departure_time, calculated_arrival_time,scheduled_delivery_time,case when actual_delivery_time IS NULL then NULL else actual_delivery_time end as actual_delivery_time,job_delivery_status_eta,job_delivery_status_actual,time_stamp from nrt_initial_raw_tbl")
df1.registerTempTable("nrt_initial_raw_tbl")
pr_window='NRTRouteAggrigateWindow'
br_rule1='NRTRawDataFromKafkaSrc'
br_count1= df1.count()
try:
saveprocessinfo(rdd,pr_group_id,pr_window,br_rule1,br_count1)
df1a = df1.toDF(*[c.lower() for c in df1.columns])
df1a.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="nrt_initial_raw_tbl", keyspace="nrt_initial_raw").save()
except Exception as e:
saveerrordata(rdd, pr_group_id, "Exception in Rule Process:"+e.message, pr_window, br_rule1)
df2 = sqlContext.sql("select uid,route_id,current_location,next_stop,concat(service_provider_employee_firstname,service_provider_employee_lastname) as driver,case when stoptype IN ('P') AND actual_delivery_time IS NOT NULL then 1 else 0 end as pickup_complete,case when stoptype IN ('D') AND actual_delivery_time IS NOT NULL then 1 else 0 end as delivery_complete,case when actual_delivery_time IS NOT NULL then 1 else 0 end as no_of_stop_complete,case when scheduled_delivery_time >= actual_delivery_time then 'On Time' else 'Delayed' end as status,time_stamp,profit_center_id,profit_center_desc,stop_miles from nrt_initial_raw_tbl")
df2.registerTempTable("nrt_raw_route_final_tbl")
pr_window='NRTRouteAggrigateWindow'
br_rule2='NRTRawRouteRule'
br_count2= df2.count()
try:
saveprocessinfo(rdd,pr_group_id,pr_window,br_rule2,br_count2)
df2a = df2.toDF(*[c.lower() for c in df2.columns])
df2a.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="nrt_raw_route_final_tbl", keyspace="nrt_raw_route_final").save()
except Exception as e:
saveerrordata(rdd, pr_group_id, "Exception in Rule Process:"+e.message, pr_window, br_rule2)
df3 = sqlContext.sql("select max(uid) as uid,max(route_id) as route_id,max(current_location) as current_location,max(next_stop) as next_stop,max(driver) as driver,sum(pickup_complete) as pickup_completed,sum(delivery_complete) as delivery_completed,sum(no_of_stop_complete) as no_of_stop_completed,max(status) as status,max(time_stamp) as time_stamp,sum(stop_miles) as stop_miles from nrt_raw_route_final_tbl group by route_id,driver")
df3.registerTempTable("nrt_todays_route_tbl")
pr_window='NRTRouteAggrigateWindow'
br_rule3='NRTRouteAggrigate'
br_count3= df3.count()
try:
saveprocessinfo(rdd,pr_group_id,pr_window,br_rule3,br_count3)
df3a = df3.toDF(*[c.lower() for c in df3.columns])
df3a.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="nrt_todays_route_tbl", keyspace="nrt_todays_route").save()
except Exception as e:
saveerrordata(rdd, pr_group_id, "Exception in Rule Process:"+e.message, pr_window, br_rule3)
df4 = sqlContext.sql("select a.route_id as route_id,a.current_location as current_location,a.next_stop as next_stop,a.driver as driver,a.status as status,a.time_stamp as time_stamp,a.profit_center_id as profit_center_id,a.profit_center_desc as profit_center_desc from nrt_raw_route_final_tbl a where time_stamp in (select max(time_stamp) from nrt_raw_route_final_tbl b where b.route_id = a.route_id)")
df4.registerTempTable("nrt_today_route_latest_tbl")
pr_window='NRTRouteAggrigateWindow'
br_rule4='NRTRawRouteRule1'
br_count4= df4.count()
try:
saveprocessinfo(rdd,pr_group_id,pr_window,br_rule4,br_count4)
df4a = df4.toDF(*[c.lower() for c in df4.columns])
df4a.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="nrt_today_route_latest_tbl", keyspace="nrt_today_route_latest").save()
except Exception as e:
saveerrordata(rdd, pr_group_id, "Exception in Rule Process:"+e.message, pr_window, br_rule4)
df5 = sqlContext.sql("select b.no_of_stop_completed as no_of_stop_completed,b.delivery_completed as delivery_completed,b.pickup_completed as pickup_completed,a.status as status,a.driver as driver,a.time_stamp as time_stamp, a.route_id as route_id ,a.current_location as current_location,a.next_stop as next_stop,a.profit_center_id as profit_center_id,a.profit_center_desc as profit_center_desc,b.stop_miles as stop_miles from nrt_today_route_latest_tbl a JOIN nrt_todays_route_tbl b ON a.route_id = b.route_id")
df5.registerTempTable("nrt_today_route_status_tbl")
pr_window='NRTRouteAggrigateWindow'
br_rule5='NRTRouteLocationRule'
br_count5= df5.count()
try:
saveprocessinfo(rdd,pr_group_id,pr_window,br_rule5,br_count5)
df5a = df5.toDF(*[c.lower() for c in df5.columns])
df5a.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="nrt_today_route_status_tbl", keyspace="nrt_today_route_status").save()
except Exception as e:
saveerrordata(rdd, pr_group_id, "Exception in Rule Process:"+e.message, pr_window, br_rule5)
snappysn = SnappySession(rdd.context)
snappysn.sql("CREATE TABLE IF NOT EXISTS nrt_today_route_status_tbl" + pr_group_id + "(no_of_stop_completed BIGINT,delivery_completed BIGINT,pickup_completed BIGINT,status STRING,driver VARCHAR(128),time_stamp TIMESTAMP,route_id VARCHAR(128),current_location VARCHAR(128) NOT NULL PRIMARY KEY,next_stop STRING,profit_center_id BIGINT,profit_center_desc STRING,stop_miles DOUBLE) USING ROW OPTIONS ( PARTITION_BY 'route_id', PERSISTENCE 'ASYNC' )")
snappydf5 = snappysn.createDataFrame(df5.rdd, df5.schema)
snappydf5.write.format("row").mode('append').saveAsTable("nrt_today_route_status_tbl" + pr_group_id)
snappyout5 = snappysn.sql("select a.driver as driver,a.route_id as route_id,a.current_location as current_location,a.next_stop as next_stop,a.pickup_completed as pickup_completed,a.delivery_completed as delivery_completed,a.no_of_stop_completed as no_of_stop_completed,b.total_pickup_count as total_pickup_count,b.total_delivery_count as total_delivery_count,b.total_no_of_stop as total_no_of_stop,b.total_projected_mile as total_projected_mile,a.status as status,a.time_stamp as time_stamp,a.profit_center_id as profit_center_id,a.profit_center_desc as profit_center_desc,a.stop_miles as stop_miles from nrt_today_route_status_tbl" + pr_group_id + " a JOIN nrt_lookup_tbl1 b ON a.driver = b.driver")
df6 = snappyout5.toDF(*[c.lower() for c in snappyout5.columns])
df6.write.format("org.apache.spark.sql.cassandra").mode('append').options(table="nrt_todays_route_final_tbl", keyspace="nrt_todays_route_final").save()
sparkdf5 = sqlContext.createDataFrame(df6.rdd , df6.schema )
sparkdf5.registerTempTable("nrt_todays_route_final_tbl")
pr_window='NRTRouteAggrigateWindow'
br_rule6='NRTRouteFinalRule1'
br_count6= df6.count()
try:
saveprocessinfo(rdd,pr_group_id,pr_window,br_rule6,br_count6)
except Exception as e:
saveerrordata(rdd, pr_group_id, "Exception in Rule Process:"+e.message, pr_window, br_rule6)
except Exception as e:
print("Exception occurred while processing :%s"%e)
print(e.message)
finally:
print("Final loop")
if __name__ == "__main__":
log.info("Start Application...")
sinkNode = "X.X.X.41"
execMode = "yarn-cluster"
appName = "NRT_Realtime_Pipeline_Final"
zkQuorum = "X.X.X.41:2181,X.X.X.42:2181"
kafkaTopic = "XPONRTTopic3"
kafkaGroup = "NRT_Group_Final"
numThread = 1
conf = SparkConf()
conf.setMaster(execMode)
conf.setAppName(appName)
conf.set("spark.cassandra.connection.host", sinkNode)
conf.set("spark.driver.memory", "8g")
conf.set("spark.executor.memory", "8g")
conf.set("spark.cores.max", "30")
conf.set("spark.executor.cores", "5")
conf.set("spark.streaming.stopGracefullyOnShutdown", "true")
conf.set("spark.cassandra.output.concurrent.writes", "200")
conf.set("output.batch.grouping.buffer.size", "2000")
conf.set("spark.streaming.concurrentJobs", "20")
conf.set("spark.streaming.receiver.maxRate", "500")
conf.set("spark.scheduler.mode", "FAIR")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
conf.set("spark.streaming.backpressure.enabled", "true")
conf.set("spark.logConf", "true")
conf.set("spark.streaming.backpressure.initialRate", "500")
conf.set("spark.streaming.kafka.maxRatePerPartition", "25")
conf.set("spark.driver.extraJavaOptions", "-XX:+UseG1GC")
conf.set("spark.executor.extraJavaOptions", "-XX:+UseG1GC")
conf.set("spark.executor.logs.rolling.strategy", "time")
conf.set("spark.executor.logs.rolling.time.interval", "daily")
conf.set("spark.executor.logs.rolling.maxRetainedFiles", "10")
conf.set("spark.streaming.unpersist", "true")
conf.set("spark.cleaner.ttl", "3600")
conf.set("spark.executor.instances", "5")
conf.set("spark.yarn.executor.memoryOverhead", "8192")
conf.set("spark.snappydata.locator","X.X.X.41:1527")
conf.set("spark.snappydata.connection","X.X.X.41:1527")
conf.set("spark.cassandra.connection.host","X.X.X.41")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 10)
ssc.checkpoint("/rtdest/checkpoint/NRT_Realtime_Pipeline_Final")
log.info("Creating tables...")
createAuditTables()
createTables()
log.info("Creating elastic search indices...")
createelasticindex('nrt_realtime_pipeline_final_rawindex','audit_nrt_realtime_pipeline_final','tbl_raw_source','raw_')
createelasticindex('nrt_realtime_pipeline_final_erindex','audit_nrt_realtime_pipeline_final','tbl_error_info','er_')
createelasticindex('nrt_realtime_pipeline_final_prindex','audit_nrt_realtime_pipeline_final','tbl_process_info','pr_')
log.info("Creating Data Sources...")
creategrafanadatasource('nrt_realtime_pipeline_final_raw_source','nrt_realtime_pipeline_final_rawindex','raw_created_time')
creategrafanadatasource('nrt_realtime_pipeline_final_er_source','nrt_realtime_pipeline_final_erindex','er_time')
creategrafanadatasource('nrt_realtime_pipeline_final_pr_source','nrt_realtime_pipeline_final_prindex','pr_time')
log.info("Creating Summary Dashboards...")
creategrsummaryboard('nrt_realtime_pipeline_final-transmitted1','Transmitted','nrt_realtime_pipeline_final_raw_source','raw_group_id','raw_created_time','','')
log.info("Creating Throughput Dashboards...")
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawdatafromkafkasrc-processed-throughput','Processed Throughput','nrt_realtime_pipeline_final_pr_source','pr_time','pr_rule','NRTRawDataFromKafkaSrc')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawdatafromkafkasrc-rejected-throughput','Rejected Throughput','nrt_realtime_pipeline_final_er_source','er_time','er_rule','NRTRawDataFromKafkaSrc')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawdatafromkafkasrc-processed','Processed','nrt_realtime_pipeline_final_pr_source','pr_count','pr_time','pr_rule','NRTRawDataFromKafkaSrc')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawdatafromkafkasrc-rejected','Rejected','nrt_realtime_pipeline_final_er_source','er_count','er_time','er_rule','NRTRawDataFromKafkaSrc')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule-processed-throughput','Processed Throughput','nrt_realtime_pipeline_final_pr_source','pr_time','pr_rule','NRTRawRouteRule')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule-rejected-throughput','Rejected Throughput','nrt_realtime_pipeline_final_er_source','er_time','er_rule','NRTRawRouteRule')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule-processed','Processed','nrt_realtime_pipeline_final_pr_source','pr_count','pr_time','pr_rule','NRTRawRouteRule')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule-rejected','Rejected','nrt_realtime_pipeline_final_er_source','er_count','er_time','er_rule','NRTRawRouteRule')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrouteaggrigate-processed-throughput','Processed Throughput','nrt_realtime_pipeline_final_pr_source','pr_time','pr_rule','NRTRouteAggrigate')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrouteaggrigate-rejected-throughput','Rejected Throughput','nrt_realtime_pipeline_final_er_source','er_time','er_rule','NRTRouteAggrigate')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrouteaggrigate-processed','Processed','nrt_realtime_pipeline_final_pr_source','pr_count','pr_time','pr_rule','NRTRouteAggrigate')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrouteaggrigate-rejected','Rejected','nrt_realtime_pipeline_final_er_source','er_count','er_time','er_rule','NRTRouteAggrigate')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule1-processed-throughput','Processed Throughput','nrt_realtime_pipeline_final_pr_source','pr_time','pr_rule','NRTRawRouteRule1')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule1-rejected-throughput','Rejected Throughput','nrt_realtime_pipeline_final_er_source','er_time','er_rule','NRTRawRouteRule1')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule1-processed','Processed','nrt_realtime_pipeline_final_pr_source','pr_count','pr_time','pr_rule','NRTRawRouteRule1')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtrawrouterule1-rejected','Rejected','nrt_realtime_pipeline_final_er_source','er_count','er_time','er_rule','NRTRawRouteRule1')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutelocationrule-processed-throughput','Processed Throughput','nrt_realtime_pipeline_final_pr_source','pr_time','pr_rule','NRTRouteLocationRule')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutelocationrule-rejected-throughput','Rejected Throughput','nrt_realtime_pipeline_final_er_source','er_time','er_rule','NRTRouteLocationRule')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutelocationrule-processed','Processed','nrt_realtime_pipeline_final_pr_source','pr_count','pr_time','pr_rule','NRTRouteLocationRule')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutelocationrule-rejected','Rejected','nrt_realtime_pipeline_final_er_source','er_count','er_time','er_rule','NRTRouteLocationRule')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutefinalrule1-processed-throughput','Processed Throughput','nrt_realtime_pipeline_final_pr_source','pr_time','pr_rule','NRTRouteFinalRule1')
creategrthroughput('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutefinalrule1-rejected-throughput','Rejected Throughput','nrt_realtime_pipeline_final_er_source','er_time','er_rule','NRTRouteFinalRule1')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutefinalrule1-processed','Processed','nrt_realtime_pipeline_final_pr_source','pr_count','pr_time','pr_rule','NRTRouteFinalRule1')
creategrsummaryboard('nrt_realtime_pipeline_final-nrtrouteaggrigatewindow-nrtroutefinalrule1-rejected','Rejected','nrt_realtime_pipeline_final_er_source','er_count','er_time','er_rule','NRTRouteFinalRule1')
log.info("Creating Lookup Part...")
createLkupTbl(sc)
log.info("Start Streaming...")
kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, kafkaGroup, {kafkaTopic: numThread})
auditStream = kafkaStream.map(lambda value: value[1])
auditStream.foreachRDD(lambda rdd:savesourcedata(rdd,'tbl_raw_source'))
parsedStream = kafkaStream.map(lambda csv : csv[1]).map(lambda row:row.split(",")).map(lambda dt : Row(dt[0],dt[1],dt[2],dt[3],long(dt[4]),dt[5],dt[6],dt[7],dt[8],dt[9],long(dt[10]),long(dt[11]),dt[12],long(dt[13]),long(dt[14]),dt[15],dt[16],dt[17],float(dt[18]),dt[19],dt[20],dt[21],dt[22],dt[23],dt[24],dt[25],dt[26],str(uuid.uuid4())))
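# Sliding window over the parsed stream: keep the last 600 seconds of data, re-evaluated every 10 seconds.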
processNRTRouteAggrigateWindow = parsedStream.window(600, 10)
processNRTRouteAggrigateWindow.foreachRDD(processfuncNRTRouteAggrigateWindow)
ssc.start()
ssc.awaitTermination()
Thanks, Vishnu, now I get the whole picture. Code-wise it all looks fine. Can you tell me what SQL exceptions you are getting? A couple of points:
a) If you expect nrt_lookup_tbl1 to contain only static data and relatively few records (say, fewer than 100K), you can make it a replicated row table. A join with a replicated table is more efficient. If not, leave it as it is.
b) The Snappy table "nrt_today_route_status_tbl" + pr_group_id, which is a continuously growing table, can be made a column table. Also, if you decide to make the lookup table a partitioned table, you can partition this table on "driver" as well. With the join keys being the same, the join will be more performant.
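For reference, a minimal sketch of what the suggested DDL could look like from PySpark is below. It assumes the SnappyData PySpark API (pyspark.sql.snappy.SnappySession) is available in the smart-connector setup; the table and column names are illustrative placeholders, not the actual production schema.

```python
# Sketch only: assumes the SnappyData PySpark API is on the path; table and
# column names below are illustrative placeholders, not the real schema.
from pyspark.sql import SparkSession
from pyspark.sql.snappy import SnappySession

spark = SparkSession.builder.appName("snappy-table-sketch").getOrCreate()
snappy = SnappySession(spark.sparkContext)

# a) Small, static lookup data: a replicated ROW table (no PARTITION_BY),
#    so every data store holds a full copy and joins against it avoid shuffles.
snappy.sql("""
    CREATE TABLE IF NOT EXISTS nrt_lookup_tbl1 (
        driver VARCHAR(64),
        route  VARCHAR(64)
    ) USING row
""")

# b) Continuously written status data: a COLUMN table. If the lookup table is
#    instead made a partitioned table on "driver", partitioning this table on
#    the same key keeps the join partition-local.
snappy.sql("""
    CREATE TABLE IF NOT EXISTS nrt_today_route_status_tbl (
        driver    VARCHAR(64),
        status_cd VARCHAR(16),
        event_ts  TIMESTAMP
    ) USING column
    OPTIONS (PARTITION_BY 'driver')
""")
```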
Hi Rishi,
The "nrt_today_route_status_tbl" + pr_group_id table is not a growing table - it is used for some transformations per Spark sliding window, and once that is done the result is saved to Cassandra. We are using the ROW approach here.
Right now we have been able to stabilize the pipeline side, but the challenge is that it does not work as expected for a lower batch interval (the Spark Streaming batch interval). In detail: if we increase the Spark sliding window to 30 or 40 seconds it saves the data consistently, but for a real-time or near-real-time pipeline that is of little benefit. We need a 1- or 2-second sliding window interval on the Spark Streaming side.
If I reduce the Spark Streaming sliding window to a few seconds, Snappy behaves inconsistently and we are not able to solve this. My guess is that as the parallelism and data volume increase, the savetocassandra call from Snappy gets delayed or cannot keep up.
We would like to know the best practices that can be applied in a similar scenario.
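For context, a minimal sketch of the knobs involved is below (the values are illustrative, not the production settings). In Spark Streaming both the window duration and the slide duration must be multiples of the batch interval, and all the work triggered per slide has to finish within the slide interval or batches start queuing up.

```python
# Sketch only: illustrative batch/window/slide values, not the real pipeline.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="window-interval-sketch")

# Batch interval: a micro-batch is produced every 1 second.
ssc = StreamingContext(sc, batchDuration=1)

# Placeholder source; in the real pipeline this is the Kafka DStream above.
lines = ssc.socketTextStream("localhost", 9999)

# Keep the last 600 seconds of data and re-evaluate every 2 seconds.
# Both 600 and 2 must be multiples of the 1-second batch interval, and the
# work triggered per slide (joins, saves to Snappy/Cassandra) has to finish
# within those 2 seconds, otherwise batches back up.
windowed = lines.window(windowDuration=600, slideDuration=2)
```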
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node01.ad.company.com to /default-rack
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node01.ad.company.com to /default-rack
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node01.ad.company.com to /default-rack
18/05/09 16:24:43 ERROR scheduler.DAGScheduler: Failed to update accumulators for task 52
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:915)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1088)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/09 16:24:43 ERROR scheduler.DAGScheduler: Failed to update accumulators for task 53
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:155)
at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)
at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:140)
at java.io.DataOutputStream.flush(DataOutputStream.java:123)
at org.apache.spark.api.python.PythonAccumulatorV2.merge(PythonRDD.scala:915)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1088)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$updateAccumulators$1.apply(DAGScheduler.scala:1080)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
at org.apache.spark.scheduler.DAGScheduler.updateAccumulators(DAGScheduler.scala:1080)
at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1183)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1647)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1605)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1594)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node02.ad.company.com to /default-rack
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node02.ad.company.com to /default-rack
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node01.ad.company.com to /default-rack
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node01.ad.company.com to /default-rack
18/05/09 16:24:43 INFO util.RackResolver: Resolved labcluster01node03.ad.company.com to /default-rack
Hi all,
Any thoughts here, or is the above a known issue?
Hi Vishnu, I was trying to simulate the issue. The problem occurs after task execution, and looking at the code it appears to just be logging an error; it should not have impacted the task execution itself. It does mean some accumulators are not being updated, which might show incorrect statistics. Can you please add some logging to identify where these errors are coming from? There are a lot of jobs (save to Cassandra, save to Snappy, read from Snappy, etc.) and it is not possible to tell which job is causing this. Others have reported a similar issue in the Spark community: https://issues.apache.org/jira/browse/SPARK-10924.
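For what it's worth, one way to do that is a small wrapper that tags each foreachRDD action with a label, so the driver log shows which job was running when the error appears. The sketch below is only an illustration; the save functions referenced in the comments are the ones already defined in the script above.

```python
# Sketch only: a generic wrapper to tag each foreachRDD action with a label,
# so the driver log shows which job (save to Cassandra, save to Snappy, etc.)
# was running when the accumulator/broken-pipe errors appear.
import logging
import traceback

log = logging.getLogger("nrt-pipeline")

def logged_action(label, fn):
    """Wrap an RDD action so its start, end, and failures are logged with a label."""
    def wrapper(rdd):
        log.info("START %s (partitions=%d)", label, rdd.getNumPartitions())
        try:
            fn(rdd)
            log.info("END   %s", label)
        except Exception:
            log.error("FAILED %s\n%s", label, traceback.format_exc())
            raise
    return wrapper

# Usage with the existing actions (function names as defined in the script):
# auditStream.foreachRDD(logged_action("save-raw-source",
#                        lambda rdd: savesourcedata(rdd, 'tbl_raw_source')))
# processNRTRouteAggrigateWindow.foreachRDD(
#     logged_action("route-aggregate-window", processfuncNRTRouteAggrigateWindow))
```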
Hi Rishi,
Thanks for the update.
We did this logging - the error is coming from the savetocassandra call.
Hi all,
Any solution to this?
@boycode have you found a solution in the end?