itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays to ClickHouse.
Apache License 2.0

[Bug]: Checkpointing doesn't trigger records flush to clickhouse #156

Open karpoftea opened 1 month ago

karpoftea commented 1 month ago

What happened?

Originally I tested the connector against type-error failures (type incompatibility between a Kafka source table and a ClickHouse sink table): I selected an integer column from the Kafka table (say, a JSON number field declared as cnt INTEGER) and inserted it into a ClickHouse column of type Int64. With cnt=1 everything works as expected and the value is saved to ClickHouse. If I change the ClickHouse column type to UInt64 and send cnt=-1, an exception occurs (which is OK), the task restarts, and after several restarts it returns to the RUNNING state, simply leaving the corrupted message behind. That is not the expected behaviour, because data was lost. The expected behaviour is to get stuck and wait for manual resolution (either moving the offset or changing the ClickHouse table schema).

After that I dug into the code and found that DynamicTableSink is implemented using OutputFormatProvider/OutputFormat. My guess was that the OutputFormat does not call flush() when a checkpoint occurs, and thus checkpointing always succeeds. I then set the connector's sink.flush-interval to 10min and the Flink checkpoint interval to 1min, and saw that ClickHouseBatchOutputFormat.flush() is not triggered by checkpoints. So my guess seems right.
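For reference, here is the shape of the two interfaces involved, paraphrased for illustration (the real ones are org.apache.flink.api.common.io.OutputFormat and org.apache.flink.streaming.api.checkpoint.CheckpointedFunction): OutputFormat exposes no checkpoint-time hook at all, while a SinkFunction can get one by also implementing CheckpointedFunction.

```java
import java.io.IOException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;

// Paraphrased sketch for illustration; not the actual Flink source.
interface OutputFormatSketch<IT> {
    void configure(Configuration parameters);               // once, before open
    void open(int taskNumber, int numTasks) throws IOException;
    void writeRecord(IT record) throws IOException;          // may buffer internally
    void close() throws IOException;
    // Note: nothing here is invoked when a checkpoint barrier arrives,
    // so an internal write buffer is invisible to checkpointing.
}

interface CheckpointedFunctionSketch {
    // A SinkFunction that also implements this interface DOES get a hook:
    // snapshotState runs as part of every checkpoint and can flush buffers.
    void snapshotState(FunctionSnapshotContext context) throws Exception;
    void initializeState(FunctionInitializationContext context) throws Exception;
}
```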

Can you kindly tell me whether using OutputFormat as the SinkRuntimeProvider was a design choice? If so, what was the reason for not choosing the Sink API (org.apache.flink.api.connector.sink2.Sink) for the implementation?
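For comparison, a minimal sketch of what a sink2-based implementation could look like (the class name and buffering details are illustrative, not a proposal for the actual code): SinkWriter#flush(boolean) is part of the writer contract and is invoked by the runtime as part of checkpointing.

```java
import java.io.IOException;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.table.data.RowData;

// Minimal sink2 sketch: flush() is a first-class part of the interface
// and is called by the runtime before a checkpoint can complete.
class ClickHouseSink2Sketch implements Sink<RowData> {
    @Override
    public SinkWriter<RowData> createWriter(InitContext context) throws IOException {
        return new SinkWriter<RowData>() {
            @Override
            public void write(RowData element, Context context) {
                // a real implementation would buffer the row for batching
            }

            @Override
            public void flush(boolean endOfInput) {
                // invoked at checkpoint time (and at end of input), so the
                // batch is drained before the checkpoint completes
            }

            @Override
            public void close() {
                // release the ClickHouse connection
            }
        };
    }
}
```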

Affects Versions

master/1.16.0

What are you seeing the problem on?

Flink-Table-Api (SQL)

How to reproduce

  1. create a Kafka source table and a ClickHouse sink table
  2. select from the Kafka source and insert the selected values into the ClickHouse sink
  3. set the Flink checkpoint interval to 1min
  4. set sink.flush-interval to 10min
  5. start the cluster and submit the pipeline (a sketch of such a pipeline follows this list)
  6. push 1 message to Kafka
  7. wait for a checkpoint
  8. after the checkpoint occurs, observe that (1) the offset has moved forward and (2) the message was not delivered to ClickHouse
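A minimal Table API pipeline along these lines, as referenced in step 5 (table names, topics, and connection options are illustrative placeholders, not taken from the report):

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ReproSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // step 3: checkpoint every 1 min
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        tEnv.executeSql(
            "CREATE TABLE kafka_src (cnt INT) WITH ("
                + " 'connector' = 'kafka',"
                + " 'topic' = 'events',"
                + " 'properties.bootstrap.servers' = 'localhost:9092',"
                + " 'scan.startup.mode' = 'earliest-offset',"
                + " 'format' = 'json')");

        tEnv.executeSql(
            "CREATE TABLE ch_sink (cnt BIGINT) WITH ("
                + " 'connector' = 'clickhouse',"
                + " 'url' = 'clickhouse://localhost:8123',"
                + " 'database-name' = 'default',"
                + " 'table-name' = 'events',"
                + " 'sink.flush-interval' = '10min')"); // step 4

        // With the settings above, the checkpoint at minute 1 succeeds even
        // though the row is still sitting in the sink's write buffer.
        tEnv.executeSql("INSERT INTO ch_sink SELECT cnt FROM kafka_src");
    }
}
```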

Relevant log output

No response

Anything else

The core problem is that checkpointing does not trigger a flush, so even if the sink has a pending exception (flushException), it still looks healthy to the Flink runtime.
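For illustration, the batching pattern described above typically looks like the following sketch (class, field, and scheduling details are placeholders, not the connector's exact code): the background flush parks its failure in a field that is only rethrown on the next write, so an idle sink sails through checkpoints even after a flush has already failed.

```java
import java.io.IOException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of a JDBC-style batching output format.
class BatchingOutputFormatSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private volatile Exception flushException;

    void open() {
        // background flush on a timer (e.g. sink.flush-interval)
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                flush();
            } catch (Exception e) {
                flushException = e; // failure is parked here...
            }
        }, 10, 10, TimeUnit.MINUTES);
    }

    void writeRecord(Object record) throws IOException {
        // ...and only surfaces on the NEXT incoming record.
        if (flushException != null) {
            throw new IOException("Flush failed.", flushException);
        }
        // buffer the record
    }

    void flush() throws IOException {
        // send the buffered batch to ClickHouse
    }

    // Nothing here runs at checkpoint time, so a checkpoint that falls
    // between a failed flush and the next record sees a "healthy" sink.
}
```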


itinycheng commented 1 month ago

Hi @karpoftea

Thanks for the insight! The initial version of the project was developed with reference to flink-connector-jdbc 1.12, which uses SinkRuntimeProvider. I didn't realize before that checkpoints don't trigger the flush function, so thank you very much. As a temporary solution I may switch to SinkFunctionProvider, due to its simplicity and ease of implementation; org.apache.flink.api.connector.sink2.Sink is better but requires more effort.
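One possible shape for that temporary solution, mirroring flink-connector-jdbc's GenericJdbcSinkFunction and assuming the flush() method on ClickHouseBatchOutputFormat mentioned above (the wrapper class name is illustrative):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.table.data.RowData;

// Illustrative wrapper: delegate to the existing OutputFormat, but also
// implement CheckpointedFunction so every checkpoint forces a flush.
class ClickHouseSinkFunctionSketch extends RichSinkFunction<RowData>
        implements CheckpointedFunction {

    private final ClickHouseBatchOutputFormat outputFormat; // from this connector

    ClickHouseSinkFunctionSketch(ClickHouseBatchOutputFormat outputFormat) {
        this.outputFormat = outputFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        outputFormat.open(getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks());
    }

    @Override
    public void invoke(RowData value, Context context) throws Exception {
        outputFormat.writeRecord(value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // The missing piece: drain the buffer before the checkpoint completes.
        outputFormat.flush();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // no managed state beyond the flushed buffer
    }

    @Override
    public void close() throws Exception {
        outputFormat.close();
    }
}
```

getSinkRuntimeProvider would then return SinkFunctionProvider.of(new ClickHouseSinkFunctionSketch(format)) instead of the current OutputFormatProvider.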

LJSJackson commented 1 month ago

I seem to have encountered the same problem. Have you solved it yet? Could you please tell me how to work around it? I hope to receive your reply. Thank you @karpoftea

karpoftea commented 1 month ago

@LJSJackson no, I haven't. I'm about to launch this connector as-is on production workloads, and if no critical issues arise I will then propose a patch for this. I can't see an easy (1-2 days of coding) way to fix it right now.

itinycheng commented 1 month ago

Hello all,

I made a temporary fix, sorry for the late submission.

Hi @karpoftea: If you have any suggestions for improvements, please feel free to share your thoughts. Your feedback is greatly appreciated. Thank you!