apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [mysql-cdc sync to ES] Fields whose value is NULL are dropped when inserting into ES #7429

Closed klj218 closed 2 months ago

klj218 commented 2 months ago

Search before asking

What happened

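I'm syncing data from MySQL into ES with mysql-cdc. The source table is:

title | content
abc   | 123
cdf   | NULL

After the sync, the ES document for title=cdf has no content field at all; the field is silently dropped. Where can this behavior be configured?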

SeaTunnel Version

2.3.6

SeaTunnel Config

env {
  parallelism = 4
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://127.0.0.1:3306/test_db"
    username = "test"
    password = "test"
    table-names = ["test_db.test"]
    result_table_name = "src_test_log"
  }
}

transform {
  Sql {
    source_table_name = "src_test_log"
    result_table_name = "dst_test_log"
    query = "select title, content from test"
  }
}

sink {
  Elasticsearch {
    hosts = ["10.199.26.14:9200"]
    index = "test"

    result_table_name = "dst_test_log"
    # CDC required options
    primary_keys = ["title"]
    schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST"
    data_save_mode = "APPEND_DATA"
  }
}

Running Command

seatunnel.cmd -e local --config ./test.conf.template

Error Exception

For rows whose content is NULL, the content field is dropped when the row is synced to ES.

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

corgy-w commented 2 months ago

Set the mapping before creating the index.
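A minimal sketch of what "set the mapping first" could look like, assuming Elasticsearch's create-index REST API (PUT /<index> with a "mappings" body) and the Java 11 java.net.http client. The class name CreateIndexWithMapping, its helper methods, and the keyword/text field types are hypothetical choices for illustration; the host and index come from the sink config above. Note that a predefined mapping only guarantees that content exists in the index mapping: a document indexed without the content key still has no content in its _source.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Hypothetical sketch: create the "test" index with an explicit mapping
 * before the SeaTunnel job starts. The field types are assumptions.
 */
class CreateIndexWithMapping {

    // Create-index body; "keyword" for title and "text" for content are assumed types.
    static String mappingBody() {
        return "{\"mappings\":{\"properties\":{"
                + "\"title\":{\"type\":\"keyword\"},"
                + "\"content\":{\"type\":\"text\"}"
                + "}}}";
    }

    // Builds PUT http://<host>/<index> carrying the mapping; nothing is sent here.
    static HttpRequest buildRequest(String host, String index) {
        return HttpRequest.newBuilder(URI.create("http://" + host + "/" + index))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(mappingBody()))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Host and index taken from the sink config in this issue.
        HttpRequest request = buildRequest("10.199.26.14:9200", "test");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

With the index already present, schema_save_mode = "CREATE_SCHEMA_WHEN_NOT_EXIST" should leave it as is rather than recreate it.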

klj218 commented 2 months ago

With the same config file, 2.3.3 does not drop the NULL fields during sync, but 2.3.6 does.

klj218 commented 2 months ago

I looked into the source of ElasticsearchRowSerializer.java.

Version 2.3.3:

    private Map<String, Object> toDocumentMap(SeaTunnelRow row) {
        String[] fieldNames = seaTunnelRowType.getFieldNames();
        Map<String, Object> doc = new HashMap<>(fieldNames.length);
        Object[] fields = row.getFields();
        for (int i = 0; i < fieldNames.length; i++) {
            Object value = fields[i];
            if (value instanceof Temporal) {
                // jackson not support jdk8 new time api
                doc.put(fieldNames[i], value.toString());
            } else {
                doc.put(fieldNames[i], value);
            }
        }
        return doc;
    }

Version 2.3.6:

    private Map<String, Object> toDocumentMap(SeaTunnelRow row, SeaTunnelRowType rowType) {
        String[] fieldNames = rowType.getFieldNames();
        Map<String, Object> doc = new HashMap<>(fieldNames.length);
        Object[] fields = row.getFields();
        for (int i = 0; i < fieldNames.length; i++) {
            Object value = fields[i];
            if (value == null) {
            } else if (value instanceof SeaTunnelRow) {
                doc.put(
                        fieldNames[i],
                        toDocumentMap(
                                (SeaTunnelRow) value,
                                (SeaTunnelRowType) rowType.getFieldType(i)));
            } else {
                doc.put(fieldNames[i], convertValue(value));
            }
        }
        return doc;
    }

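The cause appears to be the empty if (value == null) {} branch. If so, the ES mapping would have to be submitted explicitly before every sync. What was the original reason for adding this null check?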
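To make the difference concrete, here is a minimal, dependency-free sketch of the two loops applied to the row title=cdf, content=NULL. It is not SeaTunnel code: NullFieldDemo, its methods, and the tiny toJson helper are hypothetical; SeaTunnelRow/SeaTunnelRowType are replaced with plain arrays, nested-row handling and convertValue are left out, and LinkedHashMap is used instead of HashMap only to keep the printed key order stable.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical illustration of the two null-handling strategies quoted above. */
class NullFieldDemo {

    // Simplified 2.3.6 behavior: a null value hits the empty branch, so the key is never added.
    static Map<String, Object> toDocumentMapSkipNulls(String[] fieldNames, Object[] fields) {
        Map<String, Object> doc = new LinkedHashMap<>(fieldNames.length);
        for (int i = 0; i < fieldNames.length; i++) {
            Object value = fields[i];
            if (value != null) {
                doc.put(fieldNames[i], value);
            }
        }
        return doc;
    }

    // Simplified 2.3.3 behavior: every field is put, including nulls.
    static Map<String, Object> toDocumentMapKeepNulls(String[] fieldNames, Object[] fields) {
        Map<String, Object> doc = new LinkedHashMap<>(fieldNames.length);
        for (int i = 0; i < fieldNames.length; i++) {
            doc.put(fieldNames[i], fields[i]);
        }
        return doc;
    }

    // Minimal JSON rendering: null becomes a JSON null, anything else a quoted string.
    static String toJson(Map<String, Object> doc) {
        StringJoiner json = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, Object> e : doc.entrySet()) {
            Object v = e.getValue();
            String rendered = (v == null) ? "null" : "\"" + v + "\"";
            json.add("\"" + e.getKey() + "\":" + rendered);
        }
        return json.toString();
    }

    public static void main(String[] args) {
        String[] names = {"title", "content"};
        Object[] row = {"cdf", null}; // the row whose content is NULL
        System.out.println(toJson(toDocumentMapSkipNulls(names, row)));
        System.out.println(toJson(toDocumentMapKeepNulls(names, row)));
    }
}
```

Running main prints {"title":"cdf"} and then {"title":"cdf","content":null}; the first line matches what the reporter sees in ES with 2.3.6. Whether the 2.3.3 document reached ES with an explicit null also depends on the JSON serializer's null-inclusion settings, which this sketch does not model.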

corgy-w commented 2 months ago

OK, I'll look into it and let you know.