itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays to ClickHouse.
Apache License 2.0

Two-table join in Flink sinking to ClickHouse: updating old data in MySQL deletes the row in ClickHouse with no insert #24

Closed xuanzhi201111 closed 8 months ago

xuanzhi201111 commented 2 years ago

I want to use Flink CDC SQL to build a wide table (sink_ck) in ClickHouse. I create two or more Flink CDC source tables from MySQL, say tb1 and tb2, and submit a job in the Flink client:

    insert into sink_ck SELECT a.*, b.order_name FROM tb1 a LEFT JOIN tb2 b ON a.order_id = b.order_id

When a tb1 row is updated in MySQL, the corresponding row is deleted in sink_ck. Why? I want an update to a tb1 row in MySQL to be applied as an update to the same row in ClickHouse.

Sinking a single table to ClickHouse with Flink CDC is no problem: insert / update / delete all work correctly!

Updating the single table works; the logged statements are:

    INSERT INTO ck_order(id, order_id, merchant_code, merchant_name, create_time, update_time) FORMAT TabSeparated
    ALTER TABLE flink.ck_order UPDATE order_id='20042100002', merchant_code='BU0000000002YGP', merchant_name='广州2贸易有限公司_2', create_time='2021-04-21 10:41:22', update_time='2022-04-25 10:33:58' WHERE id=2

Updating the wide table produces a delete and no ALTER TABLE ... UPDATE:

    INSERT INTO ck_order_detail(id, order_id, merchant_code, merchant_name, create_time, update_time, order_name) FORMAT TabSeparated
    ALTER TABLE flink.ck_order_detail DELETE WHERE id=2

itinycheng commented 2 years ago

Is there any delete event generated by Flink CDC? On the sink side, only records of RowKind.DELETE are converted to delete SQL. Refer to: https://github.com/itinycheng/flink-connector-clickhouse/blob/master/src/main/java/org/apache/flink/connector/clickhouse/internal/executor/ClickHouseUpsertExecutor.java
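
For context, here is a minimal sketch of the RowKind routing such an executor performs. The statement field names mirror the linked class, but the body is illustrative rather than the connector's actual code, and parameter binding is omitted.

    import java.sql.PreparedStatement;
    import java.sql.SQLException;

    import org.apache.flink.table.data.RowData;
    import org.apache.flink.types.RowKind;

    class RowKindRoutingSketch {
        private final PreparedStatement insertStmt;
        private final PreparedStatement updateStmt;
        private final PreparedStatement deleteStmt;

        RowKindRoutingSketch(
                PreparedStatement insertStmt,
                PreparedStatement updateStmt,
                PreparedStatement deleteStmt) {
            this.insertStmt = insertStmt;
            this.updateStmt = updateStmt;
            this.deleteStmt = deleteStmt;
        }

        // Route one changelog record to the batch of the matching statement.
        void addToBatch(RowData record) throws SQLException {
            switch (record.getRowKind()) {
                case INSERT:
                    insertStmt.addBatch();
                    break;
                case UPDATE_AFTER:
                    updateStmt.addBatch();
                    break;
                case DELETE:
                    // Only RowKind.DELETE records become delete SQL.
                    deleteStmt.addBatch();
                    break;
                default:
                    // UPDATE_BEFORE: the retraction half of an update.
                    break;
            }
        }
    }

So if rows vanish from ClickHouse, something upstream (here, the LEFT JOIN) is emitting DELETE records for them.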

zidane-wcp commented 1 year ago

I ran into the same problem. Sinking to MySQL with the jdbc-connector works fine, but when sinking to ClickHouse with itinycheng/flink-connector-clickhouse, updating the left table's data in MySQL causes the row to be deleted in ClickHouse.

itinycheng commented 1 year ago

@zidane-wcp @xuanzhi201111 This seems to be caused by the order of statement execution (insert -> update -> delete); refer to ClickHouseUpsertExecutor.executeBatch. Also check whether the option sink.ignore-delete is true; that setting prevents delete statements from being generated.
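
For reference, a sketch of how that option would be set in the sink DDL. The schema and connection options below are made up for illustration; only 'sink.ignore-delete' is the setting under discussion here.

    CREATE TABLE sink_ck (
        id BIGINT,
        order_id STRING,
        order_name STRING,
        PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
        'connector' = 'clickhouse',
        'url' = 'clickhouse://127.0.0.1:8123',
        'database-name' = 'flink',
        'table-name' = 'ck_order_detail',
        'sink.ignore-delete' = 'true'
    );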

zidane-wcp commented 1 year ago

@itinycheng Thank you for your reply. I tried the sink.ignore-delete option and it does not work. I modified ClickHouseUpsertExecutor.executeBatch as below; it seems to solve my problem for now, but I don't know whether it will cause other problems.

    @Override
    public void executeBatch() throws SQLException {
        // Changed order: execute the delete batch first, then inserts, then updates.
        for (ClickHousePreparedStatement clickHousePreparedStatement :
                Arrays.asList(deleteStmt, insertStmt, updateStmt)) {
            if (clickHousePreparedStatement != null) {
                attemptExecuteBatch(clickHousePreparedStatement, maxRetries);
                LOG.error("executeBatch===========================================================");
                LOG.error(clickHousePreparedStatement.asSql());
            }
        }
    }
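
For what it's worth, this reordering helps the update case because an update reaches the sink as a retraction pair for the same key, a DELETE followed by an INSERT: running the delete batch before the insert batch lets the re-inserted row survive, whereas the original insert -> update -> delete order wiped it.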

itinycheng commented 1 year ago

@zidane-wcp This is fundamentally a statement-execution-order problem: the actual record order is lost within a batch, so no matter how the statement order is changed, the problem cannot be completely solved. I currently have no better way to solve this on the connector side.
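
A small illustration of the point, using hypothetical events for a single key:

    History A: +I(id=2) then -D(id=2)   -> row 2 should not exist afterwards.
    History B: -D(id=2) then +I(id=2)   -> row 2 should exist afterwards.

After grouping by statement type, both histories collapse to the same two batches (an insert batch containing id=2 and a delete batch containing id=2), so executing deletes first resurrects the row in history A, while executing inserts first wipes it in history B. No fixed batch order is correct for both.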

xuanzhi201111 commented 1 year ago

@zidane-wcp Could you send me your ClickHouse connector jar so I can test it? 949538827@qq.com. I don't write code and don't know how to make the change.

xuanzhi201111 commented 1 year ago

Hello author, could you merge @zidane-wcp's change into the codebase? Then, when a wide table joined from MySQL is written to ClickHouse via Flink CDC, updates would sync correctly as well.

xuanzhi201111 commented 1 year ago

@zidane-wcp Could you send me your ClickHouse connector jar so I can test it? 949538827@qq.com. I don't write code and don't know how to modify it.

itinycheng commented 1 year ago

@xuanzhi201111 Hi, this code does not truly solve the out-of-order problem, so it cannot be merged. You can simply overwrite ClickHouseUpsertExecutor.executeBatch with the code above; it is only an adjustment of the execution order. Also, the connector defaults to sink.ignore-delete = true, which means no delete statements should be written out, so that configuration should be fine. If you run into problems, please provide the version and a test case.

itinycheng commented 1 year ago

@xuanzhi201111 @zidane-wcp Fixed an issue where the sink.ignore-delete option did not take effect; this should resolve the current problem by preventing deletes from being generated: https://github.com/itinycheng/flink-connector-clickhouse/commit/06d0aedb1cb606d85cb328f4bbe6befd0892d4ca