DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [chunjun-connector-jdbc-base] binlog2mysql throws IndexOutOfBoundsException #1208

Open gzusgw opened 2 years ago

gzusgw commented 2 years ago

What happened

binlog2mysql throws an IndexOutOfBoundsException. Debugging shows that the ColumnNameList populated by dynamicPreparedStmt.getColumnNameList() does not contain the table's columns; it contains only two unrelated entries, before and after. As a result, the following logic in dynamicPreparedStmt.getColumnMeta() fails: before and after are not columns of the target table, so indexOf returns -1 and typeList.get(index) throws IndexOutOfBoundsException.

for (String columnName : columnNameList) {
    int index = nameList.indexOf(columnName);
    columnTypeList.add(typeList.get(index));
}
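
The failure mode reported here can be reproduced outside ChunJun with plain Java lists. The sketch below is only an illustration: the class name ColumnMetaSketch and both method names are hypothetical, and the guarded variant is one possible way to surface a clearer error, not the project's actual fix.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not ChunJun code.
class ColumnMetaSketch {

    // Mirrors the loop quoted in the report: the result of indexOf is used unchecked.
    static List<String> resolveTypesUnchecked(
            List<String> columnNameList, List<String> nameList, List<String> typeList) {
        List<String> columnTypeList = new ArrayList<>();
        for (String columnName : columnNameList) {
            int index = nameList.indexOf(columnName); // -1 when the name is not a table column
            columnTypeList.add(typeList.get(index));  // get(-1) throws IndexOutOfBoundsException
        }
        return columnTypeList;
    }

    // Assumed guard: fail with a message that names the unknown column.
    static List<String> resolveTypesChecked(
            List<String> columnNameList, List<String> nameList, List<String> typeList) {
        List<String> columnTypeList = new ArrayList<>();
        for (String columnName : columnNameList) {
            int index = nameList.indexOf(columnName);
            if (index < 0) {
                throw new IllegalArgumentException(
                        "Column '" + columnName + "' is not a column of the target table: " + nameList);
            }
            columnTypeList.add(typeList.get(index));
        }
        return columnTypeList;
    }

    public static void main(String[] args) {
        List<String> nameList = Arrays.asList("test_int", "test_varchar");
        List<String> typeList = Arrays.asList("INT", "VARCHAR");
        List<String> fromRecord = Arrays.asList("before", "after");
        try {
            resolveTypesUnchecked(fromRecord, nameList, typeList);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("unchecked lookup failed: " + e);
        }
        try {
            resolveTypesChecked(fromRecord, nameList, typeList);
        } catch (IllegalArgumentException e) {
            System.out.println("checked lookup failed: " + e.getMessage());
        }
    }
}
```

With the guard, the error message points at the unexpected column names (before, after) instead of a bare index error, which makes the misconfiguration easier to spot.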

What you expected to happen

CDC real-time capture succeeds.

How to reproduce

{
  "job": {
    "content": [
      {
        "reader": {
          "parameter": {
            "password": "Pwd@1234_Cmict",
            "port": 33061,
            "cat": "insert,update,delete",
            "host": "10.136.106.163",
            "jdbcUrl": "jdbc:mysql://10.136.106.163:33061/sgw_test",
            "column": [
              { "name": "test_int", "type": "int" },
              { "name": "test_float", "type": "float" },
              { "name": "test_double", "type": "double" },
              { "name": "test_varchar", "type": "varchar" },
              { "name": "test_char", "type": "char" },
              { "name": "test_datetime", "type": "datetime" }
            ],
            "start": {},
            "pavingData": false,
            "table": [
              "sgw_test.src_test_table"
            ],
            "username": "root"
          },
          "name": "binlogreader"
        },
        "writer": {
          "parameter": {
            "password": "Pwd@1234_Cmict",
            "updateKey": {
              "updateKey": [
                "test_int"
              ]
            },
            "column": [
              { "name": "test_int", "type": "int" },
              { "name": "test_float", "type": "float" },
              { "name": "test_double", "type": "double" },
              { "name": "test_varchar", "type": "varchar" },
              { "name": "test_char", "type": "char" },
              { "name": "test_datetime", "type": "datetime" }
            ],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://10.136.106.163:33061/sgw_test",
                "table": [
                  "sgw_test.sink_test_table"
                ]
              }
            ],
            "writeMode": "insert",
            "batchSize": 1024,
            "flushIntervalMills": 10,
            "username": "root"
          },
          "name": "mysqlwriter"
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1,
        "bytes": -1048576
      },
      "restore": {
        "isRestore": true,
        "isStream": true
      }
    }
  }
}

Anything else

No response

Version

master

FlechazoW commented 2 years ago

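First, this is probably a design problem. When the upstream is a real-time capture plugin such as binlog or logminer, the downstream table parameter should be "*", and the column parameter does not need to be configured. The mapping between upstream and downstream is defined through the "nameMapping" option. Here is a configuration for reference: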

{
  "job": {
    "content": [
      {
        "nameMapping": {
          "schemaMappings": {
            // key is the source-side schema name; value is the sink-side schema name
            "ORACLE": "chunjun_sink"
          },
          "tableMappings": {
            // This entry means: table source1 under the source-side ORACLE schema maps to table sink1 under the downstream chunjun_sink schema
            "ORACLE": {
              "source1": "sink1"
            }
          },
          "fieldMappings": {
            // This entry is the field (column) mapping
            "ORACLE": {
              "source1": {
                "C1": "id",
                "C2": "name"
              }
            }
          }
        },
        "reader": {
          "parameter": {
            "jdbcUrl": "jdbc:oracle:thin:@172.16.100.243:1521:orcl",
            "username": "oracle",
            "password": "oracle",
            "split": true,
            "table": [
              "ORACLE.source1"
            ],
            "listenerOperations": "UPDATE,INSERT,DELETE",
            "startSCN": "482165",
            "readPosition": "current",
            "startTime": 1576540477000,
            "pavingData": false,
            "queryTimeout": 300
          },
          "name": "oraclelogminerreader"
        },
        "writer": {
          "parameter": {
            "postSql": [],
            "writeMode": "INSERT",
            "password": "*****",
            "column": [],
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://k3:3306/chunjun_sink?useSSL=false",
                "table": [
                  "*"
                ]
              }
            ],
            "preSql": [],
            "username": "root"
          },
          "name": "mysqlwriter"
        }
      }
    ],
    "setting": {
      "speed": {
        "bytes": 0,
        "channel": 1
      }
    }
  }
}
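
To make the three mapping levels in the reference configuration concrete, here is a minimal sketch of how such a lookup could work with nested maps. It is not ChunJun's implementation, which this thread does not show: the class NameMappingSketch and its methods are hypothetical, and falling back to the source name when no mapping exists is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of schema/table/field name mapping; not ChunJun code.
class NameMappingSketch {
    final Map<String, String> schemaMappings = new HashMap<>();
    final Map<String, Map<String, String>> tableMappings = new HashMap<>();
    final Map<String, Map<String, Map<String, String>>> fieldMappings = new HashMap<>();

    // Assumption: names without a mapping pass through unchanged.
    String mapSchema(String schema) {
        return schemaMappings.getOrDefault(schema, schema);
    }

    String mapTable(String schema, String table) {
        return tableMappings
                .getOrDefault(schema, Collections.<String, String>emptyMap())
                .getOrDefault(table, table);
    }

    String mapField(String schema, String table, String field) {
        return fieldMappings
                .getOrDefault(schema, Collections.<String, Map<String, String>>emptyMap())
                .getOrDefault(table, Collections.<String, String>emptyMap())
                .getOrDefault(field, field);
    }

    // Builds the mapping from the reference configuration in this comment.
    static NameMappingSketch example() {
        NameMappingSketch m = new NameMappingSketch();
        m.schemaMappings.put("ORACLE", "chunjun_sink");

        Map<String, String> tables = new HashMap<>();
        tables.put("source1", "sink1");
        m.tableMappings.put("ORACLE", tables);

        Map<String, String> fields = new HashMap<>();
        fields.put("C1", "id");
        fields.put("C2", "name");
        Map<String, Map<String, String>> byTable = new HashMap<>();
        byTable.put("source1", fields);
        m.fieldMappings.put("ORACLE", byTable);
        return m;
    }

    public static void main(String[] args) {
        NameMappingSketch m = example();
        System.out.println(m.mapSchema("ORACLE") + "." + m.mapTable("ORACLE", "source1"));
        System.out.println("C1 -> " + m.mapField("ORACLE", "source1", "C1"));
    }
}
```

In this sketch, a change record for ORACLE.source1 with columns C1 and C2 would be written to chunjun_sink.sink1 with columns id and name.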

gzusgw commented 2 years ago

After modifying the JSON, the error is the same:

{
  "job": {
    "content": [
      {
        "nameMapping": {
          "schemaMappings": {
            "sgw_test": "sgw_test"
          },
          "tableMappings": {
            "sgw_test": {
              "src_test_table": "sink_test_table"
            }
          },
          "fieldMappings": {
            "sgw_test": {
              "src_test_table": {
                "test_int": "test_int",
                "test_float": "test_float",
                "test_double": "test_double",
                "test_varchar": "test_varchar",
                "test_char": "test_char",
                "test_datetime": "test_datetime"
              }
            }
          }
        },
        "reader": {
          "parameter": {
            "password": "Pwd@1234_Cmict",
            "port": 33061,
            "cat": "insert,update,delete",
            "host": "10.136.106.163",
            "jdbcUrl": "jdbc:mysql://10.136.106.163:33061/sgw_test",
            "splitUpdate": true,
            "start": {},
            "pavingData": false,
            "table": [
              "src_test_table"
            ],
            "username": "root"
          },
          "name": "binlogreader",
          "table": {
            "tableName": "sourceTable"
          }
        },
        "writer": {
          "parameter": {
            "password": "Pwd@1234_Cmict",
            "updateKey": {
              "updateKey": [
                "test_int"
              ]
            },
            "connection": [
              {
                "jdbcUrl": "jdbc:mysql://10.136.106.163:33061/sgw_test",
                "table": [
                  "*"
                ]
              }
            ],
            "writeMode": "insert",
            "batchSize": 1024,
            "flushIntervalMills": 10,
            "username": "root"
          },
          "name": "mysqlwriter",
          "table": {
            "tableName": "sinkTable"
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1,
        "readerChannel": 1,
        "writerChannel": 1,
        "bytes": -1048576
      },
      "restore": {
        "isRestore": true,
        "isStream": true
      }
    }
  }
}

waryars commented 1 year ago

In my testing, the int and varchar column types work without problems, but it fails as soon as decimal or datetime types are involved.