
[Feature] Data Quality Design #4283

Closed · zixi0825 closed 1 week ago

zixi0825 commented 3 years ago

1 Summary

Data quality inspection is an important part of the data processing pipeline. After data synchronization and data processing, it is usually necessary to check the accuracy of the data, for example by comparing the difference in row counts between the source table and the target table, or by calculating a metric on a column according to a rule and comparing the calculated value against a standard value. At present, DolphinScheduler's task types include no such data quality check, so a new data quality task type is needed. Data quality check tasks can then be added directly when defining a workflow, making the entire data processing pipeline more complete.

2 Requirements Analysis

For data quality inspection tasks, the core functions are rule management, task execution, and alerting on check results. To implement lightweight data quality checking, the following functions must be supported:

2.1 Rule Manager

2.1.1 Rule Type

2.1.2 Rule Implementation

2.1.3 Rule Definition and Parser

2.1.3.1 Rule Definition

A complete rule should include the connector information, the SQL statements to execute, the type of comparison value, the type of check, and so on; that is, all the parameters needed to define a data quality task can be obtained from the rule.

2.1.3.2 Rule Parser

The main responsibility of the rule parser is to produce a set of parameters suitable for executing a data quality task, by parsing the parameter values entered by the user together with the rule definition.

2.2 Task Execution Mode

Based on DolphinScheduler's existing task execution mechanism, the most appropriate approach is to use Spark as the execution engine for data quality tasks: the SQL to execute is passed to a Spark job through configuration, and the execution results are written to a specified storage engine.
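As a rough illustration of this idea (a sketch only, not the actual dolphinscheduler-data-quality module; the result table name `t_dq_result` is an assumption), the Spark side boils down to executing the configured SQL and persisting the result:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Minimal sketch: the SQL to execute arrives via configuration and the
// result is written to a storage engine (a JDBC table here; t_dq_result
// is an illustrative name, not the real schema).
class DataQualitySqlRunner {
    static void run(SparkSession spark, String executeSql, String jdbcUrl) {
        Dataset<Row> result = spark.sql(executeSql);
        result.write()
              .format("jdbc")
              .option("url", jdbcUrl)
              .option("dbtable", "t_dq_result")
              .mode("append")
              .save();
    }
}
```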

2.3 Check Result Alert

Each rule is configured with alert rules; when the check result is abnormal, an alert is triggered. DolphinScheduler's existing alert module is used to send the alert.

3 High-Level Design

3.1 Rule Manager Design

3.1.1 Rule Component Design

3.1.1.1 Single Table Rule

3.1.1.2 MultiTableAccuracyRule

3.1.1.3 MultiTableValueComparisonRule
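This rule joins the results of two independently executed SQL statements and compares the statistics value with the comparison value, using a template of the following form:

```sql
select ${statistics_name} as statistics_value, ${comparsion_name} as comparsion_value
from ${statistics_execute_sql} full join ${comparsion_execute_sql}
```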

3.1.2 Custom Rule

3.2 Task Execution Process Design

3.2.1 Execution Engine

3.2.2 Task Execution Process

(Figure: data quality task execution flow)

3.3 Task Manager Design

Data quality tasks do not support standalone definition and scheduling; they are defined and scheduled as part of a workflow.

3.4 Data Quality Task Definition UI Design

3.4.1 UI Generation Method

The data quality task definition UI is generated automatically by the front-end component from a JSON string built from each rule's parameters.
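As a rough sketch of the kind of entry such a form-definition JSON could carry (field names are assumptions, not the actual front-end contract):

```java
import java.util.List;

// Hypothetical shape of one entry in the form-definition JSON; the real
// DolphinScheduler front-end contract may differ.
class FormItem {
    String field;         // parameter key, e.g. "src_connector_type"
    String title;         // label shown in the task definition UI
    String type;          // widget type such as "input" or "select"
    boolean required;     // whether the user must fill this parameter
    List<String> options; // candidate values for "select" widgets
}
```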

3.4.2 Task Definition UI Prototype Diagram

(Figure: data quality task definition UI prototype)

3.4.3 Custom Rule UI Prototype Diagram

(Figure: custom rule UI prototype)

4 Detailed Design

4.1 Database Design

4.1.1 RuleInfo

| column | type | comment |
| --- | --- | --- |
| id | int | id |
| name | string | rule name |
| type | int | rule type: single-table / multi-table |
| rule_json | text | rule definition |
| create_time | date | create time |
| update_time | date | update time |

4.1.2 CheckResultInfo

| column | type | comment |
| --- | --- | --- |
| id | int | id |
| task_id | long | task ID |
| task_instance_id | long | task instance ID |
| rule_type | int | rule type |
| statistics_value | double | statistics value |
| comparsion_value | double | comparison value |
| check_type | int | check type: fixed value or percentage |
| threshold | double | threshold |
| operator | int | operator: >, <, =, !=, >=, <= |
| create_time | date | create time |
| update_time | date | update time |
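A hedged sketch of how these columns could drive the check (the percentage formula and pass/fail semantics are assumptions, not the actual implementation):

```java
// Evaluates whether the configured condition holds for a check result row.
// CheckType mirrors the check_type column; operator mirrors the operator column.
class CheckEvaluator {
    enum CheckType { FIXED_VALUE, PERCENTAGE }

    static boolean conditionHolds(double statisticsValue, double comparisonValue,
                                  CheckType checkType, String operator, double threshold) {
        // PERCENTAGE: compare statistics/comparison (as a percentage) to the threshold;
        // FIXED_VALUE: compare the statistics value to the threshold directly.
        double actual = (checkType == CheckType.PERCENTAGE && comparisonValue != 0)
                ? statisticsValue / comparisonValue * 100
                : statisticsValue;
        switch (operator) {
            case ">":  return actual > threshold;
            case "<":  return actual < threshold;
            case "=":  return actual == threshold;
            case "!=": return actual != threshold;
            case ">=": return actual >= threshold;
            case "<=": return actual <= threshold;
            default:   throw new IllegalArgumentException("unknown operator: " + operator);
        }
    }
}
```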

4.1.3 CheckResultStatisticsInfo

4.2 Class Design

4.2.1 Rule Design

4.2.1.1 Rule Related Model
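A rough sketch of the rule model implied by section 2.1.3.1 (field names are illustrative, not the actual DolphinScheduler classes):

```java
import java.util.List;

// Illustrative rule model: everything needed to assemble a data quality task.
class RuleDefinition {
    String name;               // rule name
    int type;                  // rule type: single-table / multi-table
    List<String> inputEntries; // user-facing parameters declared by the rule
    String executeSql;         // SQL template containing ${...} placeholders
    int comparisonType;        // fixed value or computed comparison value
    int checkType;             // fixed-value check or percentage check
}
```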

4.2.1.2 RuleParser

1) Connector Parameter Parser

Obtain the datasource information (url, database, table, username, password) from the datasource_id, and construct the connector configuration.
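A minimal sketch of this step, assuming a hypothetical DataSourceInfo lookup result (the real connector keys may differ):

```java
import java.util.HashMap;
import java.util.Map;

// Builds the reader/writer connector configuration from stored datasource info.
class ConnectorConfigBuilder {
    static class DataSourceInfo { // hypothetical result of looking up datasource_id
        String url, database, table, username, password;
    }

    static Map<String, String> build(DataSourceInfo ds) {
        Map<String, String> config = new HashMap<>();
        config.put("url", ds.url);
        config.put("database", ds.database);
        config.put("table", ds.table);
        config.put("user", ds.username);
        config.put("password", ds.password);
        return config;
    }
}
```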

2) Replace the placeholders in the execute SQL to construct an execute-SQL list.

3) Construct the writer configuration, including the writer connector configuration and the save SQL:

```
// Pseudocode: build the SQL that produces the comparison value.
if (comparsionType == FIXED) {
    // Fixed comparison value: bind the user-supplied constant to the placeholder.
    map.put("${comparsion_name}", "fixed_value");
    sql = "select ${comparsion_name} as comparsion_value from ${statistics_table_name}";
} else {
    // Computed comparison value: join the statistics result with the comparison result.
    sql = "select ${comparsion_name} as comparsion_value from ${statistics_table_name} full join ${comparsion_table_name}";
}
resultSQL = sql.replacePlaceholder(map);
```

Finally, these configurations are assembled into a JSON string parameter and passed to the Spark application.
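A minimal sketch of the replacePlaceholder step used in the pseudocode above (the map keys already carry the ${...} wrapper, as shown by the map.put call):

```java
import java.util.Map;

// Replaces every ${key} placeholder in the SQL template with its mapped value.
class PlaceholderReplacer {
    static String replacePlaceholder(String sql, Map<String, String> params) {
        String result = sql;
        for (Map.Entry<String, String> entry : params.entrySet()) {
            result = result.replace(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
```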

4.2.2 Task Design

4.2.2.1 DolphinScheduler Task Design

4.2.2.2 Spark Data Quality Task Design

1) The data quality task is actually a Spark task. Its main responsibilities are to read data through the configured connectors, execute the rule SQL, and write the check results to the specified storage engine (see the sketch after this list).

2) The execution mode has several options.
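Putting 1) together, a high-level sketch of such a Spark entry point (the class name and parameter layout are assumptions, not the actual dolphinscheduler-data-quality application):

```java
import org.apache.spark.sql.SparkSession;

// Hedged sketch of the data quality Spark job: receive the JSON parameter
// produced by the rule parser, run the configured SQL, persist the result.
public class DataQualityApplication {
    public static void main(String[] args) {
        String paramJson = args[0]; // JSON built by the rule parser (section 4.2.1.2)
        SparkSession spark = SparkSession.builder()
                .appName("data-quality-task")
                .getOrCreate();
        // 1) register reader connectors as temporary views from the connector config
        // 2) run each statement in the execute-SQL list, e.g.:
        //    spark.sql(executeSql).createOrReplaceTempView(outputTable);
        // 3) run the writer's save SQL and write the check result to the storage engine
        spark.stop();
    }
}
```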

5 Todo List

6 Related Issue and PR

issue: DataQuality Application
pr: DataQuality Common Entity



davidzollo commented 3 years ago

good feature

Kyofin commented 3 years ago

LGTM

lbjyuer commented 3 years ago

Essential functionality for a big data ETL system. Looking forward to seeing it go live soon!

davidzollo commented 3 years ago

Essential functionality for a big data ETL system. Looking forward to seeing it go live soon!

+1

zixi0825 commented 3 years ago

Development plan:

Version 1.0 (local development is more than 95% complete; PR not yet submitted)

(Screenshots: dqs_1, dqs_2, dqs_3)

Version 2.0 (timing to be determined)

597365581 commented 3 years ago

+1

davidzollo commented 3 years ago

+1

ATLgo commented 3 years ago

+1

ATLgo commented 3 years ago

@zixi0825 大佬,想请教一下怎么才能将您完成的功能跑起来?

zixi0825 commented 3 years ago

@zixi0825 大佬,想请教一下怎么才能将您完成的功能跑起来?

It is not completed yet

JacobZheng0927 commented 3 years ago

I have also implemented a data quality management function at my company. The difference is that my work is based on Spark + parquet/avro for the data quality computation, so I can offer some reference points for the design of the calculation rules or the implementation of the specific code. I am very interested in participating in the development of this feature. How can I help?

davidzollo commented 3 years ago

I have also implemented a data quality management function at my company. The difference is that my work is based on Spark + parquet/avro for the data quality computation, so I can offer some reference points for the design of the calculation rules or the implementation of the specific code. I am very interested in participating in the development of this feature. How can I help?

Once you have reviewed the current solution, please share your suggestions and we can discuss them.

wangqinghuan commented 2 years ago

@zixi0825 Thanks for this amazing job. Do we only output the check result to a CSV file at present?

zixi0825 commented 2 years ago

@zixi0825 Thanks for this amazing job. Do we only output the check result to a CSV file at present?

At present, only CSV is supported. I will add other formats.

wangqinghuan commented 2 years ago

@zixi0825 Thanks for this amazing job. Do we only output the check result to a CSV file at present?

At present, only CSV is supported. I will add other formats.

Do we have a plan to support output to a database? I want to show these results in a UI or BI tools.

davidzollo commented 2 years ago

@zixi0825 Thanks for this amazing job. Do we only output the check result to a CSV file at present?

At present, only CSV is supported. I will add other formats.

Do we have a plan to support output to a database? I want to show these results in a UI or BI tools.

I think this issue can be discussed in the dev@dolphinscheduler.apache.org mailing list

davidzollo commented 2 years ago

Or you can submit a new issue describing what you want.

wangqinghuan commented 2 years ago

I have opened a new issue: #8586

a092cc commented 2 years ago

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: When executing Hive or MySQL statements, the table clearly exists. What causes this error? @zixi0825

yxsgao commented 2 years ago

Exception in thread "main" org.apache.spark.sql.AnalysisException: Table or view not found: When executing Hive or MySQL statements, the table clearly exists. What causes this error? @zixi0825

Have you solved this problem? I ran into the same issue. I changed hive-site.xml under the Hive/Spark directory, but still get this error.

a092cc commented 2 years ago

You need to load hive-site.xml into the conf. This is how I added it:


```diff
 * The SparkRuntimeEnvironment is responsible for creating SparkSession and SparkExecution
 */
@@ -47,14 +52,29 @@ public class SparkRuntimeEnvironment {
     }

     public void prepare() {
-        sparkSession = SparkSession.builder().config(createSparkConf()).getOrCreate();
+        sparkSession = SparkSession.builder().config(createSparkConf())
+                .enableHiveSupport()
+                .getOrCreate();
     }

     private SparkConf createSparkConf() {
         SparkConf conf = new SparkConf();
+
         this.config.entrySet()
             .forEach(entry -> conf.set(entry.getKey(), String.valueOf(entry.getValue())));
+
         conf.set("spark.sql.crossJoin.enabled","true");
+
+        Configuration cf = new Configuration();
+        cf.addResource("hive-site.xml");
+        cf.addResource("hdfs-site.xml");
+        cf.addResource("core-site.xml");
+        for (Map.Entry<String, String> next : cf) {
+            String key = next.getKey();
+            String value = next.getValue();
+            conf.set(key, value);
+        }
+
         return conf;
     }
```
feixiameiruhua commented 2 years ago

You need to load hive-site.xml into the conf. This is how I added it: (the same patch as above)


Nice 👍🏻. In my case changing just that one line was enough; then I rebuilt and replaced the corresponding jar (dolphinscheduler-data-quality-xxx.jar):
sparkSession = SparkSession.builder().config(createSparkConf()).enableHiveSupport().getOrCreate();

zixi0825 commented 1 year ago

In CDH, adding hive-site.xml into /opt/cloudera/parcels/SPARK2/lib/spark2/conf can solve the problem.


— Reply to this email directly, view it on GitHub https://github.com/apache/dolphinscheduler/issues/4283#issuecomment-1132396853, or unsubscribe https://github.com/notifications/unsubscribe-auth/ACSUBBCLR5OONOBNTKDT3T3VK33MVANCNFSM4VEFPIRA . You are receiving this because you were mentioned.Message ID: @.***>