fsanaulla / chronicler-spark

InfluxDB connector to Apache Spark on top of Chronicler
Apache License 2.0

On write InfluxDB gives status 204 without any content written #15

Closed abossenbroek closed 5 years ago

abossenbroek commented 5 years ago

I am trying to load data from a CSV into InfluxDB using chronicler-spark. The program completes without errors and InfluxDB reports status 204, but no data appears in the database. An example program that shows this behaviour on my laptop can be found below. This may be related to a bug in InfluxDB.

build.sbt
name := "sensor_data"

version := "0.1"

scalaVersion := "2.12.8"
lazy val chronicler: String = "0.4.6"

libraryDependencies ++= Seq(
  // You don’t *have to* use Spark, but in case you want to, we have added the dependency
  "org.apache.spark" %% "spark-core" % "2.4.1",
  "org.apache.spark" %% "spark-sql" % "2.4.1",
  "com.github.fsanaulla" %% "chronicler-spark-ds"       % "0.2.8",
  "com.github.fsanaulla" %% "chronicler-ahc-io"         % chronicler,
  "com.github.fsanaulla" %% "chronicler-macros"         % chronicler,
  "com.github.fsanaulla" %% "chronicler-url-io"         % chronicler,
  "com.github.fsanaulla" %% "chronicler-url-management" % chronicler
)
Writing to InfluxDB
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.github.fsanaulla.chronicler.core.model.{InfluxCredentials, InfluxWriter}
import com.github.fsanaulla.chronicler.urlhttp.shared.InfluxConfig
import com.github.fsanaulla.chronicler.macros.annotations.{field, tag, timestamp}
import com.github.fsanaulla.chronicler.urlhttp.management.{InfluxMng, UrlManagementClient}
import org.apache.spark.sql

final case class MeteringEntry(
                                @tag sensorID: String,
                                @tag unit: Option[String],
                                @field value: Double,
                                @timestamp created: Long
                              )

object Main extends App {
  val spark: SparkSession =
    SparkSession
      .builder()
      .appName("Global temperature loader")
      .config("spark.master", "local[1]")
      .config("spark.driver.memory", "1600M")
      .config("spark.executor.memory", "1600M")
      .getOrCreate()

  import spark.implicits._

  val rawData: DataFrame  = spark.read
    .option("header", true)
    .option("sep", "|")
    .option("charset", "iso-8859-1")
    .csv("/tmp/toy_data.csv")
    .toDF(Seq("sensorID", "unit", "value", "created"): _*)
    .withColumn("value", 'value.cast(sql.types.DoubleType))
    .withColumn("created", 'created.cast(sql.types.LongType))

  val data = rawData.as[MeteringEntry]

  val credentials: InfluxCredentials = InfluxCredentials("influx", "influxx")
  implicit lazy val influxConf: InfluxConfig = InfluxConfig(host = "localhost", port = 8086,
    credentials = Some(credentials), gzipped = false, None)

  lazy val management: UrlManagementClient = InfluxMng(influxConf)

  management.dropDatabase("galaxy")
  management.createDatabase("galaxy")

  management.close()

  import com.github.fsanaulla.chronicler.spark.ds._

  implicit val wr: InfluxWriter[MeteringEntry] = new InfluxWriter[MeteringEntry] {
    def write(me: MeteringEntry): String = {
      val sb = StringBuilder.newBuilder

      def unitUnpacker(value: Option[String]): String = value match {
        case Some(s) => s
        case _ => ""
      }

      // Point body looks like: <tag set> <field set> <timestamp, nanosecond epoch>
      sb.append("unit=")
        .append(unitUnpacker(me.unit))
        .append(" ")
        .append("level=")
        .append(me.value)
        .append(" ")
        .append(me.created)

      sb.toString()
    }
  }

  val sensorID = "meaningOfLife"

  data.filter(row => row.sensorID == sensorID).show(10)
  data.filter(row => row.sensorID == sensorID).saveToInfluxDB(dbName = "galaxy", measName = sensorID,
    onFailure = ex => {
      throw new Exception(ex)
    })

  spark.stop()
}

/tmp/toy_data.csv

meaningOfLife|zorg|442.401184|1451602860000000000
meaningOfLife|zorg|442.083191|1451602920000000000
meaningOfLife|zorg|442.161682|1451602980000000000
meaningOfLife|zorg|442.598999|1451603040000000000
meaningOfLife|zorg|442.052185|1451603100000000000
meaningOfLife|zorg|442.718689|1451603160000000000
meaningOfLife|zorg|442.338806|1451603220000000000
meaningOfLife|zorg|442.322815|1451603280000000000
meaningOfLife|zorg|442.588318|1451603340000000000
meaningOfLife|zorg|442.041687|1451603400000000000

docker-compose.yml

version: '3.3'
services:
  influxdb:
    image: influxdb:1.7
    restart: always
    hostname: influxdb
    environment:
      - INFLUXDB_ADMIN_ENABLED=true
      - INFLUXDB_DB=uv_sensors
      - INFLUXDB_ADMIN_USER=influx
      - INFLUXDB_ADMIN_PASSWORD=influxx
      - INFLUXDB_WRITE_USER=influx_user
      - INFLUXDB_WRITE_USER_PASSWORD=influxx_user
      - VIRTUAL_PORT=8086
    volumes:
      - ./volumes/influxdb:/var/lib/influxdb:rw
    ports:
      - "8086:8086"

  chronograf:
    image: chronograf:1.7
    restart: always
    hostname: chronograf
    environment:
      - VIRTUAL_PORT=8888
      - INFLUXDB_URL=http://influxdb:8086
      - INFLUXDB_USERNAME=influx
      - INFLUXDB_PASSWORD=influxx
    volumes:
      - ./volumes/chronograf:/var/lib/chronograf:rw
    depends_on:
      - influxdb
    ports:
      - "8888:8888"

Logs

When running with sbt run I see the following in the Spark logs:

19/04/22 18:19:48 INFO ContextCleaner: Cleaned accumulator 12
+-------------+----+----------+-------------------+
|     sensorID|unit|     value|            created|
+-------------+----+----------+-------------------+
|meaningOfLife|zorg|442.083191|1451602920000000000|
|meaningOfLife|zorg|442.161682|1451602980000000000|
|meaningOfLife|zorg|442.598999|1451603040000000000|
|meaningOfLife|zorg|442.052185|1451603100000000000|
|meaningOfLife|zorg|442.718689|1451603160000000000|
|meaningOfLife|zorg|442.338806|1451603220000000000|
|meaningOfLife|zorg|442.322815|1451603280000000000|
|meaningOfLife|zorg|442.588318|1451603340000000000|
|meaningOfLife|zorg|442.041687|1451603400000000000|
+-------------+----+----------+-------------------+

and the following in the Docker logs:

influxdb_1    | ts=2019-04-22T17:19:48.026308Z lvl=info msg="Executing query" log_id=0Ew~~o7l000 service=query query="DROP DATABASE galaxy"
influxdb_1    | [httpd] 172.20.0.1 - inflx [22/Apr/2019:17:19:48 +0000] "GET /query?p=%5BREDACTED%5D&q=DROP+DATABASE+galaxy&u=inflx HTTP/1.1" 200 168 "-" "Java/1.8.0_212" d4848559-6522-11e9-82a8-000000000000 818
influxdb_1    | ts=2019-04-22T17:19:48.081766Z lvl=info msg="Executing query" log_id=0Ew~~o7l000 service=query query="CREATE DATABASE galaxy"
influxdb_1    | [httpd] 172.20.0.1 - inflx [22/Apr/2019:17:19:48 +0000] "GET /query?p=%5BREDACTED%5D&q=CREATE+DATABASE+galaxy&u=inflx HTTP/1.1" 200 170 "-" "Java/1.8.0_212" d48cfc4f-6522-11e9-82a9-000000000000 7946
influxdb_1    | [httpd] 172.20.0.1 - inflx [22/Apr/2019:17:19:49 +0000] "POST /write?db=galaxy&p=%5BREDACTED%5D&u=inflx HTTP/1.1" 204 0 "-" "Java/1.8.0_212" d51ca41b-6522-11e9-82aa-000000000000 163483

Diagnosis

To understand the issue better I set a breakpoint at com.github.fsanaulla.chronicler.urlhttp.io.models.UrlWriter.scala:45 and stepped through the method. I can confirm the entity is properly formatted and that it inserts fine when entered manually via the Chronograf GUI.
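
For reference, calling the writer directly shows the point body that chronicler-spark is handed for the first CSV row (illustrative only; exactly how the measurement name gets joined onto this string happens inside the library):

// Illustration only: inspect what the InfluxWriter above returns for one row.
// The implicit wr and the MeteringEntry case class come from the program above;
// the measurement name itself is added by chronicler-spark when it builds the /write request.
val sample = MeteringEntry("meaningOfLife", Some("zorg"), 442.401184, 1451602860000000000L)
println(wr.write(sample))
// prints: unit=zorg level=442.401184 1451602860000000000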

fsanaulla commented 5 years ago

Hello, @abossenbroek.

Thank you for your attention.

As far as I know, Influx uses the 204 status code as the result of a successful insert (link).

So, am I right that you get a 204 but no data is actually inserted?

abossenbroek commented 5 years ago

Thanks for pointing out the doc, I wasn't aware of that @fsanaulla. You are right, the example I provided does not lead to any data being inserted into the database. Remarkably, it works if I create a Dataset in Scala without reading in a CSV.

fsanaulla commented 5 years ago

A few notes:

abossenbroek commented 5 years ago

A few notes:

  • can you provide a file sample?

See CSV section in my question

  • check your timestamp format, because by default Influx expects to receive timestamps in nanosecond precision.

I am aware of that; I took the data and wrote a bulk load myself, and it works. I also tried

  val data: Dataset[MeteringEntry] = Seq(
    MeteringEntry("meaningOfLife", Option("zorg"), 442.401184,1451602860000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.083191,1451602920000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.161682,1451602980000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.598999,1451603040000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.052185,1451603100000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.718689,1451603160000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.338806,1451603220000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.322815,1451603280000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.588318,1451603340000000000L),
    MeteringEntry("meaningOfLife", Option("zorg"), 442.041687,1451603400000000000L)).toDS

and this works properly. Somehow only the CSV path doesn't work.

  • did you try structured streaming for CSV? A connector for it already exists in this project

Not sure how to do this. Any pointers would be useful, thanks.

fsanaulla commented 5 years ago

@abossenbroek I will look into it today.

About structured-streaming:
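
Roughly, an untested sketch (the exact extension method and parameter names here are assumptions, please check the project README), reading the same kind of CSV as a stream and writing it through the structured-streaming module would look like:

import org.apache.spark.sql.types._
import com.github.fsanaulla.chronicler.spark.structured.streaming._

// Untested sketch: streaming file sources need an explicit schema up front.
val schema = StructType(Seq(
  StructField("sensorID", StringType),
  StructField("unit", StringType),
  StructField("value", DoubleType),
  StructField("created", LongType)
))

// /tmp/sensor_csv/ is a hypothetical directory that new CSV files land in.
val stream = spark.readStream
  .option("sep", "|")
  .schema(schema)
  .csv("/tmp/sensor_csv/")
  .as[MeteringEntry]

// Assumes chronicler-spark-structured-streaming adds a saveToInfluxDB extension
// on the stream writer, analogous to the Dataset one used above; the implicit
// InfluxConfig and InfluxWriter[MeteringEntry] must still be in scope.
stream.writeStream
  .saveToInfluxDB(dbName = "galaxy", measName = "meaningOfLife")
  .start()
  .awaitTermination()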

abossenbroek commented 5 years ago

Interestingly, adding .repartition(2) somehow solves the issue. Tested with:

data.repartition(2).filter(row => row.sensorID == sensorID).saveToInfluxDB(dbName = "galaxy", measName = sensorID,
    onFailure = ex => {
      throw new Exception(ex)
    })
fsanaulla commented 5 years ago

@abossenbroek 0.2.9 should be out in a few hours. I think it should help you. Tell me if it does.

fsanaulla commented 5 years ago

Reopen if the error still occurs.