fsanaulla / chronicler-spark

InfluxDB connector to Apache Spark on top of Chronicler
Apache License 2.0

chronicler-spark


Open-source InfluxDB connector for Apache Spark on top of Chronicler.

Get Started

To get started, add the module you need to your build.sbt:

// For RDD
libraryDependencies += "com.github.fsanaulla" %% "chronicler-spark-rdd" % <version>

// For Dataset
libraryDependencies += "com.github.fsanaulla" %% "chronicler-spark-ds" % <version>

// For Structured Streaming
libraryDependencies += "com.github.fsanaulla" %% "chronicler-spark-structured-streaming" % <version>

// For DStream
libraryDependencies += "com.github.fsanaulla" %% "chronicler-spark-streaming" % <version>

Usage

Connection settings are defined by InfluxConfig, shown here with its default values:

final case class InfluxConfig(
    host: String,
    port: Int = 8086,
    credentials: Option[InfluxCredentials] = None,
    compress: Boolean = false,
    ssl: Boolean = false)

Enabling compression (compress = true) is recommended to reduce network traffic.
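The defaults above can be made concrete with a minimal, self-contained sketch. It redeclares InfluxConfig locally, copying the definition shown above, so it runs without the library; in a real project you would use the library's own class instead. The InfluxCredentials fields (username, password) are an assumption made for illustration only.

```scala
// Local stand-ins mirroring the InfluxConfig definition above, so this sketch
// runs without chronicler on the classpath.
// ASSUMPTION: InfluxCredentials is modelled here as (username, password).
case class InfluxCredentials(username: String, password: String)

case class InfluxConfig(
    host: String,
    port: Int = 8086,
    credentials: Option[InfluxCredentials] = None,
    compress: Boolean = false,
    ssl: Boolean = false)

// Only the host is required; every other field falls back to its default.
val defaults = InfluxConfig("localhost")

// Recommended: enable compression. Named arguments leave the other defaults intact.
val compressed = InfluxConfig("localhost", compress = true)

// Derive a variant with authentication and TLS from an existing config via copy().
val secured = compressed.copy(
  credentials = Some(InfluxCredentials("user", "secret")),
  ssl = true)

println(defaults)
println(secured)
```

Because InfluxConfig is a case class, copy() is a convenient way to derive per-environment variants from one base config without repeating the unchanged fields.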

For RDD[T]:

import org.apache.spark.rdd.RDD
import com.github.fsanaulla.chronicler.spark.rdd._

val rdd: RDD[T] = ??? // your RDD
rdd.saveToInfluxDBMeas("dbName", "measurementName")

// to save with a dynamically generated measurement
rdd.saveToInfluxDB("dbName")
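The difference between a fixed and a dynamically generated measurement can be illustrated without Spark. The sketch below is conceptual and is not the library's implementation: it renders hypothetical Reading records as InfluxDB line protocol, once with one fixed measurement name for every record (the saveToInfluxDBMeas case) and once with the name taken from each record (the saveToInfluxDB case). Reading, toLineFixed and toLineDynamic are illustrative names, and no escaping of spaces or commas is performed.

```scala
// Hypothetical record type used only for this illustration.
case class Reading(sensorType: String, host: String, value: Double, timestampNs: Long)

// Fixed measurement: every record is written under the same name,
// analogous to saveToInfluxDBMeas("dbName", "measurementName").
// Line protocol shape: measurement,tag=value field=value timestamp
def toLineFixed(measurement: String)(r: Reading): String =
  s"$measurement,host=${r.host} value=${r.value} ${r.timestampNs}"

// Dynamic measurement: the name is derived from the record itself,
// analogous to saveToInfluxDB("dbName").
def toLineDynamic(r: Reading): String =
  toLineFixed(r.sensorType)(r)

val readings = Seq(
  Reading("cpu", "node-1", 0.64, 1000L),
  Reading("memory", "node-1", 0.42, 1000L))

readings.map(toLineFixed("metrics")).foreach(println)
readings.map(toLineDynamic).foreach(println)
```

With the fixed variant both points land in one measurement; with the dynamic variant they are split into cpu and memory.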

For Dataset[T]:

import org.apache.spark.sql.Dataset
import com.github.fsanaulla.chronicler.spark.ds._

val ds: Dataset[T] = ??? // your Dataset
ds.saveToInfluxDBMeas("dbName", "measurementName")

// to save with a dynamically generated measurement
ds.saveToInfluxDB("dbName")

For DataStreamWriter[T]:

import org.apache.spark.sql.streaming.DataStreamWriter
import com.github.fsanaulla.chronicler.spark.structured.streaming._

val structStream: DataStreamWriter[T] = ??? // your stream writer
val saved = structStream.saveToInfluxDBMeas("dbName", "measurementName")

// alternatively, to save with a dynamically generated measurement:
// val saved = structStream.saveToInfluxDB("dbName")
// ..
saved.start().awaitTermination()

For DStream[T]:

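import org.apache.spark.streaming.dstream.DStream
import com.github.fsanaulla.chronicler.spark.streaming._

val stream: DStream[T] = ??? // your DStream
stream.saveToInfluxDBMeas("dbName", "measurementName")

// to save with a dynamically generated measurement
stream.saveToInfluxDB("dbName")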