itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays to ClickHouse.
Apache License 2.0

ClickHouse source pulls all of the table data when the job starts; the TM will crash if the table data is too big #25

Closed. LinhaiXu closed this issue 2 years ago.

LinhaiXu commented 2 years ago

Are there any connector options to limit how much data is pulled when the job starts?

itinycheng commented 2 years ago

Sorry, that's not supported for now; you can read data by partition or limit the number of rows read. In batch mode, it seems all data is read at once. Is there any way to read data in multiple batches in one source instance?

LinhaiXu commented 2 years ago

> Sorry, that's not supported for now; you can read data by partition or limit the number of rows read. In batch mode, it seems all data is read at once. Is there any way to read data in multiple batches in one source instance?

Thank you for your reply. But my job is in streaming mode, and the source DDL looks like this:

```scala
tableEnv.executeSql(
      """
        |CREATE TABLE stock_tbl (
        | `id` string,
        | `stock_id` string,
        | `display_type` int,
        | `display_name` string,
        | `data_type` int,
        | `compare_type` int,
        | `value` double,
        | `data_source_id` string,
        | `date_range_begin` string,
        | `date_range_end` string
        |) WITH (
        | 'connector' = 'clickhouse',
        | 'url' = 'clickhouse://xxx:8123',
        | 'username' = 'xxx',
        | 'password' = 'xxx',
        | 'database-name' = 'xxx',
        | 'table-name' = 'xxx',
        | 'scan.partition.column' = 'xxx',
        | 'scan.partition.num' = '4',
        | 'scan.partition.lower-bound' = '1',
        | 'scan.partition.upper-bound' = '4'
        |)
        |""".stripMargin)

The ClickHouse table has 130 million records; how can I limit the number of reads? I didn't see a relevant option in your examples.

itinycheng commented 2 years ago
1. I meant `SELECT * FROM table LIMIT n`, but this does not meet your needs (see the sketch below).
2. The `scan.partition.*` options help read data in parallel; each subtask reads a part of the whole table (also sketched below).
3. In streaming mode, data is consumed row by row and sent downstream, so it shouldn't cause an out-of-memory error. I did a test in streaming mode that reads from ClickHouse and writes to MySQL.
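For illustration, here is a minimal Scala sketch of points 1 and 2, reusing the `tableEnv` and `stock_tbl` from the DDL snippet earlier in this thread. Whether the LIMIT is pushed down to ClickHouse, and exactly how the partition ranges are split, are assumptions on my part (modeled on Flink's JDBC connector), not guarantees documented by this connector:

```scala
// Point 1: bound the number of rows with a plain LIMIT query.
// Assumption: the LIMIT may be applied on the Flink side rather than pushed
// down to ClickHouse, but it still caps what reaches downstream operators.
val limited = tableEnv.sqlQuery("SELECT * FROM stock_tbl LIMIT 100000")
limited.execute().print()

// Point 2: with 'scan.partition.num' = '4', 'scan.partition.lower-bound' = '1'
// and 'scan.partition.upper-bound' = '4' (as in the DDL above), each of the
// 4 source subtasks is expected to scan one slice of the value range of the
// partition column, roughly equivalent to:
//   subtask 0: SELECT ... WHERE partition_column >= 1 AND partition_column < 2
//   subtask 1: SELECT ... WHERE partition_column >= 2 AND partition_column < 3
//   ...
// Assumption: range splitting analogous to Flink's JDBC source options.
```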

About your problem: you can set `pipeline.operator-chaining = false` to disable operator chaining; this can help you locate the problematic operator.
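A minimal sketch of setting that option from the Table API used above, assuming it is applied before the query is submitted (setting it in `flink-conf.yaml` or via a `SET` statement works as well):

```scala
// Disable operator chaining so each operator shows up as its own vertex in
// the Flink web UI; this makes it easier to see which operator is actually
// running out of memory or building up backpressure.
tableEnv.getConfig.getConfiguration
  .setString("pipeline.operator-chaining", "false")
```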