housepower / ClickHouse-Native-JDBC

ClickHouse Native Protocol JDBC implementation
https://housepower.github.io/ClickHouse-Native-JDBC/
Apache License 2.0
524 stars 145 forks source link

Connecting to TLS enabled ClickHouse on Yandex #203

Closed itzikiusa closed 1 year ago

itzikiusa commented 3 years ago

Is there a way to connect to ClickHouse on Yandex (a certificate is needed)? This is the error I'm getting (port 9440):


java.sql.SQLException: Connection reset
    at com.github.housepower.jdbc.connect.PhysicalConnection.receiveResponse(PhysicalConnection.java:111) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.connect.PhysicalConnection.receiveHello(PhysicalConnection.java:95) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.ClickHouseConnection.serverInfo(ClickHouseConnection.java:196) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.ClickHouseConnection.createPhysicalInfo(ClickHouseConnection.java:180) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.ClickHouseConnection.createClickHouseConnection(ClickHouseConnection.java:175) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.NonRegisterDriver.connect(NonRegisterDriver.java:40) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[na:1.8.0_265]
    at java.sql.DriverManager.getConnection(DriverManager.java:247) ~[na:1.8.0_265]
    at com.example.elasticaggregate.ClichouseDao.getNow(ClichouseDao.java:29) ~[classes/:na]
    at com.example.elasticaggregate.TstController.now(TstController.java:393) [classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_265]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_265]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_265]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_265]
    at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:147) [spring-webflux-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:151) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2026) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:65) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:518) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:267) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:462) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:170) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_265]
Caused by: java.net.SocketException: Connection reset
    at java.net.SocketInputStream.read(SocketInputStream.java:210) ~[na:1.8.0_265]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_265]
    at com.github.housepower.jdbc.buffer.SocketBuffedReader.refill(SocketBuffedReader.java:80) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.buffer.SocketBuffedReader.readBinary(SocketBuffedReader.java:48) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.serializer.BinaryDeserializer.readVarInt(BinaryDeserializer.java:38) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.protocol.RequestOrResponse.readFrom(RequestOrResponse.java:46) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    at com.github.housepower.jdbc.connect.PhysicalConnection.receiveResponse(PhysicalConnection.java:109) ~[clickhouse-native-jdbc-shaded-2.4.0.jar:na]
    ... 83 common frames omitted

`
sundy-li commented 3 years ago

Did you mean a TLS connection, like `clickhouse-client -s`? Currently this library doesn't support that.

itzikiusa commented 3 years ago

@sundy-li yes ok, thank you

itziklavon commented 3 years ago

@sundy-li @pan3793 Is there any plan to enable secure connections? I'm willing to do the work on my own; I just need a little guidance.

sundy-li commented 3 years ago

> @sundy-li @pan3793 is there any plan of enabling secure connection? i'm willing to do the work on my own, just need small guidance

It would be good to have someone work on it, because we are not familiar with secure connections.

  1. Maybe you can check the go implementation.

    https://github.com/ClickHouse/clickhouse-go/blob/07664d4def10151f2c34e4df7c53d71ef5d3a7a9/connect.go#L54-L86

  2. https://docs.oracle.com/javase/7/docs/api/javax/net/ssl/SSLContext.html

The related code is in:

https://github.com/housepower/ClickHouse-Native-JDBC/blob/1c8731df9e4711e3730b744cda69cf47b0943cff/clickhouse-native-jdbc/src/main/java/com/github/housepower/jdbc/connect/NativeClient.java#L48-L55

pan3793 commented 3 years ago

@itziklavon The JDK has native support for SSL sockets; here is some demo code: https://github.com/osheroff/mysql-binlog-connector-java/blob/master/src/main/java/com/github/shyiko/mysql/binlog/network/DefaultSSLSocketFactory.java

itziklavon commented 3 years ago

@sundy-li @pan3793 I will give it a go, though I am also not familiar with this. Currently I have gotten past the connection stage (the hello response works correctly and returns the version and server time); however, it hangs while reading from `deserializer.readVarInt()`.

I am currently trying to debug it, with no success so far.

pan3793 commented 3 years ago

@itziklavon Theoretically, if you got a correct Hello response, the subsequent communication is the same as with a normal Socket; the TLS layer should be a transparent protocol layer that does not affect the upper-layer protocol.

itziklavon commented 3 years ago

This is the current stack trace. The first line is the hello response; I set a timeout of 10 seconds for the query.


2021-01-05 13:35:29.718  INFO 3970 --- [ctor-http-nio-4] c.g.h.jdbc.connect.NativeClient          : recv response: {"majorVersion":20,"minorVersion":11,"reversion":54442,"serverName":"ClickHouse","serverTimeZone":"Europe/Moscow","serverDisplayName":"xxxxx.xxxxxx"}
2021-01-05 13:35:39.896 ERROR 3970 --- [ctor-http-nio-4] a.w.r.e.AbstractErrorWebExceptionHandler : [c8972047-1]  500 Server Error for HTTP GET "/test/select/now3"

org.springframework.jdbc.UncategorizedSQLException: 
### Error querying database.  Cause: java.sql.SQLException: Read timed out
### The error may exist in com/example/elasticaggregate/TransactionsMapper.java (best guess)
### The error may involve defaultParameterMap
### The error occurred while setting parameters
### SQL: SELECT NOW()
### Cause: java.sql.SQLException: Read timed out
; uncategorized SQLException; SQL state [null]; error code [0]; Read timed out; nested exception is java.sql.SQLException: Read timed out
    at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:89) ~[spring-jdbc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    |_ checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
    |_ checkpoint ⇢ HTTP GET "/test/select/now3" [ExceptionHandlingWebHandler]
Stack trace:
        at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:89) ~[spring-jdbc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
        at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81) ~[spring-jdbc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
        at org.springframework.jdbc.support.AbstractFallbackSQLExceptionTranslator.translate(AbstractFallbackSQLExceptionTranslator.java:81) ~[spring-jdbc-5.2.8.RELEASE.jar:5.2.8.RELEASE]
        at org.mybatis.spring.MyBatisExceptionTranslator.translateExceptionIfPossible(MyBatisExceptionTranslator.java:91) ~[mybatis-spring-2.0.6.jar:2.0.6]
        at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:441) ~[mybatis-spring-2.0.6.jar:2.0.6]
        at com.sun.proxy.$Proxy80.selectOne(Unknown Source) ~[na:na]
        at org.mybatis.spring.SqlSessionTemplate.selectOne(SqlSessionTemplate.java:160) ~[mybatis-spring-2.0.6.jar:2.0.6]
        at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:87) ~[mybatis-3.5.6.jar:3.5.6]
        at org.apache.ibatis.binding.MapperProxy$PlainMethodInvoker.invoke(MapperProxy.java:152) ~[mybatis-3.5.6.jar:3.5.6]
        at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:85) ~[mybatis-3.5.6.jar:3.5.6]
        at com.sun.proxy.$Proxy83.aggregateRound(Unknown Source) ~[na:na]
        at com.example.elasticaggregate.TstController.selectnow3(TstController.java:460) ~[classes/:na]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_272]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_272]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_272]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_272]
        at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:147) ~[spring-webflux-5.2.8.RELEASE.jar:5.2.8.RELEASE]
        at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:151) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2026) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
        at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:65) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
        at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:518) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
        at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:267) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
        at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:462) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:170) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
        at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_272]
Caused by: java.sql.SQLException: Read timed out
    at com.github.housepower.jdbc.connect.NativeClient.receiveResponse(NativeClient.java:287) ~[classes/:na]
    at com.github.housepower.jdbc.connect.NativeClient.lambda$receiveQuery$0(NativeClient.java:239) ~[classes/:na]
    at com.github.housepower.jdbc.stream.ClickHouseQueryResult.consumeDataResponse(ClickHouseQueryResult.java:89) ~[classes/:na]
    at com.github.housepower.jdbc.stream.ClickHouseQueryResult.ensureHeaderConsumed(ClickHouseQueryResult.java:82) ~[classes/:na]
    at com.github.housepower.jdbc.stream.ClickHouseQueryResult.header(ClickHouseQueryResult.java:43) ~[classes/:na]
    at com.github.housepower.jdbc.statement.ClickHouseStatement.executeUpdate(ClickHouseStatement.java:93) ~[classes/:na]
    at com.github.housepower.jdbc.statement.ClickHouseStatement.executeQuery(ClickHouseStatement.java:100) ~[classes/:na]
    at com.github.housepower.jdbc.statement.ClickHouseStatement.execute(ClickHouseStatement.java:69) ~[classes/:na]
    at com.github.housepower.jdbc.statement.ClickHousePreparedQueryStatement.execute(ClickHousePreparedQueryStatement.java:64) ~[classes/:na]
    at com.alibaba.druid.pool.DruidPooledPreparedStatement.execute(DruidPooledPreparedStatement.java:497) ~[druid-1.2.4.jar:1.2.4]
    at org.apache.ibatis.executor.statement.PreparedStatementHandler.query(PreparedStatementHandler.java:64) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.executor.statement.RoutingStatementHandler.query(RoutingStatementHandler.java:79) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.executor.SimpleExecutor.doQuery(SimpleExecutor.java:63) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.executor.BaseExecutor.queryFromDatabase(BaseExecutor.java:325) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.executor.BaseExecutor.query(BaseExecutor.java:156) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:109) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.executor.CachingExecutor.query(CachingExecutor.java:89) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:147) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.session.defaults.DefaultSqlSession.selectList(DefaultSqlSession.java:140) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.session.defaults.DefaultSqlSession.selectOne(DefaultSqlSession.java:76) ~[mybatis-3.5.6.jar:3.5.6]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_272]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_272]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_272]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_272]
    at org.mybatis.spring.SqlSessionTemplate$SqlSessionInterceptor.invoke(SqlSessionTemplate.java:427) ~[mybatis-spring-2.0.6.jar:2.0.6]
    at com.sun.proxy.$Proxy80.selectOne(Unknown Source) ~[na:na]
    at org.mybatis.spring.SqlSessionTemplate.selectOne(SqlSessionTemplate.java:160) ~[mybatis-spring-2.0.6.jar:2.0.6]
    at org.apache.ibatis.binding.MapperMethod.execute(MapperMethod.java:87) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.binding.MapperProxy$PlainMethodInvoker.invoke(MapperProxy.java:152) ~[mybatis-3.5.6.jar:3.5.6]
    at org.apache.ibatis.binding.MapperProxy.invoke(MapperProxy.java:85) ~[mybatis-3.5.6.jar:3.5.6]
    at com.sun.proxy.$Proxy83.aggregateRound(Unknown Source) ~[na:na]
    at com.example.elasticaggregate.TstController.selectnow3(TstController.java:460) ~[classes/:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_272]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_272]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_272]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_272]
    at org.springframework.web.reactive.result.method.InvocableHandlerMethod.lambda$invoke$0(InvocableHandlerMethod.java:147) ~[spring-webflux-5.2.8.RELEASE.jar:5.2.8.RELEASE]
    at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:151) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoFlatMap.subscribeOrReturn(MonoFlatMap.java:53) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:57) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:153) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:76) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:274) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:851) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2344) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.request(MonoPeekTerminal.java:132) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2152) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2026) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onSubscribe(MonoPeekTerminal.java:145) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:441) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:211) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:161) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.Mono.subscribe(Mono.java:4213) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.drain(MonoIgnoreThen.java:172) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:56) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.3.9.RELEASE.jar:3.3.9.RELEASE]
    at reactor.netty.http.server.HttpServerHandle.onStateChange(HttpServerHandle.java:65) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:518) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.tcp.TcpServerBind$ChildObserver.onStateChange(TcpServerBind.java:267) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:462) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:96) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:170) ~[reactor-netty-0.9.11.RELEASE.jar:0.9.11.RELEASE]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296) ~[netty-codec-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar:4.1.51.Final]
    at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_272]
Caused by: java.net.SocketTimeoutException: Read timed out
    at java.net.SocketInputStream.socketRead0(Native Method) ~[na:1.8.0_272]
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[na:1.8.0_272]
    at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[na:1.8.0_272]
    at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[na:1.8.0_272]
    at sun.security.ssl.SSLSocketInputRecord.read(SSLSocketInputRecord.java:457) ~[na:1.8.0_272]
    at sun.security.ssl.SSLSocketInputRecord.bytesInCompletePacket(SSLSocketInputRecord.java:68) ~[na:1.8.0_272]
    at sun.security.ssl.SSLSocketImpl.readApplicationRecord(SSLSocketImpl.java:1095) ~[na:1.8.0_272]
    at sun.security.ssl.SSLSocketImpl.access$200(SSLSocketImpl.java:72) ~[na:1.8.0_272]
    at sun.security.ssl.SSLSocketImpl$AppInputStream.read(SSLSocketImpl.java:815) ~[na:1.8.0_272]
    at com.github.housepower.jdbc.buffer.SocketBuffedReader.refill(SocketBuffedReader.java:80) ~[classes/:na]
    at com.github.housepower.jdbc.buffer.SocketBuffedReader.readBinary(SocketBuffedReader.java:48) ~[classes/:na]
    at com.github.housepower.jdbc.serde.BinaryDeserializer.readVarInt(BinaryDeserializer.java:35) ~[classes/:na]
    at com.github.housepower.jdbc.protocol.Response.readFrom(Response.java:29) ~[classes/:na]
    at com.github.housepower.jdbc.connect.NativeClient.receiveResponse(NativeClient.java:282) ~[classes/:na]
    ... 105 common frames omitted
itziklavon commented 3 years ago

@sundy-li @pan3793 I managed to make it work (although I skipped verification and did not add the cert itself, it should not be an issue even if I add it). However, it seems to be slower than the HTTP port (8443).

any ideas where to check why it is slower?

pan3793 commented 3 years ago

@itziklavon Happy to know you made it, and about the performance, see https://github.com/housepower/ClickHouse-Native-JDBC/issues/245#issuecomment-742548491

itziklavon commented 3 years ago

@pan3793 Thanks for the response. If this is the case, then I'll stick with the HTTP port. Thank you very much for your help!

pan3793 commented 3 years ago

@itziklavon Also thanks for your exploration, any discussion here about clickhouse is welcome. Have a nice day!

harshalstory commented 3 years ago

@sundy-li @pan3793 i managed to make it work(although i did skip verify and not added the cert itself, but it should not be an issue even if i add it) however, seems like it is slower than http port(8443)

any ideas where to check why it is slower?

I guess I should have landed on this sooner. I have resolved the same problem, and am stuck with the deserializer.readVarInt(), as it hangs after that. @itziklavon I can see you have resolved the same issue, can you please help?

itziklavon commented 3 years ago

@harshalstory this is the only class i changed(it supports only secure connection now, but with minor changes it can support both secure and not secure):


/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.github.housepower.jdbc.connect;

import com.github.housepower.jdbc.buffer.SocketBuffedReader;
import com.github.housepower.jdbc.buffer.SocketBuffedWriter;
import com.github.housepower.jdbc.data.Block;
import com.github.housepower.jdbc.misc.Validate;
import com.github.housepower.jdbc.protocol.*;
import com.github.housepower.jdbc.serde.BinaryDeserializer;
import com.github.housepower.jdbc.serde.BinarySerializer;
import com.github.housepower.jdbc.settings.ClickHouseConfig;
import com.github.housepower.jdbc.settings.ClickHouseDefines;
import com.github.housepower.jdbc.settings.SettingKey;
import com.github.housepower.jdbc.log.Logger;
import com.github.housepower.jdbc.log.LoggerFactory;
import com.github.housepower.jdbc.stream.QueryResult;
import com.github.housepower.jdbc.stream.ClickHouseQueryResult;

import javax.net.ssl.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.security.SecureRandom;
import java.security.cert.X509Certificate;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;

// TODO throw ClickHouseException instead of SQLException
/**
 * Client side of a ClickHouse native-protocol connection carried over a TLS socket.
 *
 * <p>Instances are created with {@link #connect(ClickHouseConfig)}, which opens the socket,
 * performs the TLS handshake and wraps the socket streams in a {@link BinarySerializer} /
 * {@link BinaryDeserializer} pair. All requests are flushed to the socket immediately after
 * being written.
 *
 * <p><strong>SECURITY WARNING:</strong> this variant only supports TLS and trusts <em>every</em>
 * server certificate (see {@link TrustAllManager}). The connection is encrypted but the peer is
 * not authenticated, so it is exposed to man-in-the-middle attacks. Replace the trust manager
 * with one backed by a real trust store before using this outside of experiments.
 */
public class NativeClient {

    private static final Logger LOG = LoggerFactory.getLogger(NativeClient.class);

    /**
     * Opens a TLS connection to the host/port given by {@code configure} and completes the
     * TLS handshake.
     *
     * @param configure connection settings; {@code host()}, {@code port()} and
     *                  {@code tcpKeepAlive()} are read
     * @return a client bound to the freshly connected socket
     * @throws SQLException wrapping any failure raised while creating the SSL context,
     *                      connecting, or handshaking; the socket is closed before rethrowing
     */
    public static NativeClient connect(ClickHouseConfig configure) throws SQLException {
        SSLSocket socket = null;
        try {
            SocketAddress endpoint = new InetSocketAddress(configure.host(), configure.port());

            // "TLS" is the standard algorithm name; "SSL" is the legacy alias.
            SSLContext context = SSLContext.getInstance("TLS");
            // NOTE(security): certificate verification is intentionally disabled here.
            context.init(null, new TrustManager[]{new TrustAllManager()}, new SecureRandom());

            socket = (SSLSocket) context.getSocketFactory().createSocket();
            socket.setTcpNoDelay(true);
            socket.setSendBufferSize(ClickHouseDefines.SOCKET_SEND_BUFFER_BYTES);
            socket.setReceiveBufferSize(ClickHouseDefines.SOCKET_RECV_BUFFER_BYTES);
            socket.setKeepAlive(configure.tcpKeepAlive());
            socket.connect(endpoint);
            // Handshake eagerly so TLS failures surface here rather than on the first read/write.
            socket.startHandshake();

            return new NativeClient(socket,
                    new BinarySerializer(new SocketBuffedWriter(socket), true),
                    new BinaryDeserializer(new SocketBuffedReader(socket)));
        } catch (Exception ex) {
            // Do not leak the file descriptor when connect/handshake/stream wrapping fails.
            closeAfterFailure(socket, ex);
            throw new SQLException(ex.getMessage(), ex);
        }
    }

    /**
     * Closes {@code socket} (if any) after a failed connection attempt; a failure to close is
     * attached to {@code cause} as a suppressed exception instead of replacing it.
     */
    private static void closeAfterFailure(Socket socket, Exception cause) {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException closeEx) {
            cause.addSuppressed(closeEx);
        }
    }

    /**
     * Trust manager that accepts every certificate chain without any validation.
     *
     * <p>INSECURE: kept only to preserve the "skip verify" behaviour of this client.
     */
    private static final class TrustAllManager extends X509ExtendedTrustManager {

        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType, Socket socket) {
            // accept everything
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType, Socket socket) {
            // accept everything
        }

        @Override
        public void checkClientTrusted(X509Certificate[] chain, String authType, SSLEngine engine) {
            // accept everything
        }

        @Override
        public void checkServerTrusted(X509Certificate[] chain, String authType, SSLEngine engine) {
            // accept everything
        }

        @Override
        public void checkClientTrusted(X509Certificate[] certs, String authType) {
            // accept everything
        }

        @Override
        public void checkServerTrusted(X509Certificate[] certs, String authType) {
            // accept everything
        }

        @Override
        public X509Certificate[] getAcceptedIssuers() {
            // The X509TrustManager contract requires a non-null (possibly empty) array.
            return new X509Certificate[0];
        }
    }

    private final Socket socket;
    private final SocketAddress address;
    private final BinarySerializer serializer;
    private final BinaryDeserializer deserializer;

    /**
     * Creates a client around an already prepared socket and its serializer/deserializer.
     *
     * @param socket       the underlying socket; its local address is captured at construction
     * @param serializer   writer used for outgoing requests
     * @param deserializer reader used for incoming responses
     */
    public NativeClient(Socket socket, BinarySerializer serializer, BinaryDeserializer deserializer) {
        this.socket = socket;
        this.address = socket.getLocalSocketAddress();
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    /**
     * @return the local socket address captured when this client was constructed
     */
    public SocketAddress address() {
        return address;
    }

    /**
     * Sends a ping and reads responses until a pong arrives, skipping anything else.
     *
     * @param soTimeout read timeout applied to each response read
     * @param info      server context passed to the response reader
     * @return {@code true} once a {@link PongResponse} is received, {@code false} if sending or
     *         receiving fails with a {@link SQLException} (the message is logged at warn level)
     */
    public boolean ping(Duration soTimeout, NativeContext.ServerContext info) {
        try {
            sendRequest(new PingRequest());
            while (true) {
                Response response = receiveResponse(soTimeout, info);

                if (response instanceof PongResponse) {
                    return true;
                }

                // TODO there are some previous response we haven't consumed
                LOG.debug("expect pong, skip response: {}", response.type());
            }
        } catch (SQLException e) {
            LOG.warn(e.getMessage());
            return false;
        }
    }

    /**
     * Reads responses until a {@link DataResponse} arrives and returns its block; other
     * responses are logged and skipped.
     *
     * @param soTimeout read timeout applied to each response read
     * @param info      server context passed to the response reader
     * @return the block carried by the first data response
     * @throws SQLException if reading a response fails
     */
    public Block receiveSampleBlock(Duration soTimeout, NativeContext.ServerContext info) throws SQLException {
        while (true) {
            Response response = receiveResponse(soTimeout, info);
            if (response instanceof DataResponse) {
                return ((DataResponse) response).block();
            }
            // TODO there are some previous response we haven't consumed
            LOG.debug("expect sample block, skip response: {}", response.type());
        }
    }

    /**
     * Sends a {@link HelloRequest} built from the given arguments.
     *
     * @throws SQLException if writing or flushing the request fails
     */
    public void sendHello(String client, long reversion, String db, String user, String password) throws SQLException {
        sendRequest(new HelloRequest(client, reversion, db, user, password));
    }

    /**
     * Sends {@code query} with a freshly generated random UUID as query id and
     * {@link QueryRequest#STAGE_COMPLETE} as the stage.
     *
     * @throws SQLException if writing or flushing the request fails
     */
    public void sendQuery(String query, QueryRequest.ClientContext info, Map<SettingKey, Object> settings) throws SQLException {
        sendQuery(UUID.randomUUID().toString(), QueryRequest.STAGE_COMPLETE, info, query, settings);
    }

    /**
     * Sends {@code data} wrapped in a {@link DataRequest} with an empty name.
     *
     * @throws SQLException if writing or flushing the request fails
     */
    public void sendData(Block data) throws SQLException {
        sendRequest(new DataRequest("", data));
    }

    /**
     * Reads one response and requires it to be a {@link HelloResponse}.
     *
     * @throws SQLException if reading fails; validation of the response type is delegated to
     *                      {@code Validate.isTrue}
     */
    public HelloResponse receiveHello(Duration soTimeout, NativeContext.ServerContext info) throws SQLException {
        Response response = receiveResponse(soTimeout, info);
        Validate.isTrue(response instanceof HelloResponse, "Expect Hello Response.");
        return (HelloResponse) response;
    }

    /**
     * Reads one response and requires it to be an {@link EOFStreamResponse}.
     *
     * @throws SQLException if reading fails; validation of the response type is delegated to
     *                      {@code Validate.isTrue}
     */
    public EOFStreamResponse receiveEndOfStream(Duration soTimeout, NativeContext.ServerContext info) throws SQLException {
        Response response = receiveResponse(soTimeout, info);
        Validate.isTrue(response instanceof EOFStreamResponse, "Expect EOFStream Response.");
        return (EOFStreamResponse) response;
    }

    /**
     * @return a query result whose response supplier reads from this connection using
     *         {@code soTimeout} and {@code info} on every call
     */
    public QueryResult receiveQuery(Duration soTimeout, NativeContext.ServerContext info) {
        return new ClickHouseQueryResult(() -> receiveResponse(soTimeout, info));
    }

    /**
     * Calls {@link #disconnect()} and logs (at debug level) anything it throws instead of
     * propagating it.
     */
    public void silentDisconnect() {
        try {
            disconnect();
        } catch (Throwable th) {
            LOG.debug("disconnect throw exception.", th);
        }
    }

    /**
     * Flushes pending output and closes the socket; does nothing if it is already closed.
     *
     * @throws SQLException wrapping an {@link IOException} raised while flushing or closing
     */
    public void disconnect() throws SQLException {
        try {
            if (socket.isClosed()) {
                LOG.info("socket already closed, ignore");
                return;
            }
            LOG.debug("flush and close socket");
            try {
                serializer.flushToTarget(true);
            } finally {
                // Always release the socket, even when the final flush fails.
                socket.close();
            }
        } catch (IOException ex) {
            throw new SQLException(ex.getMessage(), ex);
        }
    }

    /** Builds a {@link QueryRequest} from the arguments and sends it. */
    private void sendQuery(String id, int stage, QueryRequest.ClientContext info, String query,
                           Map<SettingKey, Object> settings) throws SQLException {
        sendRequest(new QueryRequest(id, info, stage, true, query, settings));
    }

    /**
     * Writes {@code request} to the serializer and flushes it to the socket.
     *
     * @throws SQLException wrapping any {@link IOException} from writing or flushing
     */
    private void sendRequest(Request request) throws SQLException {
        try {
            LOG.trace("send request: {}", request.type());
            request.writeTo(serializer);
            serializer.flushToTarget(true);
        } catch (IOException ex) {
            throw new SQLException(ex.getMessage(), ex);
        }
    }

    /**
     * Sets the socket read timeout and reads the next response.
     *
     * @param soTimeout read timeout; values above {@link Integer#MAX_VALUE} milliseconds are
     *                  clamped rather than silently overflowing the {@code int} conversion
     * @param info      server context passed to the response reader
     * @throws SQLException wrapping any {@link IOException} (including read timeouts)
     */
    private Response receiveResponse(Duration soTimeout, NativeContext.ServerContext info) throws SQLException {
        try {
            socket.setSoTimeout((int) Math.min(soTimeout.toMillis(), Integer.MAX_VALUE));
            Response response = Response.readFrom(deserializer, info);
            LOG.trace("recv response: {}", response.type());
            return response;
        } catch (IOException ex) {
            throw new SQLException(ex.getMessage(), ex);
        }
    }
}
`
harshalstory commented 3 years ago

@itziklavon Appreciate your response! It looks similar to what I have in place. Let me figure out where is the difference, causing the deserializer.readVarInt() to hang.

@sundy-li @pan3793 Once I have resolved this, it would be good if we could integrate this with the library, for anyone else to use. Thoughts?

f1yegor commented 3 years ago

Hi @itziklavon @sundy-li @pan3793

I created a pull request to introduce TLS based on the sample above, but I think I'm getting the same problem with a stalled socket. Could you elaborate?

      at com.github.housepower.buffer.SocketBuffedReader.refill(SocketBuffedReader.java:80)
      at com.github.housepower.buffer.SocketBuffedReader.readBinary(SocketBuffedReader.java:48)
      at com.github.housepower.serde.BinaryDeserializer.readVarInt(BinaryDeserializer.java:41)
      at com.github.housepower.protocol.Response.readFrom(Response.java:29)
      at com.github.housepower.client.NativeClient.receiveResponse(NativeClient.java:191)
      at com.github.housepower.client.NativeClient.lambda$receiveQuery$0(NativeClient.java:148)