GourdErwa / awesome-flink


Checkpoint state grows very large #1

Open ethanlian opened 2 years ago

ethanlian commented 2 years ago

The example runs end to end, but the state on the sink side grows very large; I suspect it is the retained changelog-table state. How can this be resolved?

GourdErwa commented 2 years ago

The stream-splitting approach adds several extra intermediate steps, which is why the checkpoint (ck) volume is large. Unless the business requires splitting, I recommend running in pure SQL-to-SQL mode; the checkpoints stay very small.

Suggestions:

Use the RocksDB state backend and enable incremental checkpoints.

Here is my configuration for reference:

    execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    execution.checkpointing.max-concurrent-checkpoints: 2
    execution.checkpointing.min-pause: 1000
    execution.checkpointing.mode: EXACTLY_ONCE
    execution.checkpointing.timeout: 10min
    execution.checkpointing.tolerable-failed-checkpoints: 2
    execution.checkpointing.unaligned: false
    state.backend: rocksdb
    state.checkpoints.dir: hdfs:///user/flink/checkpoints-data
    state.savepoints.dir: hdfs:///user/flink/savepoints-data
    state.backend.incremental: true
    jobmanager.execution.failover-strategy: region
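
For reference, the same checkpoint setup can also be done programmatically; a minimal sketch, assuming a Flink 1.13+ DataStream job (the interval and storage path are placeholders):

    // Minimal sketch: RocksDB backend with incremental checkpoints, in code.
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class CheckpointSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            // EXACTLY_ONCE checkpoints every 60 s
            env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
            // true => incremental checkpoints (state.backend.incremental: true)
            env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
            env.getCheckpointConfig()
                    .setCheckpointStorage("hdfs:///user/flink/checkpoints-data");
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L);
        }
    }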

For custom RocksDB tuning extensions, see https://blog.csdn.net/Johnson8702/article/details/123841695:

    state.backend.rocksdb.block.cache-size: 64m    # default: 8m
    state.backend.rocksdb.memory.partitioned-index-filters: true    # default: false
    state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
    state.backend.rocksdb.writebuffer.size: 128m
    state.backend.rocksdb.compaction.level.max-size-level-base: 320m
    state.backend.rocksdb.writebuffer.count: 5
    state.backend.rocksdb.thread.num: 4
    state.backend.rocksdb.writebuffer.number-to-merge: 3
    state.backend.latency-track.keyed-state-enabled: true
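
If you prefer setting the RocksDB profile in code rather than in flink-conf.yaml, the predefined-options value above has an enum counterpart; a hedged sketch:

    // Sketch: apply the same predefined RocksDB profile on the backend object.
    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.contrib.streaming.state.PredefinedOptions;

    public class RocksDbTuning {
        public static void main(String[] args) {
            EmbeddedRocksDBStateBackend backend = new EmbeddedRocksDBStateBackend(true);
            backend.setPredefinedOptions(
                    PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
        }
    }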

ethanlian commented 2 years ago

Set `table.exec.sink.upsert-materialize=NONE` in the TableConfig; in my tests it no longer produces large state, and in CDC scenarios it does not seem to affect the results. https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/config/
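
Setting it from code might look like the sketch below (only the option key comes from the linked docs; the rest is a minimal assumed setup):

    // Sketch: disable sink upsert materialization via TableConfig.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class DisableUpsertMaterialize {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Equivalent SQL: SET 'table.exec.sink.upsert-materialize' = 'NONE';
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.sink.upsert-materialize", "NONE");
        }
    }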

1032851561 commented 2 years ago

> The stream-splitting approach adds several extra intermediate steps, which is why the checkpoint volume is large... use the RocksDB state backend and enable incremental checkpoints. (Quoting @GourdErwa's full suggestion and configuration above.)

Which extra steps does splitting add? The one I know of is SinkUpsertMaterializer, which keeps the history of CDC records, so I suspect that is where the large state comes from. Looking at the source code, it can be cleaned up periodically via configuration:

    // Constructor of Flink's SinkUpsertMaterializer: the StateTtlConfig passed
    // in here is what lets its keyed state expire and be cleaned up over time.
    public SinkUpsertMaterializer(
            StateTtlConfig ttlConfig,
            TypeSerializer<RowData> serializer,
            GeneratedRecordEqualiser generatedEqualiser) {
        this.ttlConfig = ttlConfig;
        this.serializer = serializer;
        this.generatedEqualiser = generatedEqualiser;
    }
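
Since the constructor takes a StateTtlConfig, the periodic cleanup presumably keys off the Table API state-retention option; a hedged sketch, assuming `table.exec.state.ttl` is the relevant knob:

    // Sketch: give Table API operator state (including the materializer's) a TTL.
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class MaterializerTtl {
        public static void main(String[] args) {
            TableEnvironment tEnv =
                    TableEnvironment.create(EnvironmentSettings.inStreamingMode());
            // Idle state is dropped after one day; pick a value that is safe
            // for your correctness requirements.
            tEnv.getConfig().getConfiguration()
                    .setString("table.exec.state.ttl", "1 d");
        }
    }
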
1032851561 commented 2 years ago

> The stream-splitting approach adds several extra intermediate steps, which is why the checkpoint volume is large... use the RocksDB state backend and enable incremental checkpoints. (Quoting @GourdErwa's full suggestion and configuration above.)

Which extra steps does splitting add? The one I know of is SinkUpsertMaterializer, which keeps the history of CDC records; I suspect that is where the large state comes from. I'm not sure whether this state can be cleaned up periodically through configuration?

1032851561 commented 2 years ago

> Set `table.exec.sink.upsert-materialize=NONE` in the TableConfig... (Quoting @ethanlian's comment above.)

Wouldn't turning this off cause problems? In particular, an update produces UPDATE_BEFORE and UPDATE_AFTER records, which correspond to a delete and an insert; if they arrive out of order, wouldn't that cause data loss? I currently have SinkUpsertMaterializer enabled too, but my data arrives out of order, so I was forced to simply not process UPDATE_BEFORE.
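
For context, the "not processing update_before" workaround could be a plain filter in the DataStream API; this is purely an illustrative sketch (the helper is hypothetical), and dropping -U records is only safe when the downstream sink upserts by primary key:

    // Sketch: drop UPDATE_BEFORE (-U) records before an upsert sink.
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;

    public class DropUpdateBefore {
        // `changelog` is assumed to be a CDC changelog stream of RowData.
        static DataStream<RowData> dropUpdateBefore(DataStream<RowData> changelog) {
            return changelog.filter(row -> row.getRowKind() != RowKind.UPDATE_BEFORE);
        }
    }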