scylladb / scylla-code-samples

Code samples for working with ScyllaDB
Apache License 2.0

Process is exiting without an error for the Structured Streaming to ScyllaDB data sending module #143

Closed KunwarAkanksha closed 3 months ago

KunwarAkanksha commented 4 years ago

Hello all, I am new to this. I read a stream, select some fields, and then want to push that structured stream to ScyllaDB. Please tell me where I am making a mistake.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataTypes, IntegerType, StringType, StructType}
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{SQLContext, SparkSession}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.sources.StreamSinkProvider
import org.apache.spark.sql.{functions => f, DataFrame}
import org.apache.spark.sql.cassandra._

object StreamProcessor {
  def main(args: Array[String]): Unit = {
    new StreamProcessor().process()
  }
}

class StreamProcessor {

  def process(): Unit = {
    val spark = SparkSession.builder()
      .config("spark.eventLog.enabled", "false") // 3 lines included later
      .config("spark.driver.memory", "2g")
      .config("spark.executor.memory", "2g")
      .config("spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
      .config("spark.sql.catalog.cass100.spark.cassandra.connection.host", "192.168.144.23")
      .appName("kafka-tutorials")
      .master("local[*]")
      .getOrCreate()

spark.sparkContext.setLogLevel("DEBUG")

// spark.conf.set("spark.sql.catalog.cass100", "com.datastax.spark.connector.datasource.CassandraCatalog")
// spark.conf.set("spark.sql.catalog.cass100.spark.cassandra.connection.host", "192.168.144.24")

val jsonSchema = new StructType()
  .add("@type", DataTypes.StringType)
  .add("1", DataTypes.IntegerType)
  .add("2", DataTypes.IntegerType)
  .add("8", DataTypes.StringType)
  .add("12", DataTypes.StringType)
  .add("7", DataTypes.StringType)
  .add("11", DataTypes.StringType)
  .add("4", DataTypes.StringType)
  .add("6", DataTypes.StringType)
  .add("27", DataTypes.StringType)
  .add("28", DataTypes.StringType)
  .add("152", DataTypes.StringType)
  .add("153", DataTypes.StringType)
  .add("0", DataTypes.StringType)
  .add("subTemplateMultiList", DataTypes.StringType)

import spark.implicits._
val outputTopic = "test"
val inputTopic = "ipfix320"
val brokers = "192.168.144.93:9092"

val inputDf = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", inputTopic)
  .load()
  .selectExpr("CAST(value AS STRING)")
  .select(from_json($"value", jsonSchema) as "value") //.where($"value.1">2000 && $"value.2">12)
  .select("value.*")
  .withColumn("timestamp", lit(current_timestamp()))
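// Debugging sketch (not part of the original job): temporarily writing the
// parsed stream to Spark's built-in console sink is a quick way to confirm
// that rows are actually arriving from Kafka before the custom sink is involved.
// inputDf.writeStream
//   .format("console")
//   .outputMode(OutputMode.Append)
//   .start()
//   .awaitTermination()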

// Prefix every column name with "c" so that the purely numeric JSON field
// names become valid identifiers (e.g. "1" -> "c1").
val newDf = inputDf.columns.foldLeft(inputDf)((curr, n) => curr.withColumnRenamed(n, "c" + n))

newDf.printSchema()

// Keep only the fields the tls.agg table needs, renamed to its column names.
val otherDf = newDf.select(
  col("c12").as("src"),
  col("c8").as("dest"),
  col("c2").as("max"),
  col("c1").as("sum"),
  col("c152").as("window"))

otherDf.printSchema()

try {
  val query = otherDf
    .writeStream
    .format("ScyllaSinkProvider")
    .outputMode(OutputMode.Append)
    .options(
      Map(
        "cluster" -> "Test Cluster",
        "keyspace" -> "tls",
        "table" -> "agg",
        "checkpointLocation" -> "/tmp/checkpoints"
      )
    )
    .start()
}
catch {
  // NOTE: a bare println just prints an empty line; the exception itself is discarded.
  case ex: Exception => println
}

  }
}
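Looking at this again, I suspect the silent exit may simply be that start() returns immediately and nothing blocks the driver afterwards, so main() returns and the JVM shuts down before any batch is processed; the empty catch above would also hide any startup error. A sketch of how I think the tail of process() should look (same query as above, only the ending changes):

try {
  val query = otherDf
    .writeStream
    .format("ScyllaSinkProvider")
    .outputMode(OutputMode.Append)
    .options(
      Map(
        "cluster" -> "Test Cluster",
        "keyspace" -> "tls",
        "table" -> "agg",
        "checkpointLocation" -> "/tmp/checkpoints"
      )
    )
    .start()

  // Block the driver until the streaming query stops or fails.
  query.awaitTermination()
}
catch {
  // Surface the failure instead of silently swallowing it.
  case ex: Exception => ex.printStackTrace()
}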

class ScyllaSinkProvider extends StreamSinkProvider {
  println("########################################HELLO############################")

  override def createSink(sqlContext: SQLContext,
                          parameters: Map[String, String],
                          partitionColumns: Seq[String],
                          outputMode: OutputMode): ScyllaSink =
    new ScyllaSink(parameters)
}

class ScyllaSink(parameters: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit =
    data.write
      .cassandraFormat(parameters("agg"), parameters("tls"), parameters("Test Cluster"))
      .mode(SaveMode.Append)
      .save()
}

And my keyspace is tls, with the table agg defined as follows on ScyllaDB:

CREATE TABLE tls.agg (
    src text,
    dest text,
    max float,
    sum float,
    window text,
    PRIMARY KEY (src, dest)
) WITH CLUSTERING ORDER BY (dest ASC)
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'ALL'}
    AND comment = ''
    AND compaction = {'class': 'SizeTieredCompactionStrategy'}
    AND compression = {'sstable_compression': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99.0PERCENTILE';
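One more thing I am unsure about: in ScyllaSink I look the options up by their values ("agg", "tls", "Test Cluster") rather than by the keys I passed in ("table", "keyspace", "cluster"), so addBatch should throw a NoSuchElementException on the first batch. A corrected sketch, assuming the three-argument cassandraFormat(table, keyspace, cluster) overload from the connector:

class ScyllaSink(parameters: Map[String, String]) extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit =
    data.write
      // Look the settings up by key, not by value.
      .cassandraFormat(parameters("table"), parameters("keyspace"), parameters("cluster"))
      .mode(SaveMode.Append)
      .save()
}

I also notice that tls.agg declares max and sum as float while fields "1" and "2" are parsed as IntegerType, so a cast may be needed before writing.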

Please tell me why the process is exiting without giving any error.