evanash1222 / TechBlogs

0 stars 0 forks source link

Dataworks数据集成 #1

Closed evanash1222 closed 2 years ago

evanash1222 commented 2 years ago

使用限制

基本概念

脏数据不会成功写入目的端。您可以在同步任务配置时,控制同步过程中是否允许脏数据产生,并且支持控制脏数据条数,即当脏数据超过指定条数时,任务失败退出。

数据集成类型

离线(批量)同步

注:DataWorks 的离线同步暂不支持跨时区同步数据。如果同步任务中同步的数据源与使用的 DataWorks 资源组不在同一个时区,则会导致同步的数据有误。 image.png

实时同步

待补充

网络连通方案

需要保障数据集成资源组(执行数据集成任务的机器)与数据库的网络连通性 image.png [选择网络连通方案](https://help.aliyun.com/document_detail/137671.htm?spm=a2c4g.11186623.0.0.48531476zSct1E#concept-ovl-zgv-42b)

网络类型划分

数据集成提供了RDS(MySQL、PostgreSQL和SQLServer)、PolarDB、DRDS、HybridDB for MySQL、AnalyticDB for PostgreSQL和AnalyticDB for MySQL3.0数据源之间的反向代理自动检测功能。在专有网络下,您无需购买一台和VPC同网络的ECS,即可通过反向代理自动检测连通网络。 PPAS、OceanBase、Redis、MongoDB、Memcache、Tabl eStore和HBase等阿里云其它非RDS的数据库,在专有网络下配置数据同步任务时,需要购买同网络的ECS,才可以通过ECS连通网络。

需要使用专线、高速通道、CEN、IPSec VPN等方式打通网络。

数据集成资源组分类

独享数据集成资源组、自定义数据集成资源组、公共数据集成资源组

独享数据集成资源组

可以把独享数据集成资源组看成是VPC网络内的一批可以选定规格的ECS机器。因此:

  1. 不同规格的ECS对应不同的数据集成能力

image.png

  1. 独享数据集成资源组默认无法连接经典网络内的数据源
  2. 独享数据集成资源组需要通过反向VPC的方式访问用户VPC网络的数据源(即绑定VPC)
  3. 独享数据集成资源组只能横向扩容,不能纵向扩容

image.png image.png image.png

自定义数据集成资源组

公共数据集成资源组

需要把公共数据集成资源组的IP加入白名单。注:IP池可能发生变化,因此可能遇到客户未更新白名单出现连接异常的问题。 [白名单列表](https://help.aliyun.com/document_detail/137792.html)

数据集成连通性常见问题

需要收集的信息

  1. 客户的数据源在哪?(云上 or 云下、经典网络 or VPC网络)
  2. 连接什么数据库?
  3. 使用什么类型的资源组?
  4. 历史上是否有连通过?
  5. 报错信息

排查思路:数据集成的数据源连通问题,本质上还是一个网络问题。因此需要对客户的整体链路进行逐一排查。 image.png

网络连通性检查工具

注:金融云没有网络连通性检查工具😭 加载数据 -> 实例状态检查 -> VPC绑定检查 -> 白名单检查 -> ping、telnet、客户端检查 image.png 跨地域的情况下,还可以一键添加路由、一键添加rds白名单等 image.png image.png

数据同步的开发模式

向导模式

上手容易,配置简单。但是不支持数据同步的部分高级功能。 向导模式可以转换成脚本模式,但是不支持脚本模式转换回向导模式。

脚本模式

直接编写数据同步的JSON脚本完成数据同步的配置。

数据集成配置

image.png

切分键

根据配置的字段进行数据分片,实现并发读取。可以将源数据表中某一列作为切分键,建议使用主键或有索引的列作为切分键。

分区信息

建表语句

CREATE TABLE IF NOT EXISTS t_dml_data_0615(
`detail_id`                     BIGINT COMMENT '',
`sale_date`                     DATETIME COMMENT '',
`province`                      STRING COMMENT '',
`city`                          STRING COMMENT '',
`product_id`                    BIGINT COMMENT '',
`cnt`                           BIGINT COMMENT '',
`amt`                           DECIMAL COMMENT ''
)
COMMENT 'null'
PARTITIONED BY (pt STRING) //分区表
lifecycle 36500; //默认生命周期

如果您每日增量数据限定在对应日期的分区中,可以使用分区做每日增量,比如配置分区pt值为${bizdate} 注:默认参数值与数据去向中的分区信息值对应。调度执行迁移任务时,目标表的分区值会被自动替换为任务执行日期的前一天,默认情况下,您会在当前执行前一天的业务数据,这个日期也叫做业务日期。

其他常用的分区pt值 [调度参数概述](https://help.aliyun.com/document_detail/137548.htm?spm=a2c4g.11186623.0.0.39fd1f1brsQQeK#concept-t2q-jmq-p2b)

清理规则

insert overwrite insert into

image.png

任务期望最大并发数

需要考虑独享数据集成资源组的规格

同步速率

分布式处理能力

实现独享数据集成资源组无法纵向拓展的能力 数据同步时,可以将任务切片分散到多台执行节点上并发执行,提高同步速率。该模式下,配置较大任务并发数会增加数据存储访问压力,如需使用该功能,请提前评估数据存储的访问负载。该功能仅支持在独享数据集成资源组配置。

image.png

正在提交...
提交任务成功
正在等待在云端的gateway资源
正在等待在云端的gateway资源
正在等待在云端的gateway资源
2022-06-15 10:54:15 INFO Current task status:RUNNING
2022-06-15 10:54:15 INFO Start execute shell on node sh-base-biz-gateway18.cloud.cm12.
2022-06-15 10:54:15 INFO Current working dir /home/admin/alisatasknode/taskinfo/20220615/datastudio/10/54/10/up13grqrusbu93r9iiigxw34
2022-06-15 10:54:15 INFO Full Command .. 
2022-06-15 10:54:15 INFO -------------------------
2022-06-15 10:54:15 INFO /home/admin/synccenter/datasync.py /home/admin/alisatasknode/taskinfo//20220615/datastudio/10/54/10/up13grqrusbu93r9iiigxw34//main.sql -p"bizdate=20220614 "
2022-06-15 10:54:15 INFO -------------------------
2022-06-15 10:54:15 INFO List of passing environment ..
2022-06-15 10:54:15 INFO -------------------------
2022-06-15 10:54:15 INFO SKYNET_BUSINESS_NAME=数据集成测试:
2022-06-15 10:54:15 INFO SKYNET_PTYPE=23:
2022-06-15 10:54:15 INFO SKYNET_ONDUTY=239298316640307693:
2022-06-15 10:54:15 INFO SKYNET_SYSTEMID=dev:
2022-06-15 10:54:15 INFO SKYNET_FILEID=502631703:
2022-06-15 10:54:15 INFO SKYNET_SOURCEID=:
2022-06-15 10:54:15 INFO SKYNET_PACKAGEID=mubi_dataworks_test:
2022-06-15 10:54:15 INFO SKYNET_DIDE_JOBID=fbf53714-17bd-4848-8e1d-7f6f752651e0:
2022-06-15 10:54:15 INFO SKYNET_ADVANCE_SETTINGS=:
2022-06-15 10:54:15 INFO SKYNET_TENANT_ID=306752103647458:
2022-06-15 10:54:15 INFO SKYNET_NODENAME=离线同步1:
2022-06-15 10:54:15 INFO ODPS_SQL_RETRY=false:
2022-06-15 10:54:15 INFO SKYNET_MASK_PROJECT=mubi_dataworks_test_dev:
2022-06-15 10:54:15 INFO SKYNET_TENANTOWNER=1721440269810815:
2022-06-15 10:54:15 INFO SKYNET_DI_RESOURCE_GROUP=S_res_group_306752103647458_1622714782009:
2022-06-15 10:54:15 INFO SKYNET_ARGS={"dataworks":"dataworks"}:
2022-06-15 10:54:15 INFO IS_NEW_SCHEDULE=true:
2022-06-15 10:54:15 INFO SKYNET_DAGTYPE=100:
2022-06-15 10:54:15 INFO SKYNET_SYSTEM_ENV=dev:
2022-06-15 10:54:15 INFO SKYNET_GMTDATE=20220615:
2022-06-15 10:54:15 INFO SKYNET_BIZDATE=20220615:
2022-06-15 10:54:15 INFO SKYNET_PROJECTID=188087:
2022-06-15 10:54:15 INFO SKYNET_BUSINESS_ID=10621189:
2022-06-15 10:54:15 INFO SKYNET_APP_ID=188087:
2022-06-15 10:54:15 INFO SKYNET_ARGS_ENABLE=true:
2022-06-15 10:54:15 INFO SKYNET_ACCOUNT_ID=gtscloudtest:mubi:
2022-06-15 10:54:15 INFO SKYNET_REGION=cn-shanghai:
2022-06-15 10:54:15 INFO LINK_FILE_ID=502631703:
2022-06-15 10:54:15 INFO TASK_PLUGIN_NAME=ide_cdp:
2022-06-15 10:54:15 INFO ALISA_TASK_ID=T3_3000797407:
2022-06-15 10:54:15 INFO ALISA_TASK_EXEC_TARGET=group_306752103647458_dev:
2022-06-15 10:54:15 INFO ALISA_TASK_PRIORITY=0:
2022-06-15 10:54:15 INFO --- Invoking Shell command line now ---
2022-06-15 10:54:15 INFO =================================================================

2022-06-15 10:54:29 [INFO] Begin to fetch abtest info for tenant [306752103647458]
2022-06-15 10:54:29 [INFO] Fetch abtest info for tenant 306752103647458 , the result is {"success": true, "data": true, "requestId": "0bc0621416552616692284314e7f0f", "errMsg": "success", "errCode": 0}
2022-06-15 10:54:29 [INFO] Use new di service...
2022-06-15 10:54:29 [INFO] Begin to route for data synchronization(current pid: 1)...
2022-06-15 10:54:29 [INFO] Environ variable replacement details: ${bdp.system.bizdate}->20220615 
2022-06-15 10:54:29 [INFO] Origin variable replacement details: bizdate=20220614 
2022-06-15 10:54:29 [INFO] Parsed variable replacement details(-p): ['bizdate=20220614']
2022-06-15 10:54:29 [INFO] Final Parsed variable replacement details: ['bizdate=20220614']
2022-06-15 10:54:29 [INFO] SKYNET_APP_ID:188087
2022-06-15 10:54:29 [INFO] ALISA_TASK_EXEC_TARGET:group_306752103647458_dev
2022-06-15 10:54:29 [INFO] SKYNET_SOURCENAME:None
2022-06-15 10:54:29 [INFO] ALISA_TASK_ID:T3_3000797407
2022-06-15 10:54:29 [INFO] JAVA_HOME:/opt/taobao/java
2022-06-15 10:54:29 [INFO] SKYNET_BIZDATE:20220615
2022-06-15 10:54:29 [INFO] SKYNET_CYCTIME:None
2022-06-15 10:54:29 [INFO] Data transport tunnel is DI.
2022-06-15 10:54:29 [INFO] DI job config file path: /home/admin/alisatasknode/taskinfo/20220615/datastudio/10/54/10/up13grqrusbu93r9iiigxw34/T3_3000797407.di.json
2022-06-15 10:54:29 [INFO] Begin to get di pipeline with parameter projectId: [188087].
2022-06-15 10:54:29 [INFO] Begin to get di id and key with parameter projectId: [188087].
2022-06-15 10:54:29 [INFO] Configuration conversion correctly, begin to synchronize the data.
Alibaba DI Console, Build 201910280000 .
Copyright 2018 Alibaba Group, All rights reserved .
2022-06-15 10:54:30 : The requestId for this DI job is: b27241c4-93e1-4a20-b7c6-3e1f8afbfe3e
2022-06-15 10:54:30 : ---
Reader: mysql 
                          envType=[0                             ]
                           column=[["detail_id","sale_date","province","city","product_id","cnt","amt"]]
                            where=[                              ]
                       connection=[[{"datasource":"mysql_first","table":["t_dml_data"]}]]
                          splitPk=[detail_id                     ]
Writer: odps 
                        partition=[pt=20220614                   ]
                         truncate=[true                          ]
                          envType=[0                             ]
                       datasource=[odps_first                    ]
                           column=[["detail_id","sale_date","province","city","product_id","cnt","amt"]]
                      emptyAsNull=[true                          ]
                     tableComment=[null                          ]
                            table=[t_dml_data_0615               ]
Setting:
                       errorLimit=[{"record":""}                 ]
                           locale=[zh                            ]
                            speed=[{"concurrent":8,"throttle":false}]

2022-06-15 10:54:33 : Start Job[775876854], traceId [306752103647458#188087#None#None#239298316640307693#None#None#离线同步1], running in Pipeline[basecommon_S_res_group_306752103647458_1622714782009]
2022-06-15 10:54:33 : The Job[775876854] will run in PhysicsPipeline [basecommon_S_res_group_306752103647458_1622714782009] with requestId [b27241c4-93e1-4a20-b7c6-3e1f8afbfe3e]
2022-06-15 10:54:33 : Detail log url: https://di-cn-shanghai.data.aliyun.com/web/di/instanceLog?id=775876854&resourceGroup=S_res_group_306752103647458_1622714782009&requestId=b27241c4-93e1-4a20-b7c6-3e1f8afbfe3e&projectId=188087
2022-06-15 10:54:33 : State: 1(SUBMIT) | Total: 0R 0B | Speed: 0R/s 0B/s | Stage: 0.0%

2022-06-15 10:54:48 : State: 3(RUN) | Total: 0R 0B | Speed: 0R/s 0B/s | Stage: 0.0%

2022-06-15 10:55:03 : State: 3(RUN) | Total: 0R 0B | Speed: 0R/s 0B/s | Stage: 0.0%

2022-06-15 10:55:18 : State: 0(SUCCESS) | Total: 10000R 248.9KB | Speed: 660R/s 16.4KB/s | Stage: 100.0%
2022-06-15 10:55:19 : DI Job[775876854] completed successfully.
2022-06-15 10:55:19 : ---
DI Submit at            : 2022-06-15 10:54:31
DI Start at             : 2022-06-15 10:54:39
DI Finish at            : 2022-06-15 10:55:04
2022-06-15 10:55:19 : Use "cdp job -log 775876854 [-p basecommon_S_res_group_306752103647458_1622714782009]" for more detail.
2022-06-15 10:55:19 : Detail log url: https://di-cn-shanghai.data.aliyun.com/web/di/instanceLog?id=775876854&resourceGroup=S_res_group_306752103647458_1622714782009&requestId=b27241c4-93e1-4a20-b7c6-3e1f8afbfe3e&projectId=188087
Exit with SUCCESS. 
2022-06-15 10:55:19 [INFO] Sandbox context cleanup temp file success.
2022-06-15 10:55:19 [INFO] Data synchronization ended with return code: [0].
2022-06-15 10:55:19 INFO =================================================================
2022-06-15 10:55:19 INFO Exit code of the Shell command 0
2022-06-15 10:55:19 INFO --- Invocation of Shell command completed ---
2022-06-15 10:55:19 INFO Shell run successfully!
2022-06-15 10:55:19 INFO Current task status: FINISH
2022-06-15 10:55:19 INFO Cost time is: 50.157s
/home/admin/alisatasknode/taskinfo//20220615/datastudio/10/54/10/up13grqrusbu93r9iiigxw34/T3_3000797407.log-END-EOF

数据集成调优

提高任务期望最大并发数

通过离线同步的最大并发数,可以提高离线数据同步的速度。

如何确认离线同步在以多线程运行? 在数据集成任务运行日志中看到如下内容: Configure concurrent number: 8 Split task number: 41 Actual running concurrent number: 8 并发日志 image.png 非并发日志 image.png

如何正确选择切分键?

image.png

如何正确选择最大并发数?

切分键的工作原理是什么样的? 根据切分字段将数据均等分后,多线程进行读取 image.png

确认任务状态是否正常

若日志出现长时间WAIT状态,说明当前任务运行所使用的独享数据集成资源组剩余可运行的并发数不足以运行当前任务。或者是无调度资源组进行任务的调度。 可以在运维中心 -> 资源运维处查看资源组是否过载 image.png

确认任务慢在哪一阶段

运维中心 -> 周期任务 查看日志,但是目前还不明确各个阶段具体的含义。

=== total summarize info === 
   1. all phase average time info and max time task info: 
PHASE                |  AVERAGE USED TIME |       ALL TASK NUM |      MAX USED TIME |        MAX TASK ID | MAX TASK INFO                                                                                       
TASK_TOTAL           |             1.725s |                 41 |             3.178s |              0-1-3 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
READ_TASK_INIT       |             0.005s |                 41 |             0.139s |              0-0-0 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
READ_TASK_PREPARE    |             0.000s |                 41 |             0.001s |              0-0-4 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
READ_TASK_DATA       |             0.293s |                 41 |             1.051s |              0-0-6 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
READ_TASK_POST       |             0.000s |                 41 |             0.000s |              0-1-7 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
READ_TASK_DESTROY    |             0.000s |                 41 |             0.000s |             0-1-17 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WRITE_TASK_INIT      |             0.013s |                 41 |             0.090s |              0-1-3 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WRITE_TASK_PREPARE   |             0.181s |                 41 |             0.600s |              0-1-5 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WRITE_TASK_DATA      |             0.300s |                 41 |             0.930s |              0-0-6 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WRITE_TASK_POST      |             0.765s |                 41 |             1.984s |             0-1-33 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WRITE_TASK_DESTROY   |             0.000s |                 41 |             0.000s |              0-0-0 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
SQL_QUERY            |             0.239s |                 41 |             0.956s |              0-0-6 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
RESULT_NEXT_ALL      |             0.005s |                 41 |             0.179s |             0-1-33 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
ODPS_BLOCK_CLOSE     |             0.126s |                 41 |             0.282s |             0-0-34 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WAIT_READ_TIME       |             0.070s |                 41 |             0.443s |              0-0-6 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]
WAIT_WRITE_TIME      |             0.000s |                 41 |             0.001s |              0-1-7 | t_dml_data,jdbcUrl:[jdbc:mysql://rm-uf6k57ol7u62i9fjv.mysql.rds.aliyuncs.com:3306/mubi_mysql4dataworks]

查看数据源、网络的负载情况