apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

MySQL-CDC failed to synchronize data #7571

Open gaotong521 opened 1 month ago

gaotong521 commented 1 month ago

What happened

When using MySQL-CDC to synchronize data, both the source table and the target table are MySQL tables. If the two tables have different structures, data synchronization fails even when a FieldMapper transform is configured to map the fields.

SeaTunnel Version

2.3.5

SeaTunnel Config

{
    "env": {
        "job.mode": "STREAMING",
        "job.name": "seatunnel_mysql_cdc",
        "parallelism": 1,
        "checkpoint.interval": 1000
    },
    "source": [
        {
            "plugin_name": "MySQL-CDC",
            "base-url": "jdbc:mysql://127.0.0.1:3306/data_source_test",
            "username": "root",
            "password": "root",
            "table-names": [
                "data_source_test.cn_region_dict"
            ],
            "snapshot.split.size": 100000,
            "startup.mode": "initial",
            "result_table_name": "table_source_001"
        }
    ],
    "transform": [
        {
            "plugin_name": "Replace",
            "source_table_name": "table_source_001",
            "result_table_name": "table_source_002",
            "replace_field": "region_name",
            "pattern": "==",
            "replacement": "--",
            "is_regex": false,
            "replace_first": false
        },
        {
            "plugin_name": "FieldMapper",
            "source_table_name": "table_source_002",
            "result_table_name": "table_source_003",
            "field_mapper": {
                "id": "id_old",
                "region_code": "region_code_old",
                "parent_region_code": "parent_region_code",
                "region_name": "region_name_old",
                "region_level": "region_level_old",
                "longitude": "longitude",
                "latitude": "latitude",
                "zoom_level": "zoom_level",
                "sort_id": "sort_id",
                "deleted": "deleted",
                "create_time": "create_time"
            }
        }
    ],
    "sink": [
        {
            "source_table_name": "table_source_003",
            "plugin_name": "Jdbc",
            "url": "jdbc:mysql://127.0.0.1:3306/data_dest_test?rewriteBatchedStatements=true",
            "database": "data_dest_test",
            "table": "data_dest_test.cn_region_dict",
            "user": "root",
            "password": "root",
            "driver": "com.mysql.jdbc.Driver",
            "generate_sink_sql": true,
            "schema_save_mode": "CREATE_SCHEMA_WHEN_NOT_EXIST",
            "data_save_mode": "DROP_DATA"
        }
    ]
}
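
To make the intended mapping explicit, here is a minimal Python sketch of what the Replace and FieldMapper steps in the config above are expected to do to a single row. It is only an illustration, not SeaTunnel code: `apply_replace` and `apply_field_mapper` are hypothetical helpers, the sample row values are made up, and the sketch assumes that fields not listed in `field_mapper` are dropped from the output.

```python
# Illustration only: mimics the two transforms from the config above on a plain dict.
# apply_replace and apply_field_mapper are hypothetical helpers, not SeaTunnel APIs.

# Copied from the "field_mapper" block: source column -> target column.
FIELD_MAPPER = {
    "id": "id_old",
    "region_code": "region_code_old",
    "parent_region_code": "parent_region_code",
    "region_name": "region_name_old",
    "region_level": "region_level_old",
    "longitude": "longitude",
    "latitude": "latitude",
    "zoom_level": "zoom_level",
    "sort_id": "sort_id",
    "deleted": "deleted",
    "create_time": "create_time",
}


def apply_replace(row, field, pattern, replacement):
    """Literal (non-regex) replace of every occurrence in one string field."""
    out = dict(row)
    if isinstance(out.get(field), str):
        out[field] = out[field].replace(pattern, replacement)
    return out


def apply_field_mapper(row, mapping):
    """Rename mapped fields; assumption: unmapped fields are dropped."""
    return {new: row[old] for old, new in mapping.items() if old in row}


def expected_sink_columns(mapping):
    """Column names the sink table would need after the mapping."""
    return list(mapping.values())


if __name__ == "__main__":
    # Made-up sample row; only a few of the source columns are shown.
    sample = {"id": 1, "region_code": "110000", "region_name": "north==east"}
    step1 = apply_replace(sample, "region_name", "==", "--")
    step2 = apply_field_mapper(step1, FIELD_MAPPER)
    print(step2)
    print(expected_sink_columns(FIELD_MAPPER))
```

Under these assumptions, the target table data_dest_test.cn_region_dict would need the columns id_old, region_code_old, region_name_old and region_level_old, plus the seven columns whose names are unchanged.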

Running Command

Run via DolphinScheduler

Error Exception

When the source table is updated, the data in the target table does not change.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

github-actions[bot] commented 5 days ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.