apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

2.3: error when importing into ES #4316

Closed EthanWang95 closed 1 year ago

EthanWang95 commented 1 year ago

Search before asking

What happened

In version 2.3.0, importing data from PG into ES fails.

ES mapping:

{ "seatunneltest_tst_20230213_1" : { "mappings" : { "acount_y" : { "properties" : { "id" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "type" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } } } } } } }

ES settings:

{ "seatunneltest_tst_20230213_1" : { "settings" : { "index" : { "refresh_interval" : "1s", "number_of_shards" : "5", "provided_name" : "seatunneltest_tst_20230213_1", "creation_date" : "1676280835561", "number_of_replicas" : "0", "uuid" : "dk1wGNFfTzWvG_QtIfHpIg", "version" : { "created" : "6070099" } } } } }

SeaTunnel Version

2.3.0

SeaTunnel Config

env {
  # seatunnel defined streaming batch duration in seconds
 ## spark.app.name = "SeaTunnel"
  #spark.sql.catalogImplementation = "hive"
  spark.executor.instances = 10
  spark.executor.cores = 20
  spark.executor.memory = "40g"
}

source {
  jdbc {
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://x.x.x.x:5432/xxxxx"
    user = "a_appconnect"
    password = ""
    table = "seatunnel_test"
    result_table_name = "seatunnel_test"
    #jdbc.SSL = "true"
    #jdbc.SSLKeyStorePath = "/data/ludp_jks/keystore.jks"
    query = "select * from seatunnel_test"
 }

}

transform {
  # split data by specific delimiter

# you can also use other filter plugins, such as sql
  # sql {
  #   sql = "select * from accesslog where request_time > 1000"
  #} 
}

sink {
  # choose stdout output plugin to output data to console
  # Console {}
    elasticsearch {
        hosts = ["10.122.33.95:9201"]
        index = "seatunneltest_tst_20230213_1"
    }
}

Running Command

nohup ./bin/start-seatunnel-spark-connector-v2.sh  --master yarn --deploy-mode client --config config/spark.batch.pg.to.es.conf &

Error Exception

2023-03-09 15:59:03 INFO  YarnScheduler:54 - Killing all running tasks in stage 0: Stage cancelled
2023-03-09 15:59:03 INFO  DAGScheduler:54 - ResultStage 0 (save at SinkExecuteProcessor.java:85) failed in 12.895 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cbi238.cdh.com, executor 5): org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException: ErrorCode:[COMMON-08], ErrorDescription:[Sql operation failed, such as (execute,addBatch,close) etc...] - ElasticSearch execute batch statement error
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.bulkEsWithRetry(ElasticsearchSinkWriter.java:122)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.write(ElasticsearchSinkWriter.java:92)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.write(ElasticsearchSinkWriter.java:50)
        at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.write(SparkDataWriter.java:58)
        at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.write(SparkDataWriter.java:37)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:118)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
        at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
        at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
        at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Execute given execution failed after retry 3 times
        at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:68)
        at org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.bulkEsWithRetry(ElasticsearchSinkWriter.java:108)
        ... 18 more
Caused by: org.apache.seatunnel.connectors.seatunnel.elasticsearch.exception.ElasticsearchConnectorException: ErrorCode:[ELASTICSEARCH-01], ErrorDescription:[Bulk es response error] - bulk es error,request boy={ "index" :{"_index":"seatunneltest_tst_20230213_1"}}

Flink or Spark Version

spark 2.4.0

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

davidzollo commented 1 year ago

Good job. I have assigned this issue to you. By the way, please describe the issue in English.

iture123 commented 1 year ago

This exception is caused by using an ES6 cluster without specifying `index_type`. I think SeaTunnel should set a default ES type for ES6 clusters.
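On ES6, every bulk action must carry a `_type`, which matches the failing request body in the log (it contains only `"_index"`). A possible workaround, assuming the connector's `index_type` option fills in that field, is to set it in the sink to match the document type from the index mapping above (`acount_y`):

```
sink {
    elasticsearch {
        hosts = ["10.122.33.95:9201"]
        index = "seatunneltest_tst_20230213_1"
        # ES6 requires a document type in each bulk action;
        # "acount_y" is the type shown in the index mapping above
        index_type = "acount_y"
    }
}
```

Mapping types were deprecated in ES7 and removed later, which is presumably why the connector omits `_type` by default.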