InfluxCommunity / influxdb3-java

The Java Client that provides a simple and convenient way to interact with InfluxDB 3.
https://InfluxCommunity.github.io/influxdb3-java/apidocs/com/influxdb/v3/client/package-summary.html
MIT License

Manual client refresh required after FlightRuntimeException with FlightStatusCode.UNAVAILABLE due to ConnectTimeoutException #129

Open mps2209 opened 5 months ago

mps2209 commented 5 months ago

Specifications

Code sample to reproduce problem

It's hard to reproduce the error, because we do not know why the exception is thrown in the first place. We suspect that some configuration change on the influx side causes the client to run into the timeout exception.

This was our setup: We initialize our client as a bean and inject it into our services.

    @Bean
    public InfluxDBClient influxDB(InfluxConfiguration influxConfig) {
        return InfluxDBClient.getInstance(influxConfig.getHost(), influxConfig.getToken().toCharArray(), influxConfig.getDatabase());
    }

But sometimes something goes wrong and this exception is thrown. The client then got stuck in this error state and threw the same exception on every request.

Caused by: org.apache.arrow.flight.FlightRuntimeException: UNAVAILABLE: io exception
    at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
    at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:164)
    at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:185)
    at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:456)
    at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:350)

The client works again after restarting the service.

Expected behavior

We expect that we do not have to create a new client for every request. We load-tested that approach and it was noticeably slower. Therefore, we initialize the client only once, at startup of our service.

Actual behavior

Once the exception from the code example is thrown due to some change on the influx side, the client gets stuck in this error mode. We had to implement a proxy/factory that creates a new client when this exception is thrown. We are unsure what the intended way of using the client is. Creating a new client for every request, as the code example in the readme shows, does not seem feasible: try (InfluxDBClient client = InfluxDBClient.getInstance(host, token, database))

Additional info

No response

bednar commented 5 months ago

Hi @mps2209,

Thank you for using our client and for bringing this issue to our attention.

The client should be designed to recover from a ConnectTimeoutException, and subsequent uses should function correctly without interruption. I appreciate you pointing out that this may not work as expected, and I will prioritize investigating this issue as soon as possible.

To assist in diagnosing the problem effectively, do you know any specific steps that could help us easily replicate this behavior?

Best Regards

mps2209 commented 5 months ago

Thank you for giving our issue attention. Regrettably, I can't really provide any more information, except that we have multiple services, and when it happens, it happens to several of them. We just had it happen again, so sadly the fix on our side did not work. We are currently investigating, and I will update if we find out anything. Maybe the time frame can help you if something changed on the influx side during that time. It must have happened after 27.04.24 19:50 UTC and before 28.04.24 5:14 UTC. Those were the last successful and the first failed request from our services.

karel-rehor commented 4 months ago

Hi @mps2209

I'm trying to recreate this issue. I've managed to get a stack trace with the first six lines you've included above; however, instead of a ConnectTimeoutException, the recreation has the underlying exception io.netty.channel.AbstractChannel$AnnotatedConnectException, so I doubt this is a true recreation of the original issue. I'm assuming from the issue description that the base exception thrown is io.netty.channel.ConnectTimeoutException. Can you add a full stack trace that includes the underlying ConnectTimeoutException?

Thanks
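To make the distinction between the two underlying exceptions easier to check in logs, the cause chain can be walked programmatically. The following is only a minimal sketch: the class `CauseChain` and its methods are hypothetical helpers, not part of influxdb3-java, and the exceptions in `main` are JDK stand-ins for the Flight and Netty exceptions discussed here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of influxdb3-java.
final class CauseChain {

    /** Returns the class names of t and all of its causes, outermost first. */
    static List<String> causeClassNames(Throwable t) {
        List<String> names = new ArrayList<>();
        // Cap the depth so a cyclic cause chain cannot loop forever.
        for (int depth = 0; t != null && depth < 32; depth++) {
            names.add(t.getClass().getName());
            t = t.getCause();
        }
        return names;
    }

    /** True if some exception in the chain has exactly this class name. */
    static boolean hasCause(Throwable t, String className) {
        return causeClassNames(t).contains(className);
    }

    public static void main(String[] args) {
        // Stand-ins: in this thread the chain would end in
        // io.netty.channel.ConnectTimeoutException or
        // io.netty.channel.AbstractChannel$AnnotatedConnectException.
        Throwable root = new java.net.ConnectException("connection timed out");
        Throwable wrapped = new RuntimeException("UNAVAILABLE: io exception", root);

        System.out.println(String.join(" <- ", causeClassNames(wrapped)));
        System.out.println(hasCause(wrapped, "java.net.ConnectException"));
    }
}
```

Matching by class name keeps the helper free of a compile-time dependency on Netty; logging the whole chain at the catch site would show which of the two exceptions is actually at the bottom.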

mps2209 commented 4 months ago

This is the most I can extract from our logs:

Original Stack Trace:

....
        at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:241)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:57)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:83)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:57)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
        at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.arrow.flight.FlightRuntimeException: UNAVAILABLE: io exception
    at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
    at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:164)
    at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:185)
    at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:456)
    at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:350)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.arrow.flight.grpc.ClientInterceptorAdapter$FlightClientCallListener.onClose(ClientInterceptorAdapter.java:117)
    at io.opentelemetry.javaagent.shaded.instrumentation.grpc.v1_6.TracingClientInterceptor$TracingClientCall$TracingClientCallListener.onClose(TracingClientInterceptor.java:158)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.netty.channel.ConnectTimeoutException: connection timed out after 30000 ms: 84984b87-7c2b-4d60-8f1a-964a1c1cda4f.a.influxdb.io/3.127.138.48:443
    at io.netty.channel.epoll.AbstractEpollChannel$AbstractEpollUnsafe$2.run(AbstractEpollChannel.java:613)
    at io.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)
    at io.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:153)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:416)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    ... 1 more

I hope this helps

bednar commented 4 months ago

Hi @mps2209,

Unfortunately, we are unable to replicate the issue you described in our testing environment. Our client successfully recovers from network exceptions as expected, which suggests there might be specific factors in your setup influencing the behavior you're encountering.

To help us understand and potentially solve the problem, could you provide more details about your implementation? Specifically:

  1. Code Integration: How are you integrating the client with reactor? Sharing the relevant parts of your code where the client is set up and used might help us identify any integration issues.

  2. Error Handling: How do you handle errors from the client within your application?

If possible, please include code snippets or a broader description of your architecture, as these details will be crucial in diagnosing the issue more effectively.

Thank you for your cooperation, and looking forward to your response.

Best regards

mps2209 commented 4 months ago

I'm not sure what you mean by reactor, but we used to inject the client like this:

@Bean
public InfluxDBClient influxDB(InfluxConfiguration influxConfig) {
    return InfluxDBClient.getInstance(influxConfig.getHost(), influxConfig.getToken().toCharArray(), influxConfig.getDatabase());
}

And query influx like this:

    try (var stream = queryExecutor.execute(query, queryOptions)) {
        ...
    } catch (Exception ex) {
        ...
    }
}

public Stream<PointValues> execute(String sqlQuery, QueryOptions queryOptions) {
    return influxClient.queryPoints(sqlQuery, queryOptions);
}

We have now changed this to circumvent the timeout error: instead of the @Bean, we implemented a proxy/factory, and we call the proxy's refreshClient method in the catch block shown above.

public class InfluxDBClientProxy {

    private final InfluxConfiguration influxConfig;
    private volatile InfluxDBClient influxDBClient;

    public synchronized void refreshClient() {
        log.warn("refreshing the client");
        if (influxDBClient != null) {
            try {
                influxDBClient.close();
            } catch (Exception e) {
                log.debug("Could not close influx db client: {}", e.getMessage());
            }
        }
        this.influxDBClient = null;
    }

    public InfluxDBClient getClient() {
        if (influxDBClient == null) {
            synchronized (this) {
                if (influxDBClient == null) {
                    this.influxDBClient = createNewClient();
                }
            }
        }
        return influxDBClient;
    }

    protected InfluxDBClient createNewClient() {
        return InfluxDBClient.getInstance(influxConfig.getHost(), influxConfig.getToken().toCharArray(), influxConfig.getDatabase());
    }
}

Hope this helps! kind regards
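For readers who want to try the same workaround, the refresh-on-failure idea can be sketched independently of Spring and of the client library. This is a sketch under assumptions, not the library's recommended usage: `RefreshableClient` is a hypothetical class, the client is represented by any `AutoCloseable`, and the factory would be something like `() -> InfluxDBClient.getInstance(host, token, database)`. It retries once on any `RuntimeException`, and it only discards the client that actually failed, so concurrent callers do not close a client that another thread has just created.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical holder: keeps one long-lived client and replaces it after a failed call.
final class RefreshableClient<C extends AutoCloseable> {

    private final Supplier<C> factory;
    private C client; // guarded by "this"

    RefreshableClient(Supplier<C> factory) {
        this.factory = factory;
    }

    /** Returns the current client, creating it on first use. */
    synchronized C get() {
        if (client == null) {
            client = factory.get();
        }
        return client;
    }

    /** Closes and discards the client, but only if it is still the one that failed. */
    synchronized void refresh(C failed) {
        if (failed == null || client != failed) {
            return; // nothing to do, or another thread already replaced it
        }
        try {
            client.close();
        } catch (Exception e) {
            // Best effort: the client is being discarded anyway.
        }
        client = null;
    }

    /** Runs the action and retries exactly once on a fresh client if it throws. */
    <R> R call(Function<C, R> action) {
        C current = get();
        try {
            return action.apply(current);
        } catch (RuntimeException first) {
            refresh(current);
            return action.apply(get()); // a second failure propagates to the caller
        }
    }

    public static void main(String[] args) {
        // Stand-in client and an action whose first invocation fails.
        int[] calls = {0};
        RefreshableClient<AutoCloseable> holder =
                new RefreshableClient<AutoCloseable>(() -> () -> { });
        String result = holder.call(c -> {
            if (calls[0]++ == 0) {
                throw new IllegalStateException("simulated failure");
            }
            return "ok";
        });
        System.out.println(result); // prints "ok"
    }
}
```

If the query result is a lazily consumed stream, errors may only surface while iterating, so in that case the action passed to `call` would probably need to consume the stream itself rather than return it.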

bednar commented 4 months ago

@mps2209, Thanks for providing detailed information. To further understand the context and possibly identify any compatibility or configuration issues, could you let us know which version of Spring you are using? Additionally, are you using Spring Boot in your project?

mps2209 commented 4 months ago

@bednar yes, we are using Spring Boot. Here are the relevant entries in our pom.xml:

    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.2.4</version>

    <java.version>17</java.version>

    <!-- influx -->
    <dependency>
        <groupId>org.influxdb</groupId>
        <artifactId>influxdb-java</artifactId>
        <version>2.23</version>
    </dependency>
    <!-- influxdb3 used for SQL queries -->
    <dependency>
        <groupId>com.influxdb</groupId>
        <artifactId>influxdb3-java</artifactId>
        <version>0.7.0</version>
    </dependency>

mps2209 commented 4 months ago

Small update from our side. We have encountered the mentioned exception again, but our fix to refresh the client once we catch an exception seems to have worked. We had an error on all of our stages, but some stages showed a different exception. I can't guarantee that these exceptions would have led to the same behaviour, as our fix seems to work now. But I will share the stack trace anyway, maybe it helps. This was the first exception that was new to me:


Original Stack Trace:
        at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:241)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:59)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:97)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:91)
        at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:59)
        at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
        at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
        at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
        at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
        at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.base/java.lang.Thread.run(Unknown Source)
Caused by: org.apache.arrow.flight.FlightRuntimeException: UNAVAILABLE: io exception
Channel Pipeline: [SslHandler#0, ProtocolNegotiators$ClientTlsHandler#0, WriteBufferingAndExceptionHandler#0, DefaultChannelPipeline$TailContext#0]
    at org.apache.arrow.flight.CallStatus.toRuntimeException(CallStatus.java:131)
    at org.apache.arrow.flight.grpc.StatusUtils.fromGrpcRuntimeException(StatusUtils.java:164)
    at org.apache.arrow.flight.grpc.StatusUtils.fromThrowable(StatusUtils.java:185)
    at org.apache.arrow.flight.FlightStream$Observer.onError(FlightStream.java:456)
    at org.apache.arrow.flight.FlightClient$1.onError(FlightClient.java:350)
    at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:481)
    at io.grpc.PartialForwardingClientCallListener.onClose(PartialForwardingClientCallListener.java:39)
    at io.grpc.ForwardingClientCallListener.onClose(ForwardingClientCallListener.java:23)
    at io.grpc.ForwardingClientCallListener$SimpleForwardingClientCallListener.onClose(ForwardingClientCallListener.java:40)
    at org.apache.arrow.flight.grpc.ClientInterceptorAdapter$FlightClientCallListener.onClose(ClientInterceptorAdapter.java:117)
    at io.opentelemetry.javaagent.shaded.instrumentation.grpc.v1_6.TracingClientInterceptor$TracingClientCall$TracingClientCallListener.onClose(TracingClientInterceptor.java:159)
    at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:574)
    at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:72)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:742)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:723)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: javax.net.ssl.SSLHandshakeException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed
    at java.base/sun.security.ssl.Alert.createSSLException(Unknown Source)
    at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    at java.base/sun.security.ssl.TransportContext.fatal(Unknown Source)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(Unknown Source)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(Unknown Source)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(Unknown Source)
    at java.base/sun.security.ssl.SSLHandshake.consume(Unknown Source)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(Unknown Source)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(Unknown Source)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(Unknown Source)
    at java.base/java.security.AccessController.doPrivileged(Unknown Source)
    at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(Unknown Source)
    at io.netty.handler.ssl.SslHandler$SslTasksRunner.run(SslHandler.java:1889)
    ... 3 more
Caused by: sun.security.validator.ValidatorException: PKIX path validation failed: java.security.cert.CertPathValidatorException: validity check failed
    at java.base/sun.security.validator.PKIXValidator.doValidate(Unknown Source)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(Unknown Source)
    at java.base/sun.security.validator.Validator.validate(Unknown Source)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(Unknown Source)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(Unknown Source)
    ... 13 more
Caused by: java.security.cert.CertPathValidatorException: validity check failed
    at java.base/sun.security.provider.certpath.PKIXMasterCertPathValidator.validate(Unknown Source)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(Unknown Source)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(Unknown Source)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(Unknown Source)
    at java.base/java.security.cert.CertPathValidator.validate(Unknown Source)
    ... 18 more
Caused by: java.security.cert.CertificateExpiredException: NotAfter: Sun Jul 05 12:00:00 GMT 2020
    at java.base/sun.security.x509.CertificateValidity.valid(Unknown Source)
    at java.base/sun.security.x509.X509CertImpl.checkValidity(Unknown Source)
    at java.base/sun.security.provider.certpath.BasicChecker.verifyValidity(Unknown Source)
    at java.base/sun.security.provider.certpath.BasicChecker.check(Unknown Source)
    ... 23 more
"}

The second new exception looked like this:

2024-05-28 00:22:36.143 {"@timestamp":"2024-05-27T22:22:36.139Z",
"error.type":"java.lang.RuntimeException",
"error.message":"java.lang.RuntimeException: java.lang.InterruptedException",
"error.stack_trace":"java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException
    at reactor.core.publisher.MonoCallable$MonoCallableSubscription.request(MonoCallable.java:137)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.request(FluxHide.java:152)
    at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onSubscribe(FluxFlattenIterable.java:241)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.lambda$onSubscribe$0(TracingSubscriber.java:59)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:97)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.withActiveSpan(TracingSubscriber.java:91)
    at io.opentelemetry.javaagent.shaded.instrumentation.reactor.v3_1.TracingSubscriber.onSubscribe(TracingSubscriber.java:59)
    at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onSubscribe(FluxHide.java:122)
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:48)
    at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:68)
    at reactor.core.publisher.FluxSubscribeOn$SubscribeOnSubscriber.run(FluxSubscribeOn.java:194)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)

Caused by: java.lang.InterruptedException
    at java.base/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source)
    at java.base/java.util.concurrent.LinkedBlockingQueue.take(Unknown Source)
    at org.apache.arrow.flight.FlightStream.next(FlightStream.java:233)
    ... 31 more
"}

Hope this helps! Thank you!
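The three traces in this thread end in different root causes (a connect timeout, an expired certificate, and an interrupt), and it may be useful to tell them apart before deciding how to react. The sketch below is an assumption-laden illustration, not library behaviour: `FailureKind` and its enum are hypothetical, and the Netty exception is matched by name so the code compiles with the JDK alone.

```java
import java.security.cert.CertificateExpiredException;

// Hypothetical classifier for the root causes seen in the traces above.
final class FailureKind {

    enum Kind { INTERRUPTED, CERTIFICATE_EXPIRED, CONNECT_TIMEOUT, OTHER }

    /** Walks the cause chain and reports the first recognised kind of failure. */
    static Kind classify(Throwable t) {
        for (int depth = 0; t != null && depth < 32; depth++, t = t.getCause()) {
            if (t instanceof InterruptedException) {
                return Kind.INTERRUPTED;
            }
            if (t instanceof CertificateExpiredException) {
                return Kind.CERTIFICATE_EXPIRED;
            }
            // Matched by name to avoid a compile-time dependency on Netty.
            if (t.getClass().getName().equals("io.netty.channel.ConnectTimeoutException")) {
                return Kind.CONNECT_TIMEOUT;
            }
        }
        return Kind.OTHER;
    }

    public static void main(String[] args) {
        Throwable wrapped = new RuntimeException(
                new RuntimeException(new InterruptedException()));
        Kind kind = classify(wrapped);
        if (kind == Kind.INTERRUPTED) {
            // Restore the interrupt flag so the scheduler that interrupted
            // this thread can still observe the cancellation.
            Thread.currentThread().interrupt();
        }
        System.out.println(kind);
    }
}
```

An interrupt wrapped in a `RuntimeException` usually means the calling thread was cancelled rather than that the connection broke, so treating it the same way as a connect timeout may hide the real trigger.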

bednar commented 4 months ago

@mps2209 thanks for the update!