opensearch-project / OpenSearch

🔎 Open source distributed and RESTful search engine.
https://opensearch.org/docs/latest/opensearch/index/
Apache License 2.0
9.77k stars 1.82k forks source link

[Feature Request] RestClient does not retry if only given one endpoint (e.g. load balancer) #12563

Open ruigyang-wish opened 8 months ago

ruigyang-wish commented 8 months ago

Is your feature request related to a problem? Please describe

At present, we are using the AWS managed openSearch, and we use the AWS endpoint, such as https://vpc-57v4bbnpjsz6gmcmhoi2ca.us-west-1.es.amazonaws.com/, as the openSearch host. Actually, there are several nodes behind the endpoint.

Sometimes, the cluster maybe very busy and one of openSearch server node returns 502 Bad Gateway, then the job crashed, below is the call stack, and we didn't observe significant cpu/mem usage issues at that time.

org.apache.flink.util.SerializedThrowable: org.opensearch.client.ResponseException: method [POST], host https://vpc-57v4bbnpjsz6gmcmhoi2ca.us-west-1.es.amazonaws.com//, URI [/_bulk?timeout=1m], status line [HTTP/1.1 502 Bad Gateway]
<html>
<head><title>502 Bad Gateway</title></head>
<body>
<center><h1>502 Bad Gateway</h1></center>
</body>
</html>

at org.opensearch.client.RestClient.convertResponse(RestClient.java:375)
at org.opensearch.client.RestClient$1.completed(RestClient.java:425)
at org.opensearch.client.RestClient$1.completed(RestClient.java:421)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run
org.apache.flink.util.SerializedThrowable: org.opensearch.OpenSearchStatusException: Unable to parse response body
at org.opensearch.client.RestHighLevelClient.parseResponseException(RestHighLevelClient.java:2208)
at org.opensearch.client.RestHighLevelClient$1.onFailure(RestHighLevelClient.java:2116)
at org.opensearch.client.RestClient$FailureTrackingResponseListener.onDefinitiveFailure(RestClient.java:707)
at org.opensearch.client.RestClient$1.completed(RestClient.java:433)
at org.opensearch.client.RestClient$1.completed(RestClient.java:421)
at org.apache.http.concurrent.BasicFuture.completed(BasicFuture.java:122)
at org.apache.http.impl.nio.client.DefaultClientExchangeHandlerImpl.responseCompleted(DefaultClientExchangeHandlerImpl.java:182)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.processResponse(HttpAsyncRequestExecutor.java:448)
at org.apache.http.nio.protocol.HttpAsyncRequestExecutor.inputReady(HttpAsyncRequestExecutor.java:338)
at org.apache.http.impl.nio.DefaultNHttpClientConnection.consumeInput(DefaultNHttpClientConnection.java:265)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:87)
at org.apache.http.impl.nio.client.InternalIODispatch.onInputReady(InternalIODispatch.java:40)
at org.apache.http.impl.nio.reactor.AbstractIODispatch.inputReady(AbstractIODispatch.java:121)
at org.apache.http.impl.nio.reactor.BaseIOReactor.readable(BaseIOReactor.java:162)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvent(AbstractIOReactor.java:337)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.processEvents(AbstractIOReactor.java:315)
at org.apache.http.impl.nio.reactor.AbstractIOReactor.execute(AbstractIOReactor.java:276)
at org.apache.http.impl.nio.reactor.BaseIOReactor.execute(BaseIOReactor.java:104)
at org.apache.http.impl.nio.reactor.AbstractMultiworkerIOReactor$Worker.run(AbstractMultiworkerIOReactor.java:591)
at java.lang.Thread.run
org.apache.flink.util.SerializedThrowable: java.lang.RuntimeException: An error occurred in OpensearchSink.
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.checkErrorAndRethrow(OpensearchSink.java:510)
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.checkAsyncErrorsAndRequests(OpensearchSink.java:515)
at org.apache.flink.streaming.connectors.opensearch.OpensearchSink.invoke(OpensearchSink.java:341)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
at java.lang.Thread.run
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:139)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:83)
at org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:256)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:247)
at org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:240)
at org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:738)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:715)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
at jdk.internal.reflect.GeneratedMethodAccessor439.invoke
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke
at java.lang.reflect.Method.invoke

According to the code of openSearch, the openSearch client will mark the host as dead if the openSearch server returns 502 Bad Gateway, then try to forward the request to other available hosts. So if we only pass the load balancer URL of our openSearch cluster, which means the program will crash and won't retry.

https://github.com/opensearch-project/OpenSearch/blob/main/client/rest/src/main/java/org/opensearch/client/RestClient.java#L386-L393

private ResponseOrResponseException convertResponse(InternalRequest request, Node node, ClassicHttpResponse httpResponse)
    throws IOException {
    //....
    ResponseException responseException = new ResponseException(response);
    if (isRetryStatus(statusCode)) {
        // mark host dead and retry against next one
        onFailure(node);
        return new ResponseOrResponseException(responseException);
    }
    // mark host alive and don't retry, as the error should be a request problem
    onResponse(node);
    throw responseException;
}
private static boolean isRetryStatus(int statusCode) {
    switch (statusCode) {
        case 502:
        case 503:
        case 504:
            return true;
    }
    return false;
}

Describe the solution you'd like

So I suggest openSearch client should expose 2 parameters for each openSearch host.

Related component

Clients

Describe alternatives you've considered

Alternatively, I suggest to expose one parameter to allow the openSearch client user can set the maximum retry times before marking one host as dead.

Additional context

NA

andrross commented 8 months ago

[Triage - attendees 1 2 3] @ruigyang-wish Thanks for filing this issue. The behavior you're seeing does match the design of the client. Looking forward to more discussion here.

ruigyang-wish commented 8 months ago

@andrross Thanks for the triage. I also did further investigation for this issue, it seems there is no elegant solution to identify the passed host is load balancer or not in the RestClient. Of course, we can expose another parameter in RestClientBuilder to allow the users to indicate the host is load balancer or not.

andrross commented 8 months ago

@ruigyang-wish Can you pass the same endpoint multiple times so it appears to the client like there are multiple hosts to retry across? Or does it dedupe?

dblock commented 7 months ago

Do other transports in https://github.com/opensearch-project/opensearch-java fix this problem?

ruigyang-wish commented 7 months ago

@ruigyang-wish Can you pass the same endpoint multiple times so it appears to the client like there are multiple hosts to retry across? Or does it dedupe?

@andrross I also considered this way, but openSearch is using map to maintain the dead hosts, so it will dedupe. https://github.com/opensearch-project/OpenSearch/blob/main/client/rest/src/main/java/org/opensearch/client/RestClient.java#L139

private final ConcurrentMap<HttpHost, DeadHostState> denylist = new ConcurrentHashMap<>();