pingcap / tiflow

This repo maintains DM (a data migration platform) and TiCDC (change data capture for TiDB)
Apache License 2.0
422 stars 280 forks source link

Make EXCHANGE PARTITION idempotent #11458

Open kennytm opened 1 month ago

kennytm commented 1 month ago

Is your feature request related to a problem?

TiCDC relies on idempotence of DMLs and DDLs to ensure eventual consistency. Events are executed at least once, especially when retry / pause-resume is involved. In particular, TiCDC assumes that DDLs can be repeatedly executed, and the outcome should be one of:

However, EXCHANGE PARTITION violates this assumption. Retrying EXCHANGE PARTITION is not a no-op, and this will lead to data inconsistency.

Describe the feature you'd like

Find some way to force EXCHANGE PARTITION to be idempotent.

Because of barrierTs, I suppose that only one single DDL will be retried (i.e. seeing …→①→①→②→②→… downstream is possible, but …→①→②→①→②→… not, assuming DDLs ① and ② share an common set of affected tables).

I also assume for each downstream database, each table is replicated only by a single changefeed.

We create a table downstream to record the EXCHANGE PARTITION state

create table if not exists `tidb_cdc`.`ddl_status_v1` (
    `ts` bigint unsigned not null, -- upstream FinishTS of the DDL (or use JobID, whatever)
    `state` enum('prepared', 'executed') not null, -- downstream execution status of the DDL
    primary key (`ts`)
);
  1. Prepare the current execution state

    begin;
    insert ignore into `tidb_cdc`.`ddl_status_v1` values (?1, 'prepared');
    select row_count(), state from `tidb_cdc`.`ddl_status_v1` where ts = ?1;
    commit;
    • if the result is (1, 'prepared'), it means we have never executed the DDL before, so proceed.
    • if the result is (0, 'executed'), it means the DDL has been successfully executed, so skip.
    • if the result is (0, 'prepared'), we don't know whether the DDL has been executed or not, so error and stop (move to "warning" state).
  2. Execute the DDL.

    alter table `a` exchange partition `p0` with table `b`;
    • if the DDL result in any kind of error, just stop. Remember that the EXCHANGE PARTITION statement cannot be retried.
    • if the DDL executed successfully, proceed.
  3. Mark the DDL as executed.

    update `tidb_cdc`.`ddl_status_v1` set state = 'executed' where (ts, state) = (?1, 'prepared');
    • if row_count() is 0, it means we have got external interference and better stop and report an error.
  4. Periodically GC the `tidb_cdc`.`ddl_status_v1` table, removing entries preceding the upstream GC safepoint.

Note that currently the tidb_cdc schema is only reserved when syncpoint is enabled. Implementing this feature means the schema will be unconditionally reserved downstream.

Describe alternatives you've considered

Declare that EXCHANGE PARTITION cannot be replicated.

Note that a similar non-idempotent statement RENAME TABLE a to temp, b to a, temp to b; has been explicitly documented as non-compliant in https://docs.pingcap.com/tidb/stable/ticdc-ddl#rename-multiple-tables-in-a-ddl-statement.

Teachability, Documentation, Adoption, Migration Strategy

There should be clear instruction about how the user should remedy errors from each step.

User could check the total row count of the partition and exchanged table and determine if they should

  1. skip the DDL because it has already been executed but CDC did not notice, or
  2. manually run the DDL because CDC failed to run it, or
  3. other context-dependent solution (e.g. the table to be exchanged contained invalid data)
lance6716 commented 1 month ago

if the result is (0, 'prepared'), we don't know whether the DDL has been executed or not, so error and stop (move to "warning" state).

How about we record which table is partitioned before the DDL, then execute EXCHANGE PARTITION. So when failover we can query the current table structure and compare with the record to know if exchange is happened.

kennytm commented 1 month ago

@lance6716 EXCHANGE PARTITION just swaps the physical table ID of the two objects. The structures of the involved tables remained the same. You can't know if the exchange has happened based on SHOW CREATE TABLE alone.

create table a (a bigint primary key, b bigint) partition by hash (a) partitions 5;
create table b (a bigint primary key, b bigint);
insert into a values (1, 50), (6, 60), (11, 70), (16, 80), (21, 90), (26, 100), (2, 20), (3, 30), (4, 40);
insert into b values (6, -10), (16, -20), (26, -30), (36, -40), (46, -50), (56, -60);

alter table a exchange partition p1 with table b;

You can record the downstream Table IDs of the involved objects, e.g. on TiDB:

> select tidb_table_id from information_schema.tables where (table_schema, table_name) = ('test', 'b');
106
> select tidb_partition_id from information_schema.partitions where (table_schema, table_name, partition_name) = ('test', 'a', 'p1');
111
> alter table a exchange partition p1 with table b;
> select tidb_table_id from information_schema.tables where (table_schema, table_name) = ('test', 'b');
111
> select tidb_partition_id from information_schema.partitions where (table_schema, table_name, partition_name) = ('test', 'a', 'p1');
106

On MySQL (if using ENGINE=InnoDB, requires PROCESS privilege):

> select name, table_id from information_schema.innodb_tables where name in ('test/b', 'test/a#p#p1');
+-------------+----------+
| name        | table_id |
+-------------+----------+
| test/b      |     1076 |
| test/a#p#p1 |     1077 |
+-------------+----------+
> alter table a exchange partition p1 with table b;
> select name, table_id from information_schema.innodb_tables where name in ('test/b', 'test/a#p#p1');
+-------------+----------+
| name        | table_id |
+-------------+----------+
| test/b      |     1077 |
| test/a#p#p1 |     1076 |
+-------------+----------+

On MariaDB the info table to query is information_schema.innodb_sys_tables; but I haven't tested.

I don't consider these viable in general because of the extra privilege requirements.