itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays to ClickHouse.
Apache License 2.0

Duplicate data read under multiple parallelism #21

Closed · clisho closed this 2 years ago

clisho commented 2 years ago

Hi, I downloaded the code to test it and found that when the scan.partition.* options are not set, the source reads N copies of the data under multiple parallelism. How can this problem be solved? Thanks!

itinycheng commented 2 years ago

> Hi, I downloaded the code to test it and found that when the scan.partition.* options are not set, the source reads N copies of the data under multiple parallelism. How can this problem be solved? Thanks!

Hello, could you provide the specific configuration and the table details?

clisho commented 2 years ago

> Hi, I downloaded the code to test it and found that when the scan.partition.* options are not set, the source reads N copies of the data under multiple parallelism. How can this problem be solved? Thanks!

> Hello, could you provide the specific configuration and the table details?

Hello!

==================== Flink code ====================

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
        .setRuntimeMode(RuntimeExecutionMode.BATCH);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

tableEnv.executeSql(
        "CREATE TABLE event_jian_ying_2 (\n"
                + "  arg STRING,\n"
                + "  user_id DECIMAL(11,0),\n"
                + "  event_time TIMESTAMP\n"
                + ") WITH (\n"
                + "  'connector' = 'clickhouse',\n"
                + "  'username' = 'default',\n"
                + "  'password' = '...',\n"
                + "  'url' = '...',\n"
                + "  'database-name' = '...',\n"
                + "  'table-name' = 'event_jian_ying_2',\n"
                + "  'sink.batch-size' = '500',\n"
                + "  'sink.flush-interval' = '1000',\n"
                + "  'sink.max-retries' = '3'"
                + ")");

Table table = tableEnv.sqlQuery("select * from event_jian_ying_2");
table.execute().print();

==================== Table schema ====================

CREATE TABLE dxp.event_jian_ying_2 (
    `user_id` UInt64,
    `event_time` DateTime,
    `arg` String
) ENGINE = MergeTree
ORDER BY event_time
SETTINGS index_granularity = 8192

I looked at the createInputSplits method in ClickHouseBatchInputFormat:

@Override
public InputSplit[] createInputSplits(int minNumSplits) {
    int splitNum = parameterValues != null ? parameterValues.length : minNumSplits;
    return createGenericInputSplits(splitNum);
}

I haven't read through all of the code yet, so I don't know what parameterValues contains. But minNumSplits is the execution parallelism, so if parameterValues is null and the query I run is not partitioned, every split executes the same query, and the data ends up being read N times. In this case, shouldn't splitNum be set to 1?
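A minimal sketch of the change being suggested here, assuming createGenericInputSplits(n) simply produces n generic splits; this is only my reading of the suggestion, not the actual patch (which is linked further down in the thread):

@Override
public InputSplit[] createInputSplits(int minNumSplits) {
    // Sketch only: when there are no partition parameters, fall back to a single
    // split so the same unpartitioned query is not executed once per subtask.
    int splitNum = parameterValues != null ? parameterValues.length : 1;
    return createGenericInputSplits(splitNum);
}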

Also, a few personal suggestions (a configuration sketch follows below):

1. Could scan.partition.column support both numeric and date types?
2. If scan.partition.num is not set, default it to the parallelism.
3. If scan.partition.lower-bound and scan.partition.upper-bound are not provided, use scan.partition.column to look up the minimum and maximum values and then split the read accordingly.
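For illustration, a hedged sketch of what a partitioned read could look like with the scan.partition.* options mentioned above; the source table name, partition column, bounds, and partition count here are placeholder values chosen for this example, not taken from this thread:

tableEnv.executeSql(
        "CREATE TABLE event_jian_ying_2_src (\n"
                + "  arg STRING,\n"
                + "  user_id DECIMAL(11,0),\n"
                + "  event_time TIMESTAMP\n"
                + ") WITH (\n"
                + "  'connector' = 'clickhouse',\n"
                + "  'url' = '...',\n"
                + "  'database-name' = '...',\n"
                + "  'table-name' = 'event_jian_ying_2',\n"
                + "  'scan.partition.column' = 'user_id',\n"
                + "  'scan.partition.num' = '4',\n"
                + "  'scan.partition.lower-bound' = '0',\n"
                + "  'scan.partition.upper-bound' = '100000'\n"
                + ")");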

Thanks!

itinycheng commented 2 years ago

@clisho, parameterValues holds the two parameters needed for the between ? and ? clause;

  1. scan.partition.column currently only supports integer values; there are no plans to support other types for now;
  2. Without scan.partition.num set, there is no way to read with multiple parallelism (there is no way to determine how the data should be split for parallel querying), except for distributed tables, whose local tables can be read in parallel;
  3. Defaulting lower-bound and upper-bound is a nice feature, but I have no spare time for it in the near term; contributions implementing this feature are welcome. A rough sketch of the between-parameter mechanics is shown below.
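To illustrate the between ? and ? mechanics described above: a self-contained sketch of how one value pair per split could be derived from the partition settings. The class name, values, and arithmetic are illustrative only, not the connector's actual code:

import java.io.Serializable;

public class PartitionParamsSketch {
    public static void main(String[] args) {
        long lowerBound = 0L;        // scan.partition.lower-bound
        long upperBound = 100_000L;  // scan.partition.upper-bound
        int partitionNum = 4;        // scan.partition.num

        // One [lower, upper] pair per split; each split would then run
        // "SELECT ... WHERE partition_column BETWEEN ? AND ?" on its own range.
        Serializable[][] parameterValues = new Serializable[partitionNum][];
        long step = (upperBound - lowerBound + 1) / partitionNum;
        for (int i = 0; i < partitionNum; i++) {
            long start = lowerBound + i * step;
            long end = (i == partitionNum - 1) ? upperBound : start + step - 1;
            parameterValues[i] = new Serializable[] {start, end};
        }

        for (Serializable[] range : parameterValues) {
            System.out.println(range[0] + " - " + range[1]);
        }
    }
}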

Reading the same data multiple times is a bug; I plan to fix it tonight.

itinycheng commented 2 years ago

@clisho Bug fixed https://github.com/itinycheng/flink-connector-clickhouse/commit/96ec3f04a11a6f684b967b413fab0d3f0149635d

clisho commented 2 years ago

> @clisho Bug fixed 96ec3f0

OK, thanks! I'm swamped at the moment as well; once I catch my breath, if I add any features I'll submit them then.