apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [JDBC] JDBC Source loses data, recurring intermittently #7071

Open wangzhiwei61 opened 4 months ago

wangzhiwei61 commented 4 months ago

Search before asking

What happened

Data migration jobs are submitted through the REST API and run almost continuously. Each run transfers at most 20,000 records. Data loss occurs roughly every 4-5 hours and reproduces consistently.
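For readers trying to reproduce this setup, here is a minimal Python sketch of how such a job might be submitted. It assumes the Zeta engine's v1 REST endpoint `/hazelcast/rest/maps/submit-job`, served on the Hazelcast port (5801 by default) once the REST API is enabled in `hazelcast.yaml`, as documented for SeaTunnel 2.3.x. The body is the job config shown under "SeaTunnel Config". The host, port, and job name are placeholders, the endpoint should be checked against your own deployment, and the helper names `build_submit_request` and `submit_job` are hypothetical, not part of SeaTunnel.

```python
import json
import urllib.parse
import urllib.request

# Assumption: SeaTunnel 2.3.x v1 REST path on the Hazelcast port; verify it
# against the RESTful API docs for your version before relying on it.
SUBMIT_PATH = "/hazelcast/rest/maps/submit-job"


def build_submit_request(host, port, job_config, job_name=None):
    """Build (but do not send) a POST request carrying the job config as JSON."""
    url = f"http://{host}:{port}{SUBMIT_PATH}"
    if job_name:
        # jobName is an optional query parameter of the submit endpoint.
        url += "?" + urllib.parse.urlencode({"jobName": job_name})
    body = json.dumps(job_config).encode("utf-8")
    return urllib.request.Request(
        url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def submit_job(host, port, job_config, job_name=None, timeout=30):
    """Send the request and decode the JSON reply (assumed to hold the job id)."""
    request = build_submit_request(host, port, job_config, job_name)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))
```

Keeping request construction separate from sending lets the request be inspected or tested without a running cluster. Only `submit_job` touches the network.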

SeaTunnel Version

2.3.5

SeaTunnel Config

The job covers more than 200 tables; only one is shown here for brevity.
{
    "env": {
        "parallelism": "1",
        "job.mode": "BATCH",
        "checkpoint.interval": "10000",
        "checkpoint.timeout": "60000",
        "execution.checkpoint.interval": "10000",
        "read_limit.rows_per_second": null,
        "read_limit.bytes_per_second": null,
        "job.retry.times": 3
    },
    "source": [{
        "tableInfo": {
            "columnName": "updated_time",
            "dataType": 93,
            "tableName": "work_order_detail",
            "primaryKeys": [{
                "primaryKey": "id",
                "dataType": -5,
                "autoColumn": true
            }],
            "tableEarliestTime": "1719193899073"
        },
        "password": "P@ssw0rd",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "query": "select * from work_order_detail  where updated_time >= %s and  updated_time <= %s",
        "result_table_name": "switch_work_order_detail",
        "plugin_name": "Jdbc",
        "user": "sa",
        "url": "jdbc:sqlserver://118.118.1.127:1433;DatabaseName=hs_test;encrypt=false;sendStringParametersAsUnicode=false"
    }],
    "transform": [{
        "tableInfo": {
            "columnName": "updated_time",
            "dataType": 93,
            "tableName": "work_order_detail",
            "primaryKeys": [{
                "primaryKey": "id",
                "dataType": -5,
                "autoColumn": true
            }],
            "tableEarliestTime": "1719193899073"
        },
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "query": "select * from switch_work_order_detail  where updated_time >= %s and  updated_time <= %s",
        "source_table_name": "switch_work_order_detail",
        "result_table_name": "work_order_detail",
        "plugin_name": "Sql"
    }],
    "sink": [{
        "password": "P@ssw0rd",
        "database": "hs_target",
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "query": "SET IDENTITY_INSERT [work_order_detail] ON;MERGE INTO work_order_detail AS target USING (VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)) AS source ([id],[work_order_id],[plant_id],[description],[created_by],[created_time],[updated_by],[updated_time],[trx_id],[creation_pid],[update_pid],[version],[process_task_number],[task_status],[station_code],[tray_id],[serial_number],[laser_number],[out_okng],[is_last_operation],[parses_status],[rework_times],[sample],[next_rework_station]) ON (target.[id] = source.[id]) WHEN MATCHED THEN     UPDATE SET target.[work_order_id] =source.[work_order_id],target.[plant_id] =source.[plant_id],target.[description] =source.[description],target.[created_by] =source.[created_by],target.[created_time] =source.[created_time],target.[updated_by] =source.[updated_by],target.[updated_time] =source.[updated_time],target.[trx_id] =source.[trx_id],target.[creation_pid] =source.[creation_pid],target.[update_pid] =source.[update_pid],target.[version] =source.[version],target.[process_task_number] =source.[process_task_number],target.[task_status] =source.[task_status],target.[station_code] =source.[station_code],target.[tray_id] =source.[tray_id],target.[serial_number] =source.[serial_number],target.[laser_number] =source.[laser_number],target.[out_okng] =source.[out_okng],target.[is_last_operation] =source.[is_last_operation],target.[parses_status] =source.[parses_status],target.[rework_times] =source.[rework_times],target.[sample] =source.[sample],target.[next_rework_station] =source.[next_rework_station] WHEN NOT MATCHED THEN     INSERT ([id],[work_order_id],[plant_id],[description],[created_by],[created_time],[updated_by],[updated_time],[trx_id],[creation_pid],[update_pid],[version],[process_task_number],[task_status],[station_code],[tray_id],[serial_number],[laser_number],[out_okng],[is_last_operation],[parses_status],[rework_times],[sample],[next_rework_station]) VALUES (source.[id],source.[work_order_id],source.[plant_id],source.[description],source.[created_by],source.[created_time],source.[updated_by],source.[updated_time],source.[trx_id],source.[creation_pid],source.[update_pid],source.[version],source.[process_task_number],source.[task_status],source.[station_code],source.[tray_id],source.[serial_number],source.[laser_number],source.[out_okng],source.[is_last_operation],source.[parses_status],source.[rework_times],source.[sample],source.[next_rework_station]);SET IDENTITY_INSERT [work_order_detail] OFF;",
        "source_table_name": "work_order_detail",
        "plugin_name": "Jdbc",
        "user": "sa",
        "url": "jdbc:sqlserver://118.118.1.127:1433;DatabaseName=hs_target;encrypt=false;sendStringParametersAsUnicode=false"
    }]
}
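SeaTunnel's Jdbc source does not appear to fill the `%s` placeholders in `query` itself. The calling service presumably substitutes the `updated_time` bounds before each submission. The sketch below is a hypothetical illustration of that step. It starts from `tableEarliestTime` (epoch milliseconds), cuts the range into consecutive windows whose bounds touch so that no instant falls between two runs, and renders SQL Server datetime literals. The helper names are invented for illustration. The sketch also assumes `updated_time` is stored in UTC; if the column holds local server time, the conversion must use that zone instead.

```python
from datetime import datetime, timedelta, timezone

# Query template copied from the source config; the two %s are the window bounds.
QUERY_TEMPLATE = (
    "select * from work_order_detail  where updated_time >= %s and  updated_time <= %s"
)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis_to_datetime(millis):
    """Convert an epoch-milliseconds string such as tableEarliestTime (assumed UTC)."""
    # Integer timedelta arithmetic avoids float rounding of the milliseconds.
    return EPOCH + timedelta(milliseconds=int(millis))


def sql_datetime_literal(moment):
    """Format a datetime as a quoted SQL Server literal with millisecond precision."""
    return "'" + moment.strftime("%Y-%m-%d %H:%M:%S.") + f"{moment.microsecond // 1000:03d}'"


def windows(start, end, step):
    """Yield consecutive (lower, upper) windows; each upper bound is the next lower bound."""
    lower = start
    while lower < end:
        upper = min(lower + step, end)
        yield lower, upper
        lower = upper


def render_queries(start, end, step):
    """Fill the template once per window, in chronological order."""
    return [
        QUERY_TEMPLATE % (sql_datetime_literal(low), sql_datetime_literal(high))
        for low, high in windows(start, end, step)
    ]
```

Because both bounds are inclusive, a row whose `updated_time` equals a shared boundary is read twice. The sink's `MERGE` on `id` turns the second write into an update, so the overlap is harmless.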

Running Command

java -Xms2G -Xmx4G -Dseatunnel.config=/usr/local/src/apache-seatunnel-2.3.5/config/seatunnel.yaml -Dhazelcast.config=/usr/local/src/apache-seatunnel-2.3.5/config/hazelcast.yaml -Dlog4j2.contextSelector=org.apache.logging.log4j.core.async.AsyncLoggerContextSelector -Dlog4j2.configurationFile=/usr/local/src/apache-seatunnel-2.3.5/config/log4j2.properties -Dseatunnel.logs.path=/usr/local/src/apache-seatunnel-2.3.5/logs -Dseatunnel.logs.file_name=seatunnel-engine-server -Xms2g -Xmx2g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp/seatunnel/dump/zeta-server -XX:MaxMetaspaceSize=2g -XX:+UseG1GC -cp /usr/local/src/apache-seatunnel-2.3.5/lib/*:/usr/local/src/apache-seatunnel-2.3.5/starter/seatunnel-starter.jar org.apache.seatunnel.core.starter.seatunnel.SeaTunnelServer -d

Error Exception

No Exception
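Because no exception is raised, the loss only becomes visible by comparing the two databases. The following is a minimal sketch. It assumes a hypothetical `fetch_ids(database, window)` callback that runs `SELECT id FROM work_order_detail WHERE updated_time >= ? AND updated_time <= ?` against `hs_test` or `hs_target` through any DB-API driver. For each window, it reports the source ids that never reached the target. Run it only over windows that have already been migrated, because a row updated after the sync moves to a later window in the source.

```python
from typing import Callable, Dict, Iterable, List, Tuple

Window = Tuple[str, str]  # (lower, upper) bounds as SQL datetime literals


def missing_ids(source_ids: Iterable[int], target_ids: Iterable[int]) -> List[int]:
    """Return source primary keys absent from the target, sorted ascending."""
    return sorted(set(source_ids) - set(target_ids))


def reconcile(
    windows: Iterable[Window],
    fetch_ids: Callable[[str, Window], List[int]],  # hypothetical DB callback
) -> Dict[Window, List[int]]:
    """Map each window that has gaps to the ids missing from hs_target."""
    report = {}
    for window in windows:
        gap = missing_ids(fetch_ids("hs_test", window), fetch_ids("hs_target", window))
        if gap:
            report[window] = gap
    return report
```

Recording which windows report gaps, and when, may help correlate the loss with the 4-5 hour cadence described above.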

Zeta or Flink or Spark Version

zeta

Java or Scala Version

1.8

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

wangzhiwei61 commented 4 months ago

@liugddx Do you need any other information from me? Can you help?

github-actions[bot] commented 3 months ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.