apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Elasticsearch][Sink] Sink cannot write documents that contain an _id field #7675

Closed: FuYouJ closed this issue 2 weeks ago

FuYouJ commented 2 months ago

Search before asking

What happened

When I try to update a document in ES by its primary key, the write fails. The cause is in the ES serializer: if a primary key is configured, it adds the _id property to the document body, even though _id is not actually a property of the document.

Root cause in the code: _id is a metadata field, so it cannot be written as part of the document content.
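Put differently, the bulk request has to carry the key in the action metadata line instead of the document body. Below is a minimal sketch of that split, assuming plain Java string building in place of the Elasticsearch client; the class BulkIndexActionSketch and its methods are hypothetical and are not SeaTunnel's serializer.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: builds the two NDJSON lines of one bulk "index" action.
 * The "_id" value moves into the action metadata and is removed from the
 * document body, because Elasticsearch rejects "_id" inside the source.
 */
final class BulkIndexActionSketch {

    static String toBulkLines(String index, Map<String, Object> row) {
        // Copy the row so the caller's map is left untouched.
        Map<String, Object> source = new LinkedHashMap<>(row);
        // Strip the metadata field from the body; its value becomes the action's _id.
        Object id = source.remove("_id");

        StringBuilder action = new StringBuilder("{\"index\":{\"_index\":").append(quote(index));
        if (id != null) {
            action.append(",\"_id\":").append(quote(id.toString()));
        }
        action.append("}}");

        String body = source.entrySet().stream()
                .map(e -> quote(e.getKey()) + ":" + toJsonValue(e.getValue()))
                .collect(Collectors.joining(",", "{", "}"));

        // Every bulk line, including the last one, ends with a newline.
        return action + "\n" + body + "\n";
    }

    // Minimal JSON encoding for this sketch only: numbers and booleans as-is, the rest quoted.
    private static String toJsonValue(Object value) {
        if (value == null) return "null";
        if (value instanceof Number || value instanceof Boolean) return value.toString();
        return quote(value.toString());
    }

    // Escapes backslashes and double quotes only; control characters are not handled.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("_id", "JndimZEBRF9Q93Q5GAyM");
        row.put("c_int", 42);
        System.out.print(toBulkLines("write_index1", row));
    }
}
```

For the row {_id: "JndimZEBRF9Q93Q5GAyM", c_int: 42} the sketch emits an action line that carries the id and a document body that contains only c_int, which is the shape the error message asks for ("Use the index API request parameters").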

SeaTunnel Version

2.3.8 dev

SeaTunnel Config

ES mapping:
{
  "read_index1" : {
    "mappings" : {
      "properties" : {
        "c_array" : {
          "type" : "long"
        },
        "c_bigint" : {
          "type" : "long"
        },
        "c_boolean" : {
          "type" : "boolean"
        },
        "c_bytes" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "c_date" : {
          "type" : "date"
        },
        "c_decimal" : {
          "type" : "float"
        },
        "c_double" : {
          "type" : "float"
        },
        "c_float" : {
          "type" : "float"
        },
        "c_int" : {
          "type" : "long"
        },
        "c_map" : {
          "properties" : {
            "key" : {
              "type" : "long"
            }
          }
        },
        "c_smallint" : {
          "type" : "long"
        },
        "c_string" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "c_timestamp" : {
          "type" : "long"
        },
        "c_tinyint" : {
          "type" : "long"
        }
      }
    }
  }
}

######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
  parallelism = 1
  job.mode = "BATCH"
  #checkpoint.interval = 10000
}

source {
  Elasticsearch {
    hosts = ["http://seatunnel-j5l.xxxxxxxxx.com:9200"]
    username = "seatunnel-j5l"
    password = "xxxxxxxxxxxx"
    tls_verify_certificate = false
    tls_verify_hostname = false

    index = "read_index1"
    query = {"match_all":{}}
    source=["_id","c_int"]
  }
}

transform {
}

sink {
  Elasticsearch {
    hosts = ["http://seatunnel-j5l.public.cn-hangzhou.es-serverless.aliyuncs.com:9200"]
    username = "seatunnel-j5l"
    password = "xxxxxxxxxxxx"
    tls_verify_certificate = false
    tls_verify_hostname = false
    primary_keys = "_id"

    index = "write_index1"
    "schema_save_mode"="CREATE_SCHEMA_WHEN_NOT_EXIST"
    "data_save_mode"="APPEND_DATA"
  }
}
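The sink config sets primary_keys = "_id", so the document id is taken from the _id value that the source reads. As a rough sketch, assuming the sink joins the values of all primary-key fields with the key_delimiter option ("_" by default), id derivation could look like this; DocumentIdSketch is a hypothetical name and the real connector code may differ.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Hypothetical sketch: derive a document id from the configured primary keys,
 * joining composite key values with a delimiter (assumed to mirror key_delimiter).
 */
final class DocumentIdSketch {

    static String documentId(Map<String, ?> row, List<String> primaryKeys, String delimiter) {
        if (primaryKeys == null || primaryKeys.isEmpty()) {
            return null; // no primary key configured: let Elasticsearch generate the id
        }
        return primaryKeys.stream()
                .map(key -> {
                    Object value = row.get(key);
                    if (value == null) {
                        throw new IllegalArgumentException("primary key field is null or missing: " + key);
                    }
                    return value.toString();
                })
                .collect(Collectors.joining(delimiter));
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("_id", "JndimZEBRF9Q93Q5GAyM", "c_int", 7);
        System.out.println(documentId(row, List.of("_id"), "_"));
        System.out.println(documentId(row, List.of("_id", "c_int"), "$"));
    }
}
```

Deriving the id this way is independent of what goes into the document body; the bug is that the _id field is also left in the body after it has been used as the key.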

Running Command

Run with the example.

Error Exception

Caused by: org.elasticsearch.client.ResponseException: method [POST], host [http://seatunnel-j5l.public.cn-hangzhou.es-serverless.aliyuncs.com:9200], URI [/_bulk], status line [HTTP/1.1 400 Bad Request]
{"error":{"root_cause":[{"type":"mapper_parsing_exception","reason":"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}],"type":"mapper_parsing_exception","reason":"failed to parse field [_id] of type [_id] in document with id 'JndimZEBRF9Q93Q5GAyM'. Preview of field's value: 'JndimZEBRF9Q93Q5GAyM'","caused_by":{"type":"mapper_parsing_exception","reason":"Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters."}},"status":400}
    at org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:261)
    at org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.bulk(EsRestClient.java:211)
    ... 13 more
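The 400 response above is the bulk API rejecting _id inside the document body. A minimal client-side check that would surface this before sending is sketched below; MetadataFieldCheck is a hypothetical helper, and its set of metadata field names is an assumed common subset, not an exhaustive list.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Hypothetical pre-flight check: report top-level document fields whose names
 * clash with Elasticsearch metadata fields (assumed subset, not exhaustive).
 */
final class MetadataFieldCheck {

    private static final Set<String> METADATA_FIELDS = Set.of(
            "_id", "_index", "_routing", "_source", "_version",
            "_seq_no", "_primary_term", "_ignored", "_field_names");

    // Returns the offending field names in sorted order; empty if the document is clean.
    static List<String> offendingFields(Map<String, ?> document) {
        return document.keySet().stream()
                .filter(METADATA_FIELDS::contains)
                .sorted()
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Object> doc = Map.of("_id", "JndimZEBRF9Q93Q5GAyM", "c_int", 7);
        List<String> bad = offendingFields(doc);
        if (!bad.isEmpty()) {
            System.out.println("metadata fields in document body: " + bad);
        }
    }
}
```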

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit a PR?

Code of Conduct

FuYouJ commented 1 month ago

I started fixing this today

FuYouJ commented 1 month ago

I'm not sure whether this should be treated as a bug.

github-actions[bot] commented 3 weeks ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 2 weeks ago

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.