Shopify / ghostferry

The swiss army knife of live data migrations
https://shopify.github.io/ghostferry
MIT License

Data corruption with composite PK and non-unique ID column with ghostferry-sharding only #285

Closed. shuhaowu closed this issue 3 years ago.

shuhaowu commented 3 years ago

This is a very specific corner case. The corruption is caught by the InlineVerifier, so no data is actually corrupted for real; I'm not sure why this happens yet, but I can reliably reproduce it.

To reproduce it yourself, clone this repo, check out the branch sharding-data-corruption-with-pagination-key-not-ordered, then:

$ cd examples/sharding
$ ./bugreport.sh

This will run you through an interactive session demonstrating the bug, which should give you approximately the following output:

Building with native build. Learn about native build in Compose here: https://docs.docker.com/go/compose-native-build/
ghostferry_mysql-1_1 is up-to-date
ghostferry_mysql-3_1 is up-to-date
ghostferry_mysql-2_1 is up-to-date
Here's a composite PK table
*************************** 1. row ***************************
       Table: t
Create Table: CREATE TABLE `t` (
  `tenant_id` bigint(20) NOT NULL,
  `col1` varchar(255) COLLATE utf8mb4_unicode_ci NOT NULL,
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `d` datetime(6) NOT NULL,
  PRIMARY KEY (`tenant_id`,`col1`),
  KEY `id` (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci
Press enter to continue...

It has two rows in it. We can SELECT * FROM abc.t:
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | a    |  2 | 2021-05-20 21:25:53.000000 |
|         1 | z    |  1 | 2021-05-20 21:25:53.000000 |
+-----------+------+----+----------------------------+
Notice how the id column values are in reverse order in the above case, likely because the rows come back in PK order, and the PK is composite on (tenant_id, col1).
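
As a side illustration (this is not part of bugreport.sh; it assumes the source container from the script is still listening on 127.0.0.1:29291 and uses the github.com/go-sql-driver/mysql driver), here is a minimal Go sketch that pins the id order with an explicit ORDER BY:

package main

import (
    "database/sql"
    "fmt"

    _ "github.com/go-sql-driver/mysql" // MySQL driver; ghostferry uses this one as well
)

func main() {
    // Source database started by bugreport.sh (assumed still running).
    db, err := sql.Open("mysql", "root@tcp(127.0.0.1:29291)/abc")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    // Without an ORDER BY, InnoDB returns rows in clustered-index order,
    // i.e. the composite PK (tenant_id, col1) -- which is why id reads 2, 1
    // above. Only an explicit ORDER BY `id` makes the id order deterministic.
    rows, err := db.Query("SELECT tenant_id, col1, id FROM t ORDER BY id")
    if err != nil {
        panic(err)
    }
    defer rows.Close()

    for rows.Next() {
        var tenantID, id int64
        var col1 string
        if err := rows.Scan(&tenantID, &col1, &id); err != nil {
            panic(err)
        }
        fmt.Println(tenantID, col1, id) // prints "1 z 1" then "1 a 2"
    }
}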

ghostferry-sharding built with ghostferry 1.1.0+20210520205254+43a8894
will move tenant tenant_id=1
INFO[0000] hello world from 1.1.0+20210520205254+43a8894  tag=ferry
INFO[0000] connecting to database                        dbname=source dsn="root:<masked>@tcp(127.0.0.1:29291)/?collation=utf8mb4_unicode_ci&multiStatements=true&maxAllowedPacket=0&charset=utf8mb4&sql_mode=%27STRICT_ALL_TABLES%2CNO_BACKSLASH_ESCAPES%27&time_zone=%27%2B00%3A00%27" tag=ferry

[cut off for brevity]

ERRO[0001] cutover verification failed for: abc.t [paginationKeys: 1 2 ]   incorrect_tables="[abc.t]" tag=inline-verifier
ERRO[0001] verification failed, aborting run             error="verifier detected data discrepancy: cutover verification failed for: abc.t [paginationKeys: 1 2 ] " tag=sharding
ERRO[0001] ghostferry failed to notify error             error="Post \"http/error\": unsupported protocol scheme \"\"" tag=error_handler
ghostferry-sharding could not emit statsd metric  {{sharding.VerifyCutover [{SourceDB abc} {TargetDB abc}] 1} 958.46µs}
ERRO[0001] fatal error detected                          errfrom=inline_verifier error="verifier detected data discrepancy: cutover verification failed for: abc.t [paginationKeys: 1 2 ] " tag=error_handler
panic: fatal error detected, see logs for details

[cut off for brevity]

Ghostferry failed with a verifier error. Let's take a look at what the table looks like.
Press enter to continue...

Source database:
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | a    |  2 | 2021-05-20 21:25:53.000000 |
|         1 | z    |  1 | 2021-05-20 21:25:53.000000 |
+-----------+------+----+----------------------------+
Target database:
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | 1    |  2 | 2021-05-20 21:25:53.000000 |
|         2 | 1    |  1 | 2021-05-20 21:25:53.000000 |
+-----------+------+----+----------------------------+
This is a major amount of corruption...

To see this work with copydb, run copydb after the above script.

It will get to the wait-for-cutover stage (INFO[0000] data copy is complete, waiting for cutover tag=ferry). This is fine. At this point, query the databases:

Source:

$ mysql -u root -h 127.0.0.1 -P 29291 -e "select * from abc.t"
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | a    |  2 | 2021-05-20 21:25:53.000000 |
|         1 | z    |  1 | 2021-05-20 21:25:53.000000 |
+-----------+------+----+----------------------------+

Target:

$ mysql -u root -h 127.0.0.1 -P 29292 -e "select * from abc.t"
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | a    |  2 | 2021-05-20 21:25:53.000000 |
|         1 | z    |  1 | 2021-05-20 21:25:53.000000 |
+-----------+------+----+----------------------------+
karmakaze commented 3 years ago

Local tests: I am able to reproduce the error conditions described in the issue above.

    return sq.Select(f.selectString(table)).
        From(quotedTable + " " + f.shardingKeyIndexHint(table)).
        Where(sq.Eq{quotedShardingKey: f.ShardingValue}).
        Where(sq.Gt{quotedPK: lastPk}).
        Limit(batchSize).
        OrderBy(quotedPK), nil

Testing with this change to filter.go, which effectively reverts the optimization of using a self-join for performance:

-   return sq.Select(columns...).
-       From(quotedTable).
-       Join("("+selectPaginationKeys+") AS `batch` USING("+quotedPaginationKey+")", f.ShardingValue, lastPaginationKey), nil
+   return sq.Select(columns...).
+       From(quotedTable).
+       Where(sq.Eq{quotedShardingKey: f.ShardingValue}).
+       Where(sq.Gt{quotedPaginationKey: lastPaginationKey}).
+       OrderBy(quotedPaginationKey).
+       Limit(batchSize), nil

Running bugreport.sh produces the correct result:

Target database:
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | a    |  2 | 2021-06-03 23:01:27.000000 |
|         1 | z    |  1 | 2021-06-03 23:01:27.000000 |
+-----------+------+----+----------------------------+

Note that when we do a self-join, we end up with a duplicate id (paginationKey) column from the joined table, compared to the simple table query.
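
For comparison, here is a hedged Go sketch using squirrel (the query builder used in filter.go); the table and column literals are hard-coded stand-ins for the quoted identifiers that filter.go computes, so this is not the actual ghostferry code:

package main

import (
    "fmt"

    sq "github.com/Masterminds/squirrel"
)

func main() {
    // Self-join form (the current code). With SELECT *, the `batch` derived
    // table takes part in column expansion; with USING, the join key is
    // coalesced and moved to the front of the result set.
    selfJoin := sq.Select("*").
        From("`abc`.`t`").
        Join("(SELECT `id` FROM `abc`.`t` WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 200) AS `batch` USING(`id`)", 1, 0)

    // Simple form (the revert above): a single table, so "*" expands to the
    // table's own column order: tenant_id, col1, id, d.
    simple := sq.Select("*").
        From("`abc`.`t`").
        Where(sq.Eq{"`tenant_id`": 1}).
        Where(sq.Gt{"`id`": 0}).
        OrderBy("`id`").
        Limit(200)

    for _, builder := range []sq.SelectBuilder{selfJoin, simple} {
        query, args, err := builder.ToSql()
        if err != nil {
            panic(err)
        }
        fmt.Println(query, args)
    }
}

Running both generated statements in a mysql shell makes the column-order difference easy to see.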

karmakaze commented 3 years ago

Back to unmodified code; now let's try fiddling with the column names. Again in filter.go:

-   selectPaginationKeys := "SELECT " + quotedPaginationKey + " FROM " + quotedTable + " " + f.shardingKeyIndexHint(table) +
+   selectPaginationKeys := "SELECT " + quotedPaginationKey + " AS __ghostferry_nested_pagination_key FROM " + quotedTable + " " + f.shardingKeyIndexHint(table) +

This shouldn't have changed the behaviour, but it did, producing these error messages:

ERRO[0000] failed to prepare query                       args="[1 0]" error="Error 1054: Unknown column 'id' in 'from clause'" sql="SELECT [omitted] FROM  `abc`.`t` JOIN (SELECT `id` AS __ghostferry_nested_pagination_key " table=abc.t tag=cursor
ERRO[0000] failed to fetch rows, 1 of 5 max retries      error="Error 1054: Unknown column 'id' in 'from clause'" table=abc.t tag=cursor
...
ERRO[0000] failed to fetch rows after 5 attempts, retry limit exceeded  error="Error 1054: Unknown column 'id' in 'from clause'" table=abc.t tag=cursor
ERRO[0000] failed to iterate table                       error="Error 1054: Unknown column 'id' in 'from clause'" table=abc.t tag=data_iterator
ERRO[0000] ghostferry failed to notify error             error="Post \"http/error\": unsupported protocol scheme \"\"" tag=error_handler
ERRO[0000] fatal error detected                          errfrom=data_iterator error="Error 1054: Unknown column 'id' in 'from clause'" tag=error_handler

Maybe(?) selecting an unqualified column name with Squirrel selects multiple occurrences. [Edit: defaulting to SELECT * (all columns), as below, is more likely the culprit.]

OK, so undo the above change so we're back to the original source, and instead change data_iterator.go:

    if len(cursor.ColumnsToSelect) == 0 {
-       cursor.ColumnsToSelect = []string{"*"}
+       cursor.ColumnsToSelect = []string{"tenant_id", "col1", "id", "d"}
    }

Running bugreport.sh produces the correct result:

Target database:
+-----------+------+----+----------------------------+
| tenant_id | col1 | id | d                          |
+-----------+------+----+----------------------------+
|         1 | a    |  2 | 2021-06-03 23:27:47.000000 |
|         1 | z    |  1 | 2021-06-03 23:27:47.000000 |
+-----------+------+----+----------------------------+

So defaulting to SELECT * (all columns) when using the self-join optimization could change the number and order of the selected columns, throwing things off.
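
To illustrate the failure mode in isolation (all names here are hypothetical, not ghostferry's actual internals): if fetched values are mapped to columns by position while the writer assumes the table's column order, any reordering introduced by SELECT * shifts every value into a wrong column:

package main

import "fmt"

func main() {
    // Column order the writer assumes: the table definition order.
    insertColumns := []string{"tenant_id", "col1", "id", "d"}

    // Column order the self-join query actually returned in this corner
    // case: the join key moved to the front of the result set.
    fetchedColumns := []string{"id", "tenant_id", "col1", "d"}
    fetchedRow := []interface{}{int64(1), int64(1), "z", "2021-06-04 12:55:15"}

    // Naive positional mapping: tenant_id receives the id value, col1
    // receives tenant_id, id receives col1 -- values land in wrong columns.
    naive := map[string]interface{}{}
    for i, col := range insertColumns {
        naive[col] = fetchedRow[i]
    }
    fmt.Println("positional:", naive)

    // Mapping by column name instead keeps every value under its column.
    byName := map[string]interface{}{}
    for i, col := range fetchedColumns {
        byName[col] = fetchedRow[i]
    }
    fmt.Println("by name:   ", byName)
}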

karmakaze commented 3 years ago

While playing around, I did also see some logs that had the formatting like:

some info or error message... 2 -> 1

and searching the source turns up something similar to:

    return fmt.Errorf("during generating sql query at paginationKey %v -> %v: %v", startPaginationKeypos, endPaginationKeypos, err)

It's strange that the first (start) value is larger than the second (end) value. This may indicate that the last row of a batch is used to remember the start position of the next batch, which will not be correct if the batch query does not specify an ORDER BY and does not naturally order by the pagination key.

If adding back ORDER BY hinders performance, the next batch position could be the max(pagination column) of all the fetched batch rows instead.
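
A quick sketch of that alternative (a hypothetical helper, not ghostferry code; it assumes an unsigned integer pagination key):

package main

import "fmt"

// nextPaginationKey picks the position to resume from after a batch. Taking
// the max over the whole batch is safe even when the batch query has no
// ORDER BY, whereas trusting the last row is only safe for ordered batches.
func nextPaginationKey(batchKeys []uint64) uint64 {
    var max uint64
    for _, k := range batchKeys {
        if k > max {
            max = k
        }
    }
    return max
}

func main() {
    // A batch fetched in PK (tenant_id, col1) order: ids come back 2, then 1.
    fmt.Println(nextPaginationKey([]uint64{2, 1})) // 2, not 1
}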

karmakaze commented 3 years ago

All of the above are guesses/hints as to what the actual sources of the problem are, but they are at least some starting points for someone more familiar with how the program is meant to operate. GL HF.

tiwilliam commented 3 years ago

After some more digging, this doesn't seem to have anything to do with the order of the rows, but rather the column order. The two queries below generate the same row order and content, but have different column orders, which leads to corruption in my local env with the first query.

ShardedCopyFilter.BuildSelect with bug:

mysql> SELECT *, MD5(CONCAT(MD5(COALESCE(`tenant_id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`col1`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`d`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5 FROM `abc`.`t` JOIN (SELECT `id` FROM `abc`.`t` IGNORE INDEX (PRIMARY) WHERE `tenant_id` = 1 AND `id` > 0 ORDER BY `id` LIMIT 200) AS `batch` USING(`id`) ORDER BY `id`;
+----+-----------+------+----------------------------+----------------------------------+
| id | tenant_id | col1 | d                          | __ghostferry_row_md5             |
+----+-----------+------+----------------------------+----------------------------------+
|  1 |         1 | z    | 2021-06-04 12:55:15.000000 | a23415aea273e1eb40d823185efb4980 |
|  2 |         1 | a    | 2021-06-04 12:55:15.000000 | f81a32a00a35d7e71c42d7f8585fd0bd |
+----+-----------+------+----------------------------+----------------------------------+
2 rows in set (0.00 sec)

ShardedCopyFilter.BuildSelect without bug:

mysql> SELECT *, MD5(CONCAT(MD5(COALESCE(`tenant_id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`col1`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`id`, 'NULL_PBj}b]74P@JTo$5G_null')),MD5(COALESCE(`d`, 'NULL_PBj}b]74P@JTo$5G_null')))) AS __ghostferry_row_md5 FROM `abc`.`t` WHERE `tenant_id` = 1 AND `id` > 0 ORDER BY `id` LIMIT 200;
+-----------+------+----+----------------------------+----------------------------------+
| tenant_id | col1 | id | d                          | __ghostferry_row_md5             |
+-----------+------+----+----------------------------+----------------------------------+
|         1 | z    |  1 | 2021-06-04 12:55:15.000000 | a23415aea273e1eb40d823185efb4980 |
|         1 | a    |  2 | 2021-06-04 12:55:15.000000 | f81a32a00a35d7e71c42d7f8585fd0bd |
+-----------+------+----+----------------------------+----------------------------------+
2 rows in set (0.01 sec)

Below is the batch writer INSERT when running the faulty code. Data in the RowBatch struct is maintained from the Cursor, but the columns are in the order of the columns passed to BuildSelect. We fail to map these back properly when our JOIN changes the return order in Fetch (see the sketch after the INSERT below).

INSERT IGNORE INTO `abc`.`t` (`tenant_id`,`col1`,`id`,`d`) VALUES (
    2, 1, 'a', '2021-06-04 12:55:15'
),(
    1, 1, 'z', '2021-06-04 12:55:15'
)
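
As a sketch of one possible direction (illustrative only, and not necessarily the fix that landed): selecting the columns explicitly, in the writer's expected order, would keep the positional mapping valid even with the self-join. Identifiers below are stand-ins for what filter.go computes:

package main

import (
    "fmt"

    sq "github.com/Masterminds/squirrel"
)

func main() {
    // The column order the batch writer expects for `abc`.`t`.
    writerOrder := []string{"`tenant_id`", "`col1`", "`id`", "`d`"}

    // Explicit columns instead of "*": the result set column order now
    // matches writerOrder no matter how the FROM/JOIN clause is built.
    query, args, err := sq.Select(writerOrder...).
        From("`abc`.`t`").
        Join("(SELECT `id` FROM `abc`.`t` IGNORE INDEX (PRIMARY) WHERE `tenant_id` = ? AND `id` > ? ORDER BY `id` LIMIT 200) AS `batch` USING(`id`)", 1, 0).
        OrderBy("`id`").
        ToSql()
    if err != nil {
        panic(err)
    }
    fmt.Println(query, args)
}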