DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

flinkx 1.12 performance issue: mysql-hive with 90 million+ rows #1136

Open biandou1313 opened 2 years ago

biandou1313 commented 2 years ago

Search before asking

What happened

Problem description:
1. With 7 million+ source rows, the sync finishes within 6 minutes after tuning the Flink job parameters.
2. With 90 million+ source rows, the mysql-hive sync fails because the MySQL source cannot pull any data: the source has to run the whole query in one go, and the result set (rs) is never returned.

SQL executed on the source side:
SELECT id, device_id, point, hash, value, acq_time, ratio, max_val, min_val, out_param, item_code, descr, save_hst, del_flag, create_user, update_user, create_time, update_time, ext_first, ext_second, ext_third, ext_fourth, ext_fifth, ext_sixth, ext_seventh, ext_eighth, ext_ninth, ext_tenth, device_add_id, today_start_value, unit FROM tb_device_point_data_hst_01 WHERE 1=1

====================================================================================
Caused by: java.lang.IllegalArgumentException: open() failed.The last packet successfully received from the server was 373,982 milliseconds ago. The last packet sent successfully to the server was 373,983 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
querySQL: SELECT id, device_id, point, hash, value, acq_time, ratio, max_val, min_val, out_param, item_code, descr, save_hst, del_flag, create_user, update_user, create_time, update_time, ext_first, ext_second, ext_third, ext_fourth, ext_fifth, ext_sixth, ext_seventh, ext_eighth, ext_ninth, ext_tenth, device_add_id, today_start_value, unit FROM tb_device_point_data_hst_01 WHERE 1=1
    at com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat.openInternal(JdbcInputFormat.java:139)
    at com.dtstack.flinkx.source.format.BaseRichInputFormat.open(BaseRichInputFormat.java:148)
    at com.dtstack.flinkx.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:126)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: The last packet successfully received from the server was 373,982 milliseconds ago. The last packet sent successfully to the server was 373,983 milliseconds ago. is longer than the server configured value of 'wait_timeout'. You should consider either expiring and/or testing connection validity before use in your application, increasing the server configured values for client timeouts, or using the Connector/J connection property 'autoReconnect=true' to avoid this problem.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:990)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2232)
    at com.mysql.jdbc.MysqlIO.nextRow(MysqlIO.java:1992)
    at com.mysql.jdbc.MysqlIO.readSingleRowSet(MysqlIO.java:3413)
    at com.mysql.jdbc.MysqlIO.getResultSet(MysqlIO.java:471)
    at com.mysql.jdbc.MysqlIO.readResultsForQueryOrUpdate(MysqlIO.java:3115)
    at com.mysql.jdbc.MysqlIO.readAllResults(MysqlIO.java:2344)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2739)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2482)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2440)
    at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1381)
    at com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat.executeQuery(JdbcInputFormat.java:769)
    at com.dtstack.flinkx.connector.jdbc.source.JdbcInputFormat.openInternal(JdbcInputFormat.java:122)
    ... 5 common frames omitted
Caused by: java.io.EOFException: Can not read response from server. Expected to read 255 bytes, read 0 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3014)
    at com.mysql.jdbc.MysqlIO.nextRowFast(MysqlIO.java:2215)
    ... 16 common frames omitted
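A note on the trace above: the exception is raised inside `StatementImpl.executeQuery` while the driver is still reading rows (`readAllResults`, `readSingleRowSet`). That is consistent with Connector/J's documented default of retrieving the complete result set into memory before returning it. Connector/J also documents cursor-based fetching, enabled with `useCursorFetch=true` together with a positive fetch size. The sketch below (Python, purely for illustration) appends such properties to the `jdbcUrl` from the job config. The helper name `with_jdbc_properties` is hypothetical, and whether flinkx 1.12 passes these URL properties through unchanged is an assumption that would need checking against the 1.12_release code.

```python
def with_jdbc_properties(jdbc_url: str, extra: dict) -> str:
    """Append connection properties to a JDBC URL without overriding existing ones.

    Hypothetical helper for illustration only; it is not part of chunjun/flinkx.
    """
    base, _, query = jdbc_url.partition("?")
    params = {}
    if query:
        for pair in query.split("&"):
            if not pair:
                continue
            key, _, value = pair.partition("=")
            params[key] = value
    # Keep whatever the job already sets; only add properties that are missing.
    for key, value in extra.items():
        params.setdefault(key, value)
    return base + "?" + "&".join(f"{k}={v}" for k, v in params.items())


# URL taken from the job config in this issue.
url = ("jdbc:mysql://172.18.8.114:3306/Vasyslink_yag001"
       "?useSSL=false&useUnicode=true&characterEncoding=utf8")

# useCursorFetch is a documented Connector/J property; it only takes effect
# when the statement also uses a positive fetch size.
print(with_jdbc_properties(url, {"useCursorFetch": "true"}))
```

Existing properties win over added ones here, so a job that already sets a property keeps its own value.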

What you expected to happen

{
  "job": {
    "content": [{
      "reader": {
        "parameter": {
          "password": "root",
          "dataSourceId": 14,
          "column": [
            { "precision": 20, "name": "id", "columnDisplaySize": 20, "type": "BIGINT" },
            { "precision": 32, "name": "device_id", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 12, "name": "point", "columnDisplaySize": 12, "type": "VARCHAR" },
            { "precision": 32, "name": "hash", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 32, "name": "value", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 19, "name": "acq_time", "columnDisplaySize": 19, "type": "DATETIME" },
            { "precision": 12, "name": "ratio", "columnDisplaySize": 12, "type": "FLOAT" },
            { "precision": 32, "name": "max_val", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 32, "name": "min_val", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 32, "name": "out_param", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 8, "name": "item_code", "columnDisplaySize": 8, "type": "VARCHAR" },
            { "precision": 32, "name": "descr", "columnDisplaySize": 32, "type": "VARCHAR" },
            { "precision": 16, "name": "save_hst", "columnDisplaySize": 16, "type": "SMALLINT" },
            { "precision": 1, "name": "del_flag", "columnDisplaySize": 1, "type": "TINYINT" },
            { "precision": 20, "name": "create_user", "columnDisplaySize": 20, "type": "BIGINT" },
            { "precision": 20, "name": "update_user", "columnDisplaySize": 20, "type": "BIGINT" },
            { "precision": 19, "name": "create_time", "columnDisplaySize": 19, "type": "DATETIME" },
            { "precision": 19, "name": "update_time", "columnDisplaySize": 19, "type": "DATETIME" },
            { "precision": 50, "name": "ext_first", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_second", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_third", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_fourth", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_fifth", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_sixth", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_seventh", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_eighth", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_ninth", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "ext_tenth", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 50, "name": "device_add_id", "columnDisplaySize": 50, "type": "VARCHAR" },
            { "precision": 8, "name": "today_start_value", "columnDisplaySize": 10, "type": "DECIMAL" },
            { "precision": 75, "name": "unit", "columnDisplaySize": 75, "type": "VARCHAR" }
          ],
          "connection": [{
            "jdbcUrl": ["jdbc:mysql://172.18.8.114:3306/Vasyslink_yag001?useSSL=false&useUnicode=true&characterEncoding=utf8"],
            "table": ["tb_device_point_data_hst_01"]
          }],
          "splitPk": "id",
          "username": "root"
        },
        "name": "mysqlreader"
      },
      "writer": {
        "parameter": {
          "tablesColumn": "{\"tb_device_point_data_hst_78\":[{\"key\":\"id\",\"type\":\"bigint\",\"precision\":19,\"columnDisplaySize\":20},{\"key\":\"device_id\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"point\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"hash\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"value\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"acq_time\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ratio\",\"type\":\"float\",\"precision\":7,\"columnDisplaySize\":24},{\"key\":\"max_val\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"min_val\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"out_param\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"item_code\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"descr\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"save_hst\",\"type\":\"smallint\",\"precision\":5,\"columnDisplaySize\":6},{\"key\":\"del_flag\",\"type\":\"tinyint\",\"precision\":3,\"columnDisplaySize\":4},{\"key\":\"create_user\",\"type\":\"bigint\",\"precision\":19,\"columnDisplaySize\":20},{\"key\":\"update_user\",\"type\":\"bigint\",\"precision\":19,\"columnDisplaySize\":20},{\"key\":\"create_time\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"update_time\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_first\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_second\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_third\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_fourth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_fifth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_sixth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_seventh\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_eighth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_ninth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"ext_tenth\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"device_add_id\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647},{\"key\":\"today_start_value\",\"type\":\"decimal\",\"precision\":8,\"columnDisplaySize\":10},{\"key\":\"unit\",\"type\":\"string\",\"precision\":2147483647,\"columnDisplaySize\":2147483647}]}",
          "dataSourceId": 54,
          "partition": "pt",
          "jdbcUrl": "jdbc:hive2://172.18.8.208:10000/Vasyslink_yag001",
          "defaultFS": "hdfs://172.18.8.207:8020",
          "writeMode": "overwrite",
          "maxFileSize": 1073741824,
          "fieldDelimiter": "\t",
          "partitionType": "HOUR",
          "fileType": "text",
          "charsetName": "UTF-8"
        },
        "name": "hivewriter"
      }
    }],
    "setting": {
      "log": { "isLogger": false },
      "errorLimit": {},
      "speed": { "bytes": 0, "channel": 1 }
    }
  }
}

How to reproduce

1. A MySQL table with 90 million+ rows
2. A one-off full sync: mysql-hive
3. Channel set to 1
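The job config in this issue sets `splitPk` to `id` but runs with a single channel, so one query has to scan the whole table. As a rough illustration of what splitting on a numeric primary key can look like, here is a minimal Python sketch that cuts an id range into one contiguous slice per channel. It is not flinkx's actual splitting logic, and the function names `split_ranges` and `where_clauses` are hypothetical. Real ids may also be sparse, in which case equal-width ranges would not hold equal row counts.

```python
def split_ranges(min_id: int, max_id: int, channels: int) -> list:
    """Cut the inclusive range [min_id, max_id] into at most `channels` slices."""
    if channels <= 0:
        raise ValueError("channels must be positive")
    if max_id < min_id:
        return []
    total = max_id - min_id + 1
    step, extra = divmod(total, channels)
    ranges = []
    start = min_id
    for i in range(channels):
        # The first `extra` slices take one additional id so nothing is dropped.
        size = step + (1 if i < extra else 0)
        if size == 0:
            continue
        end = start + size - 1
        ranges.append((start, end))
        start = end + 1
    return ranges


def where_clauses(pk: str, min_id: int, max_id: int, channels: int) -> list:
    """Render each slice as a predicate that one reader channel could use."""
    return [f"{pk} BETWEEN {lo} AND {hi}"
            for lo, hi in split_ranges(min_id, max_id, channels)]


for clause in where_clauses("id", 1, 90_000_000, 4):
    print(clause)
```

Each predicate would replace the `WHERE 1=1` of the full-table query, so every reader pulls a bounded result set instead of all 90 million+ rows at once.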

Anything else

No response

Version

1.12_release

Are you willing to submit PR?

Code of Conduct

FlechazoW commented 2 years ago

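This is probably a pooled connection that was dropped after sitting idle for too long. You are welcome to submit a PR to improve this.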

biandou1313 commented 2 years ago

Setting "fetchSize":"2048000" lets the sync complete, but it is far too slow: it takes about 4 hours, whereas sqoop needs only 20 minutes.

biandou1313 commented 2 years ago

One more question: flinkx 1.12 should also support Flink high availability, right?

biandou1313 commented 2 years ago

The hive sink does not support multiple channels.

FlechazoW commented 2 years ago

"flinkx 1.12 should also support Flink high availability, right?"

Yes, that is supported.

biandou1313 commented 2 years ago

[image: 1660185601(1)] This is very inefficient: it takes 4 hours.

biandou1313 commented 2 years ago

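Our Hadoop cluster is very powerful: 4 machines, each with 60 cores, 128 GB of RAM, and SSDs. With sqoop, a single table of 90 million+ rows takes under 20 minutes.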
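To put the numbers reported in this thread side by side, here is a small Python sketch that converts them into rows per second. The row counts are the lower bounds quoted above ("7 million+", "90 million+"), so the rates are approximate. The function name `rows_per_second` is only for illustration.

```python
def rows_per_second(rows: int, minutes: float) -> float:
    """Average throughput for a sync that moved `rows` rows in `minutes` minutes."""
    return rows / (minutes * 60)


flinkx_small = rows_per_second(7_000_000, 6)        # 7 million+ rows in about 6 minutes
flinkx_large = rows_per_second(90_000_000, 4 * 60)  # 90 million+ rows in about 4 hours
sqoop_large = rows_per_second(90_000_000, 20)       # 90 million+ rows in about 20 minutes

print(f"flinkx, 7M rows:  {flinkx_small:,.0f} rows/s")
print(f"flinkx, 90M rows: {flinkx_large:,.0f} rows/s")
print(f"sqoop, 90M rows:  {sqoop_large:,.0f} rows/s")
print(f"sqoop / flinkx on the large table: {sqoop_large / flinkx_large:.0f}x")
```

On these figures the large-table flinkx run averages about a third of the rate the same tool reached on the 7 million row table, and sqoop is roughly 12 times faster on the large table.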

Paddy0523 commented 2 years ago

Which version of chunjun are you using? Is it the latest?