apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

When SeaTunnel (ST) imports data into Elasticsearch (ES), the HTTPS protocol is not supported #4341

Closed EthanWang95 closed 1 year ago

EthanWang95 commented 1 year ago

Search before asking

What happened

When SeaTunnel (ST) imports data into Elasticsearch (ES), the HTTPS protocol is not supported.

SeaTunnel Version

2.3.0

SeaTunnel Config

env {
  # seatunnel defined streaming batch duration in seconds
  #spark.app.name = "SeaTunnel"
  #spark.sql.catalogImplementation = "hive"
  spark.executor.instances = 10
  spark.executor.cores = 20
  spark.executor.memory = "40g"
}
source {
  jdbc {
    driver = "org.postgresql.Driver"
    url = "jdbc:postgresql://****:5432/cdp_edge"
    user = "dbadmin"
    password = "****"
    table = "cdp_rep_master"
    result_table_name = "cdp_rep_master"
    #jdbc.SSL = "true"
    #jdbc.SSLKeyStorePath = "/data/ludp_jks/keystore.jks"
    query = "SELECT *,zbpli as rowkey from cml.cdp_rep_master where to_timestamp(update_timestamp,'yyyyMMddhh24miss') > current_date - interval '7 day'"
 }
}
transform {
  # split data by specific delimiter

# you can also use other filter plugins, such as sql
  # sql {
  #   sql = "select * from accesslog where request_time > 1000"
  #} 
}

sink {
  # choose stdout output plugin to output data to console
  # Console {}
    elasticsearch {
        hosts = ["vpc-cdpsupportquotation-es-tst-nwniunh2eqrlgac5varhtd7b3u.us-east-1.es.amazonaws.com:443"]
        index = "cdp_rep_master_uat_20230313_01"
        #index_time_format = "yyyyMMdd"
        index_type = "seatunnel"
    }
}

Running Command

nohup ./bin/start-seatunnel-spark-connector-v2.sh  --master yarn --deploy-mode client --config config/toes/prod.cml.cdp_rep_master_append.conf &

Error Exception

2023-03-13 15:29:49 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on cbi238.cdh.com:33581 (size: 4.2 KB, free: 21.2 GB)
2023-03-13 15:29:51 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, cbi238.cdh.com, executor 5): org.apache.spark.util.TaskCompletionListenerException: java.lang.NullPointerException

Previous exception in task: ErrorCode:[ELASTICSEARCH-02], ErrorDescription:[Get elasticsearch version failed] - fail to get elasticsearch version.
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.getClusterVersion(EsRestClient.java:145)
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.<init>(ElasticsearchSinkWriter.java:75)
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:78)
    org.apache.seatunnel.translation.spark.sink.SparkDataWriterFactory.createDataWriter(SparkDataWriterFactory.java:45)
    org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113)
    org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    org.apache.spark.scheduler.Task.run(Task.scala:121)
    org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)
    at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:116)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2023-03-13 15:29:51 INFO  TaskSetManager:54 - Starting task 0.1 in stage 0.0 (TID 1, cbi238.cdh.com, executor 5, partition 0, PROCESS_LOCAL, 11210 bytes)
2023-03-13 15:29:51 INFO  TaskSetManager:54 - Lost task 0.1 in stage 0.0 (TID 1) on cbi238.cdh.com, executor 5: org.apache.spark.util.TaskCompletionListenerException (java.lang.NullPointerException

Previous exception in task: ErrorCode:[ELASTICSEARCH-02], ErrorDescription:[Get elasticsearch version failed] - fail to get elasticsearch version.
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.getClusterVersion(EsRestClient.java:145)
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.<init>(ElasticsearchSinkWriter.java:75)
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:78)
    org.apache.seatunnel.translation.spark.sink.SparkDataWriterFactory.createDataWriter(SparkDataWriterFactory.java:45)
    org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113)
    org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    org.apache.spark.scheduler.Task.run(Task.scala:121)
    org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)) [duplicate 1]
2023-03-13 15:29:51 INFO  TaskSetManager:54 - Starting task 0.2 in stage 0.0 (TID 2, cbi241.cdh.com, executor 4, partition 0, PROCESS_LOCAL, 11210 bytes)
2023-03-13 15:29:52 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on cbi241.cdh.com:38900 (size: 4.2 KB, free: 21.2 GB)
2023-03-13 15:29:53 INFO  TaskSetManager:54 - Lost task 0.2 in stage 0.0 (TID 2) on cbi241.cdh.com, executor 4: org.apache.spark.util.TaskCompletionListenerException (java.lang.NullPointerException

Previous exception in task: ErrorCode:[ELASTICSEARCH-02], ErrorDescription:[Get elasticsearch version failed] - fail to get elasticsearch version.
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient.getClusterVersion(EsRestClient.java:145)
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSinkWriter.<init>(ElasticsearchSinkWriter.java:75)
    org.apache.seatunnel.connectors.seatunnel.elasticsearch.sink.ElasticsearchSink.createWriter(ElasticsearchSink.java:78)
    org.apache.seatunnel.translation.spark.sink.SparkDataWriterFactory.createDataWriter(SparkDataWriterFactory.java:45)
    org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:113)
    org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    org.apache.spark.scheduler.Task.run(Task.scala:121)
    org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    java.lang.Thread.run(Thread.java:748)) [duplicate 2]
2023-03-13 15:29:53 INFO  TaskSetManager:54 - Starting task 0.3 in stage 0.0 (TID 3, cbi242.cdh.com, executor 1, partition 0, PROCESS_LOCAL, 11210 bytes)
2023-03-13 15:29:54 INFO  BlockManagerInfo:54 - Added broadcast_0_piece0 in memory on cbi242.cdh.com:40142 (size: 4.2 KB, free: 21.2 GB)
2023-03-13 15:29:56 INFO  TaskSetManager:54 - Lost task 0.3 in stage 0.0 (TID 3) on cbi242.cdh.com, executor 1: org.apache.spark.util.TaskCompletionListenerException (java.lang.NullPointerException

Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

lightzhao commented 1 year ago

Please describe your problem in English.

EthanWang95 commented 1 year ago

When SeaTunnel imports data into Elasticsearch, the HTTPS protocol is not supported.

lightzhao commented 1 year ago

When SeaTunnel imports data into Elasticsearch, the HTTPS protocol is not supported.

Which ES version are you using? The currently supported Elasticsearch versions are >= 2.x and < 8.x.

EthanWang95 commented 1 year ago

Yes, version 6.8.0.

laglangyue commented 1 year ago

[screenshot]

Could you try adding https:// to the hosts?
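
For example, a sketch only: this is the sink block from the config above with the scheme prefix added to the host; whether the 2.3.0 connector honors that prefix is exactly what this issue is about.

sink {
    elasticsearch {
        # same host as in the original config, with an explicit https:// prefix
        hosts = ["https://vpc-cdpsupportquotation-es-tst-nwniunh2eqrlgac5varhtd7b3u.us-east-1.es.amazonaws.com:443"]
        index = "cdp_rep_master_uat_20230313_01"
        index_type = "seatunnel"
    }
}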

EthanWang95 commented 1 year ago

[screenshot]

This is the exception after adding it.

laglangyue commented 1 year ago

Sorry, in v2.3.0 the ES connector does not support HTTPS.

laglangyue commented 1 year ago

Maybe you need to build the dev branch and use the Elasticsearch connector from it, which supports HTTPS.
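
For reference, a rough sketch of what an HTTPS-enabled sink config can look like with a newer connector build. The option names shown here (username, password, tls_verify_certificate, tls_verify_hostname, tls_keystore_path, tls_keystore_password) come from later connector documentation and should be verified against the version you actually build; the host is a placeholder.

sink {
    elasticsearch {
        # https:// scheme in the host URL (placeholder host)
        hosts = ["https://your-es-host:443"]
        index = "cdp_rep_master_uat_20230313_01"
        # basic auth, if the cluster requires it
        #username = "elastic"
        #password = "****"
        # TLS options available in newer connector versions
        tls_verify_certificate = true    # set to false only for testing
        tls_verify_hostname = true
        #tls_keystore_path = "/path/to/keystore.jks"
        #tls_keystore_password = "****"
    }
}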

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.