FRosner / drunken-data-quality

Spark package for checking data quality
Apache License 2.0
222 stars 69 forks source link

Reporting Showcase #68

Closed FRosner closed 8 years ago

FRosner commented 8 years ago

Architecture

The setup uses the ELK stack (Elasticsearch, Logstash, Kibana) together with Filebeat, inspired by a DigitalOcean blog post. Unlike that post, it leaves out the reverse proxy.

architecture

Components

Component Version
log4j 1.2.17
filebeat 1.1.1
elasticsearch 2.2.0
kibana 4.4.1
logstash 2.2.2

Configuration

log4j.properties

# Send events of the "DDQ" logger at INFO and above to the "file" appender below.
# NOTE(review): additivity is left at its default, so these events also reach any
# root-logger appenders (e.g. the shell console) — set log4j.additivity.DDQ=false if unwanted.
log4j.logger.DDQ=info, file

# Plain file appender writing ddq.log (a relative path, resolved against the JVM's
# working directory); it matches the ddq*.log glob that Filebeat tails.
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=ddq.log
# Flush after every event so Filebeat sees each line promptly.
log4j.appender.file.ImmediateFlush=true
log4j.appender.file.Threshold=INFO
# Append to an existing ddq.log instead of truncating it on startup.
log4j.appender.file.Append=true
# Write only the message and a newline (no timestamp/level prefix), so each line is
# exactly the JSON payload that the Logstash json filter parses.
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.conversionPattern=%m%n

filebeat.yml

filebeat:
  prospectors:
    # Tail the log file(s) written by the log4j "file" appender (ddq.log).
    # NOTE(review): relative glob — presumably resolved against Filebeat's working
    # directory; confirm, or use an absolute path.
    -
      paths:
        - ddq*.log
      input_type: log

output:
  # Ship events to the Logstash beats input (port 5044 in ddq-logstash.conf).
  logstash:
    hosts: ["localhost:5044"]
  # Pretty-printed console output, presumably for watching events while debugging.
  # NOTE(review): unclear whether the console output honors bulk_max_size in
  # Filebeat 1.1 — confirm or drop it.
  console:
    pretty: true
    bulk_max_size: 1

# Empty: no shipper settings are overridden.
shipper:

# Filebeat's own diagnostic log (not the DDQ data), written under ddqbeat/,
# rotated every 10 MB with 7 files kept.
logging:
  to_files: true
  files:
    path: ddqbeat
    name: ddqbeat.log
    rotateeverybytes: 10485760 # = 10MB
    keepfiles: 7

ddq-logstash.conf

# Accept events shipped by Filebeat (its logstash output points at port 5044).
input {
  beats {
    port => 5044
  }
}

filter {
  # Each log line is one JSON constraint result; parse it into "constraintResult".
  json {
    source => "message"
    target => "constraintResult"
  }

  # Remove the raw line and logging metadata so only the parsed result is indexed.
  mutate {
    remove_field => [
      "message", "path", "priority", "logstash-socket", "thread",
      "class", "file", "method", "logger_name"
    ]
  }
}

# Index into the local Elasticsearch node; the Kibana objects query logstash-*.
output {
  elasticsearch {
    hosts => ["localhost:9200"]
  }
}

DDQ Log4jReporter

import org.apache.spark.streaming._
import de.frosner.ddq.core._
import de.frosner.ddq.reporters.Log4jReporter
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

// SQL context whose implicits provide the RDD-to-DataFrame `toDF` conversion used below.
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// 5-second micro-batches read from the text socket fed by the data generator (nc -lk 9999).
val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)

lines.foreachRDD { rdd =>
  // Each line is expected to be "<number> <string>". Lines that do not split into
  // exactly two tokens (e.g. "5 " or an empty line) are dropped: binding them with
  // `val Array(number, string) = ...` would throw a MatchError and fail the batch.
  val df = rdd
    .map(_.split(" "))
    .collect { case Array(number, string) => (number, string) }
    .toDF("potentialNumber", "string")
    // Spark's cast yields null for tokens that are not valid integers.
    .withColumn("number", new Column("potentialNumber").cast(IntegerType))

  // Check this batch against three constraints and hand each result to the
  // log4j-based reporter (presumably the "DDQ" logger configured in log4j.properties).
  val reporter = Log4jReporter()
  Check(
    dataFrame = df,
    cacheMethod = None,
    displayName = Some("numbers")
  ).satisfies(
    new Column("number") > 0
  ).hasUniqueKey(
    "string"
  ).satisfies(
    !new Column("string").startsWith("z")
  ).run(reporter)
}

ssc.start()
ssc.awaitTermination()

Data Generator

# Emit "<number> <random 12-char alphanumeric string>" lines to TCP port 9999.
# Numbers range from -1 to 98, so the "number > 0" constraint fails for some rows.
# Requires bash: $RANDOM is not available in plain POSIX sh.
while :
do
    number=$(( RANDOM % 100 - 1 ))
    # Read urandom until exactly 12 alphanumerics are collected, so the string is never
    # empty (an empty token would produce a malformed "<number> " line downstream).
    string=$(LC_ALL=C tr -dc 'A-Za-z0-9' < /dev/urandom | head -c 12)
    echo "$number $string"
    # Pause 10-100 ms; printf zero-pads to two digits so 10 becomes 0.10, not 0.010.
    sleep "$(printf '0.%02d' $(( RANDOM % 10 + 1 )))"
done | nc -lk 9999

Kibana Dashboard

[
  {
    "_id": "Showcase-Dashboard",
    "_type": "dashboard",
    "_source": {
      "title": "Showcase Dashboard",
      "hits": 0,
      "description": "",
      "panelsJSON": "[{\"id\":\"String-Column-is-Valid\",\"type\":\"visualization\",\"panelIndex\":1,\"size_x\":4,\"size_y\":2,\"col\":9,\"row\":1},{\"id\":\"Total-Number-of-Rows\",\"type\":\"visualization\",\"panelIndex\":2,\"size_x\":4,\"size_y\":2,\"col\":1,\"row\":1},{\"id\":\"String-Unique-Key-Result\",\"type\":\"visualization\",\"panelIndex\":3,\"size_x\":4,\"size_y\":2,\"col\":5,\"row\":1},{\"id\":\"Failing-Rows-(number->-0)\",\"type\":\"visualization\",\"panelIndex\":4,\"size_x\":12,\"size_y\":4,\"col\":1,\"row\":3}]",
      "optionsJSON": "{\"darkTheme\":false}",
      "uiStateJSON": "{\"P-3\":{\"vis\":{\"legendOpen\":true,\"colors\":{\"Success\":\"#629E51\"}},\"spy\":{\"mode\":{\"name\":null,\"fill\":false}}}}",
      "version": 1,
      "timeRestore": false,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"filter\":[{\"query\":{\"query_string\":{\"query\":\"*\",\"analyze_wildcard\":true}}}]}"
      }
    }
  },
  {
    "_id": "String-Unique-Key-Result",
    "_type": "visualization",
    "_source": {
      "title": "String Unique Key Result",
      "visState": "{\"title\":\"New Visualization\",\"type\":\"pie\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"constraintResult.status.raw\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"logstash-*\",\"query\":{\"query_string\":{\"query\":\"constraintResult.constraint:UniqueKeyConstraint\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "String-Column-is-Valid",
    "_type": "visualization",
    "_source": {
      "title": "String Column is Valid",
      "visState": "{\"title\":\"New Visualization\",\"type\":\"pie\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"isDonut\":false},\"aggs\":[{\"id\":\"1\",\"type\":\"count\",\"schema\":\"metric\",\"params\":{}},{\"id\":\"2\",\"type\":\"terms\",\"schema\":\"segment\",\"params\":{\"field\":\"constraintResult.status.raw\",\"size\":5,\"order\":\"desc\",\"orderBy\":\"1\"}}],\"listeners\":{}}",
      "uiStateJSON": "{\"vis\":{\"colors\":{\"Success\":\"#629E51\",\"Failure\":\"#EA6460\"}}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"logstash-*\",\"query\":{\"query_string\":{\"query\":\"constraintResult.column:\\\"NOT StartsWith(string, z)\\\"\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Total-Number-of-Rows",
    "_type": "visualization",
    "_source": {
      "title": "Total Number of Rows",
      "visState": "{\"title\":\"New Visualization\",\"type\":\"metric\",\"params\":{\"fontSize\":60},\"aggs\":[{\"id\":\"1\",\"type\":\"sum\",\"schema\":\"metric\",\"params\":{\"field\":\"constraintResult.check.rowsTotal\"}}],\"listeners\":{}}",
      "uiStateJSON": "{}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"logstash-*\",\"query\":{\"query_string\":{\"query\":\"constraintResult.constraint:UniqueKeyConstraint\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  },
  {
    "_id": "Failing-Rows-(number->-0)",
    "_type": "visualization",
    "_source": {
      "title": "Failing Rows (number > 0)",
      "visState": "{\"title\":\"Failing Rows (number > 0)\",\"type\":\"histogram\",\"params\":{\"shareYAxis\":true,\"addTooltip\":true,\"addLegend\":true,\"scale\":\"linear\",\"mode\":\"stacked\",\"times\":[],\"addTimeMarker\":false,\"defaultYExtents\":false,\"setYExtents\":false,\"yAxis\":{}},\"aggs\":[{\"id\":\"1\",\"type\":\"sum\",\"schema\":\"metric\",\"params\":{\"field\":\"constraintResult.failed\"}},{\"id\":\"2\",\"type\":\"date_histogram\",\"schema\":\"segment\",\"params\":{\"field\":\"@timestamp\",\"interval\":\"custom\",\"customInterval\":\"10s\",\"min_doc_count\":1,\"extended_bounds\":{}}}],\"listeners\":{}}",
      "uiStateJSON": "{\"vis\":{\"colors\":{\"Sum of constraintResult.failed\":\"#BF1B00\"}}}",
      "description": "",
      "version": 1,
      "kibanaSavedObjectMeta": {
        "searchSourceJSON": "{\"index\":\"logstash-*\",\"query\":{\"query_string\":{\"query\":\"constraintResult.column:\\\"(number > 0)\\\"\",\"analyze_wildcard\":true}},\"filter\":[]}"
      }
    }
  }
]

Demo Video

image

FRosner commented 8 years ago

I created a wiki page and linked to it from the documentation.