DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Bug] [connector][elasticsearch7] java.lang.OutOfMemoryError: Direct buffer memory #1750

Closed libailin closed 1 year ago

libailin commented 1 year ago


What happened

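When syncing from es->mysql, if the MySQL insert fails with a character-set error, the Elasticsearch source reports the errors below and the TaskManager (TM) never exits.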

2023-06-27 14:09:14,573 WARN  com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat     [] - write Multiple Records error, start to rollback connection, row size = 1024, first row = {}

2023-06-27 14:09:15,051 INFO  com.dtstack.chunjun.dirty.utils.TablePrintUtil               [] - 
+------------------+----------------------------------+-----------+---------------+---------------------------------+--------------+-----------+-------------------------+
| serialVersionUID |              jobId               |  jobName  | operatorName  |          dirtyContent           | errorMessage | fieldName |       createTime        |
+------------------+----------------------------------+-----------+---------------+---------------------------------+--------------+-----------+-------------------------+
|        1         | ad209126b97fd56699bc622dcc5c1986 | Flink_Job | Sink: sink[3] | {"rowKind":"INSERT","arity":16} | com.dtstack.chunjun.throwable.WriteRecordException: 
JdbcOutputFormat [Flink_Job] writeRecord error: when converting field[0] in Row(+I(https://www.google.com/search?hl=en&q=testing,12345'"\'\");|]*%00{%0d%0a<%00>%bf%27'💡,432746352103916511,http,OK,null,0.002,null,pion.xxxx.cn,1x.249.30.xx,GET,null,/backend/cvat/api/v1/users/self,null,7f8981xxxx1dcd7de,401))
java.sql.SQLException: Incorrect string value: '\xF0\x9F\x92\xA1' for column 'agent' at row 1
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.processWriteException(JdbcOutputFormat.java:426)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:225)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeSingleRecord(BaseRichOutputFormat.java:488)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.lambda$writeRecordInternal$1(BaseRichOutputFormat.java:509)
    at java.util.ArrayList.forEach(ArrayList.java:1249)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecordInternal(BaseRichOutputFormat.java:509)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:308)
    at com.dtstack.chunjun.sink.format.BaseRichOutputFormat.writeRecord(BaseRichOutputFormat.java:90)
    at com.dtstack.chunjun.sink.DtOutputFormatSinkFunction.invoke(DtOutputFormatSinkFunction.java:112)
    at org.apache.flink.table.runtime.operators.sink.SinkOperator.processElement(SinkOperator.java:65)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at StreamExecCalc$42.processElement_split3(Unknown Source)
    at StreamExecCalc$42.processElement(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:418)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:513)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collect(StreamSourceContexts.java:103)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:127)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
Caused by: java.sql.SQLException: Incorrect string value: '\xF0\x9F\x92\xA1' for column 'agent' at row 1
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:965)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3933)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3869)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2524)
    at com.mysql.jdbc.ServerPreparedStatement.serverExecute(ServerPreparedStatement.java:1281)
    at com.mysql.jdbc.ServerPreparedStatement.executeInternal(ServerPreparedStatement.java:787)
    at com.mysql.jdbc.PreparedStatement.execute(PreparedStatement.java:1254)
    at com.dtstack.chunjun.connector.jdbc.statement.FieldNamedPreparedStatementImpl.execute(FieldNamedPreparedStatementImpl.java:173)
    at com.dtstack.chunjun.connector.jdbc.sink.wrapper.SimpleStatementWrapper.writeSingleRecord(SimpleStatementWrapper.java:54)
    at com.dtstack.chunjun.connector.jdbc.sink.wrapper.SimpleStatementWrapper.writeSingleRecord(SimpleStatementWrapper.java:29)
    at com.dtstack.chunjun.connector.jdbc.sink.JdbcOutputFormat.writeSingleRecordInternal(JdbcOutputFormat.java:217)
    ... 27 more
 |   null    | 2023-06-27 14:09:15.049 |
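
The bytes in `Incorrect string value: '\xF0\x9F\x92\xA1'` are the UTF-8 encoding of a single character outside the Basic Multilingual Plane. MySQL's `utf8` (`utf8mb3`) character set stores at most 3 bytes per character, so this error typically means the `agent` column is not `utf8mb4`; the column's actual charset is not shown in this report, so treat that as an assumption. A minimal sketch for spotting such values before they reach the sink (the class and method names are hypothetical, not part of ChunJun):

```java
import java.nio.charset.StandardCharsets;

class FourByteCheck {
    // True if the text contains any code point above U+FFFF,
    // i.e. one that needs 4 bytes when encoded as UTF-8.
    static boolean hasFourByteUtf8(String s) {
        return s.codePoints().anyMatch(Character::isSupplementaryCodePoint);
    }

    public static void main(String[] args) {
        // The exact bytes quoted in the SQLException above.
        byte[] rejected = {(byte) 0xF0, (byte) 0x9F, (byte) 0x92, (byte) 0xA1};
        String decoded = new String(rejected, StandardCharsets.UTF_8);

        System.out.println(Integer.toHexString(decoded.codePointAt(0))); // 1f4a1
        System.out.println(hasFourByteUtf8(decoded));                    // true
        System.out.println(hasFourByteUtf8("plain ascii"));              // false
    }
}
```

A check like this only identifies the offending rows; storing them still requires a 4-byte-capable column and connection charset on the MySQL side.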

The main errors are as follows:


2023-06-27 14:18:21,942 ERROR org.apache.flink.runtime.util.ClusterUncaughtExceptionHandler [] - WARNING: Thread 'I/O dispatcher 24' produced an uncaught exception. If you want to fail on uncaught exceptions, then configure cluster.uncaught-exception-handling accordingly
java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694) ~[?:1.8.0_144]
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_144]
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_144]
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) ~[?:1.8.0_144]
    at sun.nio.ch.IOUtil.read(IOUtil.java:195) ~[?:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:1.8.0_144]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.codecs.AbstractContentDecoder.readFromChannel(AbstractContentDecoder.java:154) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.codecs.LengthDelimitedDecoder.read(LengthDelimitedDecoder.java:84) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.util.SimpleInputBuffer.consumeContent(SimpleInputBuffer.java:66) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.onContentReceived(HeapBufferedAsyncResponseConsumer.java:96) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.protocol.AbstractAsyncResponseConsumer.consumeContent(AbstractAsyncResponseConsumer.java:147) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.MainClientExec.consumeContent(MainClientExec.java:329) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.consumeContent(DefaultClientExchangeHandlerImpl.java:157) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:336) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
2023-06-27 14:18:21,945 ERROR com.dtstack.chunjun.source.DtInputFormatSourceFunction       [] - Exception happened, start to close format
com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.extractAndWrapCause(RestClient.java:813) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:248) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.scroll(RestHighLevelClient.java:1079) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.searchScroll(RestHighLevelClient.java:1066) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.searchScroll(ElasticsearchInputFormat.java:166) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.reachedEnd(ElasticsearchInputFormat.java:152) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:123) [chunjun-core.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) [flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) [flink-dist-1.16.1.jar:1.16.1]
Caused by: com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.ConnectionClosedException: Connection closed unexpectedly
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.closed(HttpAsyncRequestExecutor.java:146) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:71) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalIODispatch.onClosed(InternalIODispatch.java:39) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIODispatch.disconnected(AbstractIODispatch.java:100) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.BaseIOReactor.sessionClosed(BaseIOReactor.java:277) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.processClosedSessions(AbstractIOReactor.java:449) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.hardShutdown(AbstractIOReactor.java:590) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:305) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
2023-06-27 14:18:21,952 ERROR com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalHttpAsyncClient [] - I/O reactor terminated abnormally
com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.reactor.IOReactorException: I/O dispatch worker terminated abnormally
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor.execute(AbstractMultiworkerIOReactor.java:359) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.conn.PoolingNHttpClientConnectionManager.execute(PoolingNHttpClientConnectionManager.java:221) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.CloseableHttpAsyncClientBase$1.run(CloseableHttpAsyncClientBase.java:64) [chunjun-connector-elasticsearch7-master.jar:?]
    at java.lang.Thread.run(Thread.java:748) [?:1.8.0_144]
Caused by: java.lang.OutOfMemoryError: Direct buffer memory
    at java.nio.Bits.reserveMemory(Bits.java:694) ~[?:1.8.0_144]
    at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123) ~[?:1.8.0_144]
    at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311) ~[?:1.8.0_144]
    at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241) ~[?:1.8.0_144]
    at sun.nio.ch.IOUtil.read(IOUtil.java:195) ~[?:1.8.0_144]
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380) ~[?:1.8.0_144]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.codecs.AbstractContentDecoder.readFromChannel(AbstractContentDecoder.java:154) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.codecs.LengthDelimitedDecoder.read(LengthDelimitedDecoder.java:84) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.util.SimpleInputBuffer.consumeContent(SimpleInputBuffer.java:66) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.HeapBufferedAsyncResponseConsumer.onContentReceived(HeapBufferedAsyncResponseConsumer.java:96) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.protocol.AbstractAsyncResponseConsumer.consumeContent(AbstractAsyncResponseConsumer.java:147) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.MainClientExec.consumeContent(MainClientExec.java:329) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.consumeContent(DefaultClientExchangeHandlerImpl.java:157) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:336) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:81) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:39) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:114) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591) ~[chunjun-connector-elasticsearch7-master.jar:?]
    ... 1 more
2023-06-28 00:18:11,808 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:44222] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1195725860 - discarded
2023-06-28 00:18:11,811 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:44224] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 369295620 - discarded
2023-06-28 00:18:11,816 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:45796] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1195725860 - discarded
2023-06-28 00:18:11,818 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:45798] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 369295620 - discarded
2023-06-29 00:35:33,571 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:59296] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1195725860 - discarded
2023-06-29 00:35:33,574 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:59298] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 369295620 - discarded
2023-06-29 00:35:33,578 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:60870] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 1195725860 - discarded
2023-06-29 00:35:33,580 WARN  akka.remote.transport.netty.NettyTransport                   [] - Remote connection to [/127.0.0.1:60872] failed with org.jboss.netty.handler.codec.frame.TooLongFrameException: Adjusted frame length exceeds 10485760: 369295620 - discarded
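
A side note on the `TooLongFrameException` warnings above: the two reported frame lengths look like misread protocol bytes rather than real Flink RPC frames. The sketch below assumes (not confirmed by this log) that the Akka Netty transport uses a 4-byte big-endian length prefix and that the "adjusted" length is the raw prefix value plus those 4 bytes. The class and method names are hypothetical helpers for illustration only. Under that assumption, `1195725860` decodes to the ASCII text `GET ` and `369295620` to the bytes `16 03 01 00`, which match the start of a plain HTTP request and of a TLS handshake record. That would suggest some HTTP/TLS client (for example a scanner or health check) connected to the RPC port, and these warnings may be unrelated to the direct buffer OOM.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class FrameLengthDecoder {
    // Assumption: the length prefix is 4 bytes and the logged "adjusted"
    // frame length equals the raw prefix value plus these 4 bytes.
    static final int LENGTH_FIELD_BYTES = 4;

    // Recover the 4 bytes that the frame decoder interpreted as a length.
    static byte[] rawPrefix(long adjustedFrameLength) {
        int raw = (int) (adjustedFrameLength - LENGTH_FIELD_BYTES);
        return ByteBuffer.allocate(4).putInt(raw).array(); // big-endian by default
    }

    // Render the prefix as ASCII when all bytes are printable, otherwise as hex.
    static String describe(long adjustedFrameLength) {
        byte[] prefix = rawPrefix(adjustedFrameLength);
        boolean printable = true;
        for (byte b : prefix) {
            if (b < 0x20 || b > 0x7e) {
                printable = false;
            }
        }
        if (printable) {
            return "ASCII \"" + new String(prefix, StandardCharsets.US_ASCII) + "\"";
        }
        StringBuilder hex = new StringBuilder();
        for (byte b : prefix) {
            hex.append(String.format("%02x ", b));
        }
        return "bytes " + hex.toString().trim();
    }

    public static void main(String[] args) {
        // The two values that appear in the warnings above.
        System.out.println(describe(1195725860L)); // ASCII "GET "
        System.out.println(describe(369295620L));  // bytes 16 03 01 00
    }
}
```

If this reading is right, checking what connects to the JobManager/TaskManager RPC ports around 00:18 and 00:35 each day would be a separate follow-up from the Elasticsearch connector problem.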

Another error:

2023-06-27 16:28:29,102 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: source[1] -> Calc[2] -> Sink: sink[3] (1/1) (f16bd62ce3291c9a308ffff18bdca60e_cbc357ccb763df2852fee8c4fc7d55f2_0_0) switched from RUNNING to FAILED on container_e04_1686304823865_0541_01_000002 @ rad.test.net (dataPort=40710).
org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157) ~[chunjun-core.jar:?]
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
        Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
            at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
            at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157) ~[chunjun-core.jar:?]
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
            at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
    Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
        ... 24 more
Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
    ... 11 more
2023-06-27 16:28:29,135 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] - Clearing resource requirements of job 075a25a83a5ce14f82280f00b96508df
2023-06-27 16:28:29,141 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink_Job (075a25a83a5ce14f82280f00b96508df) switched from state RUNNING to FAILING.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.1.jar:1.16.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_144]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_144]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_144]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_144]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_144]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_144]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_144]
Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157) ~[chunjun-core.jar:?]
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
        Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
            at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
            at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157) ~[chunjun-core.jar:?]
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
            at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
    Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
        ... 24 more
Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
2023-06-27 16:28:29,146 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job Flink_Job (075a25a83a5ce14f82280f00b96508df) switched from state FAILING to FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477) ~[flink-dist-1.16.1.jar:1.16.1]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_144]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_144]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_144]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_144]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_e7e92cbe-cc18-4378-8dab-5efa2fb08b19.jar:1.16.1]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_144]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_144]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_144]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) [?:1.8.0_144]
Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
    Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157) ~[chunjun-core.jar:?]
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
        Suppressed: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
            at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
            at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
            at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157) ~[chunjun-core.jar:?]
            at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728) ~[flink-dist-1.16.1.jar:1.16.1]
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550) ~[flink-dist-1.16.1.jar:1.16.1]
            at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_144]
    Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
        ... 24 more
Caused by: org.apache.flink.util.SerializedThrowable: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127) ~[chunjun-connector-elasticsearch7-master.jar:?]
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162) ~[chunjun-core.jar:?]
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143) ~[chunjun-core.jar:?]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67) ~[flink-dist-1.16.1.jar:1.16.1]
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333) ~[flink-dist-1.16.1.jar:1.16.1]
2023-06-27 16:28:29,147 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Stopping checkpoint coordinator for job 075a25a83a5ce14f82280f00b96508df.
2023-06-27 16:28:29,158 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Job 075a25a83a5ce14f82280f00b96508df reached terminal state FAILED.
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
    at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
    at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
    at akka.actor.Actor.aroundReceive(Actor.scala:537)
    at akka.actor.Actor.aroundReceive$(Actor.scala:535)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
    at akka.actor.ActorCell.invoke(ActorCell.scala:548)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
    at akka.dispatch.Mailbox.run(Mailbox.scala:231)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
Caused by: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126)
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142)
    at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162)
    at com.dtstack.chunjun.source.DtInputFormatSourceFunction.run(DtInputFormatSourceFunction.java:143)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:67)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:333)
    Suppressed: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
        ... 12 more
    Suppressed: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchStatusException: Unable to parse response body
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1773)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1527)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1484)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1454)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.clearScroll(RestHighLevelClient.java:1126)
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.clearScroll(ElasticsearchInputFormat.java:142)
        at com.dtstack.chunjun.connector.elasticsearch7.source.ElasticsearchInputFormat.closeInternal(ElasticsearchInputFormat.java:127)
        at com.dtstack.chunjun.source.format.BaseRichInputFormat.close(BaseRichInputFormat.java:218)
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.gracefulClose(DtInputFormatSourceFunction.java:162)
        at com.dtstack.chunjun.source.DtInputFormatSourceFunction.close(DtInputFormatSourceFunction.java:157)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:41)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:114)
        at org.apache.flink.streaming.api.operators.StreamSource.close(StreamSource.java:124)
        at org.apache.flink.streaming.runtime.tasks.StreamOperatorWrapper.close(StreamOperatorWrapper.java:163)
        at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.closeAllOperators(RegularOperatorChain.java:125)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.closeAllOperators(StreamTask.java:1025)
        at org.apache.flink.util.IOUtils.closeAll(IOUtils.java:255)
        at org.apache.flink.core.fs.AutoCloseableRegistry.doClose(AutoCloseableRegistry.java:72)
        at org.apache.flink.util.AbstractAutoCloseableRegistry.close(AbstractAutoCloseableRegistry.java:127)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUp(StreamTask.java:943)
        at org.apache.flink.runtime.taskmanager.Task.lambda$restoreAndInvoke$0(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
        at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:917)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
        at java.lang.Thread.run(Thread.java:748)
        Suppressed: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.ParsingException: Failed to parse object: expecting field with name [error] but found [succeeded]
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.common.xcontent.XContentParserUtils.ensureFieldName(XContentParserUtils.java:50)
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.ElasticsearchException.failureFromXContent(ElasticsearchException.java:592)
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.rest.BytesRestResponse.errorFromXContent(BytesRestResponse.java:169)
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1793)
            at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:1770)
            ... 25 more
    Caused by: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
        at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
        ... 24 more
Caused by: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.ResponseException: method [DELETE], host [http://xxxx:18008], URI [/_search/scroll], status line [HTTP/1.1 404 Not Found]
{"succeeded":true,"num_freed":0}
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.convertResponse(RestClient.java:283)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:261)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestClient.performRequest(RestClient.java:235)
    at com.dtstack.chunjun.connector.elasticsearch7.org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1514)
    ... 11 more
2023-06-27 16:28:29,163 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Job 075a25a83a5ce14f82280f00b96508df has been registered for cleanup in the JobResultStore after reaching a terminal state.

What you expected to happen

--

How to reproduce

CREATE TABLE source
(
  `referer` string,
  `agent` string,
  `route_id` string,
  `scheme` string,
  `request_completion` string,
  `consumer_name` string,
  `request_time` float,
  `proxy_host` string,
  `host` string,
  `user_ip` string,
  `method` string,
  `prophet_token` string,
  `uri` string,
  `args` string,
  `request_id` string,
  `status` string
) WITH (
      'password' = 'xxx',
      'connector' = 'elasticsearch7-x',
      'sink.bulk-flush.max-actions' = '10000',
      'hosts' = 'xxx',
      'client.connect-timeout' = '10000',
      'index' = 'xxx',
      'search-query' = '{
    "range": {
      "@timestamp": {
        "gte": "2023-05-01T10:00:00.000Z",
        "lt": "2023-05-20T00:00:00.000Z"
      }
    }
}',
      'username' = 'xxx'
      );
CREATE TABLE sink
(
  `referer` string,
  `agent` string,
  `route_id` string,
  `scheme` string,
  `request_completion` string,
  `consumer_name` string,
  `request_time` string,
  `proxy_host` string,
  `host` string,
  `user_ip` string,
  `method` string,
  `prophet_token` string,
  `uri` string,
  `args` string,
  `request_id` string,
  `status` string
) WITH (
      'password' = 'xxx',
      'connector' = 'mysql-x',
      'sink.buffer-flush.interval' = '2000',
      'sink.post-sql' = '',
      'sink.buffer-flush.max-rows' = '1024',
      'sink.all-replace' = 'false',
      'sink.pre-sql' = '',
      'table-name' = 'test_table',
      'sink.parallelism' = '1',
      'url' = 'jdbc:mysql://xxxx/xxx?useUnicode=true&characterEncoding=utf-8&useSSL=false&connectTimeout=3000&useunicode=true&characterEncoding=utf8&useSSL=false&useCursorFetch=true&tinyInt1isBit=false&zeroDateTimeBehavior=convertToNull',
      'username' = 'xxx'
      );
insert into sink 
select 
substring(referer,1,200) as `referer`,
substring(agent,1,200) as `agent`,
`route_id` as `route_id`,
`scheme` as `scheme`,
`request_completion` as `request_completion`,
`consumer_name` as `consumer_name`,
CAST(`request_time` AS string) as `request_time`,
`proxy_host` as `proxy_host`,
`host` as `host`,
`user_ip` as `user_ip`,
`method` as `method`,
`prophet_token` as `prophet_token`,
substring(uri,1,200) as `uri`,
substring(args,1,200) as `args`,
`request_id` as `request_id`,
`status` as `status`
 from source;

Anything else

No response

Version

master

libailin commented 1 year ago

The java.lang.OutOfMemoryError: Direct buffer memory error can be mitigated by reducing the page size used when reading from ES.
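
As a hedged illustration of that suggestion: the source table in the reproduction sets 'sink.bulk-flush.max-actions' = '10000'. Assuming this option is what the elasticsearch7-x source uses as its scroll page size (nothing in this thread confirms that), a smaller setting might look like the fragment below. The value 1000 is an arbitrary example rather than a tested recommendation, and the column list is trimmed for brevity.

```sql
-- Sketch only: the source table from the report, with fewer columns shown.
-- Assumption: 'sink.bulk-flush.max-actions' controls how many documents the
-- elasticsearch7-x source fetches per scroll page; check the connector docs.
CREATE TABLE source
(
  `referer` string,
  `agent` string,
  `request_time` float
) WITH (
      'connector' = 'elasticsearch7-x',
      'hosts' = 'xxx',
      'index' = 'xxx',
      'username' = 'xxx',
      'password' = 'xxx',
      -- was '10000' in the report; 1000 is a hypothetical smaller page size
      'sink.bulk-flush.max-actions' = '1000'
      );
```

A smaller page keeps each scroll response smaller, at the cost of more round trips to ES.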