reactor / reactor-netty

TCP/HTTP/UDP/QUIC client/server with Reactor over Netty
https://projectreactor.io
Apache License 2.0

Reactor Netty websocket channel closed prematurely #1772

Closed: greg007r closed this issue 3 years ago

greg007r commented 3 years ago

I have a long-running websocket client implemented in Java Spring Reactor with Netty (spring-boot-starter-parent 2.5.3) targeting the Binance websocket API. According to the specs, the websocket channel is kept open for 24 hours.

The websocket is unexpectedly and prematurely closed after around 3 minutes:

16:50:48.418 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
16:50:48.434 [main] DEBUG io.netty.util.internal.logging.InternalLoggerFactory - Using SLF4J as the default logging framework
16:50:48.436 [main] DEBUG io.netty.util.internal.PlatformDependent0 - -Dio.netty.noUnsafe: false
16:50:48.437 [main] DEBUG io.netty.util.internal.PlatformDependent0 - Java version: 14
16:50:48.438 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.theUnsafe: available
16:50:48.438 [main] DEBUG io.netty.util.internal.PlatformDependent0 - sun.misc.Unsafe.copyMemory: available
16:50:48.438 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Buffer.address: available
16:50:48.439 [main] DEBUG io.netty.util.internal.PlatformDependent0 - direct buffer constructor: unavailable: Reflective setAccessible(true) disabled
16:50:48.439 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.Bits.unaligned: available, true
16:50:48.440 [main] DEBUG io.netty.util.internal.PlatformDependent0 - jdk.internal.misc.Unsafe.allocateUninitializedArray(int): unavailable: class io.netty.util.internal.PlatformDependent0$6 cannot access class jdk.internal.misc.Unsafe (in module java.base) because module java.base does not export jdk.internal.misc to unnamed module @1efbd816
16:50:48.440 [main] DEBUG io.netty.util.internal.PlatformDependent0 - java.nio.DirectByteBuffer.<init>(long, int): unavailable
16:50:48.440 [main] DEBUG io.netty.util.internal.PlatformDependent - sun.misc.Unsafe: available
16:50:48.448 [main] DEBUG io.netty.util.internal.PlatformDependent - maxDirectMemory: 8388608000 bytes (maybe)
16:50:48.448 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.tmpdir: /tmp (java.io.tmpdir)
16:50:48.448 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.bitMode: 64 (sun.arch.data.model)
16:50:48.449 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.maxDirectMemory: -1 bytes
16:50:48.450 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.uninitializedArrayAllocationThreshold: -1
16:50:48.450 [main] DEBUG io.netty.util.internal.CleanerJava9 - java.nio.ByteBuffer.cleaner(): available
16:50:48.450 [main] DEBUG io.netty.util.internal.PlatformDependent - -Dio.netty.noPreferDirect: false
16:50:48.460 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default LoopResources: DefaultLoopResources {prefix=reactor-http, daemon=true, selectCount=8, workerCount=8}
16:50:48.460 [main] DEBUG reactor.netty.tcp.TcpResources - [http] resources will use the default ConnectionProvider: reactor.netty.resources.DefaultPooledConnectionProvider@192b07fd
16:50:48.485 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.level: simple
16:50:48.486 [main] DEBUG io.netty.util.ResourceLeakDetector - -Dio.netty.leakDetection.targetRecords: 4
16:50:48.581 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv4Stack: false
16:50:48.581 [main] DEBUG io.netty.util.NetUtil - -Djava.net.preferIPv6Addresses: false
16:50:48.582 [main] DEBUG io.netty.util.NetUtilInitializations - Loopback interface: lo (lo, 0:0:0:0:0:0:0:1%lo)
16:50:48.583 [main] DEBUG io.netty.util.NetUtil - /proc/sys/net/core/somaxconn: 128
16:50:48.590 [main] DEBUG org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient - Connecting to wss://stream.binance.com:9443/ws
16:50:48.601 [main] DEBUG io.netty.handler.ssl.OpenSsl - netty-tcnative not in the classpath; OpenSslEngine will be unavailable.
16:50:48.712 [main] DEBUG io.netty.handler.ssl.JdkSslContext - Default protocols (JDK): [TLSv1.3, TLSv1.2, TLSv1.1, TLSv1] 
16:50:48.712 [main] DEBUG io.netty.handler.ssl.JdkSslContext - Default cipher suites (JDK): [TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256, TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384, TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA, TLS_RSA_WITH_AES_128_GCM_SHA256, TLS_RSA_WITH_AES_128_CBC_SHA, TLS_RSA_WITH_AES_256_CBC_SHA, TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384]
16:50:48.720 [main] DEBUG reactor.netty.resources.DefaultLoopIOUring - Default io_uring support : false
16:50:48.724 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - -Dio.netty.native.workdir: /tmp (io.netty.tmpdir)
16:50:48.725 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - -Dio.netty.native.deleteLibAfterLoading: true
16:50:48.725 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - -Dio.netty.native.tryPatchShadedId: true
16:50:48.730 [main] DEBUG io.netty.util.internal.NativeLibraryLoader - Successfully loaded the library /tmp/libnetty_transport_native_epoll_x86_6410359104745093945181.so
16:50:48.731 [main] DEBUG reactor.netty.resources.DefaultLoopEpoll - Default Epoll support : true
16:50:48.734 [main] DEBUG io.netty.channel.MultithreadEventLoopGroup - -Dio.netty.eventLoopThreads: 16
16:50:48.742 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.initialSize: 1024
16:50:48.743 [main] DEBUG io.netty.util.internal.InternalThreadLocalMap - -Dio.netty.threadLocalMap.stringBuilder.maxSize: 4096
16:50:48.749 [main] DEBUG io.netty.util.internal.PlatformDependent - org.jctools-core.MpscChunkedArrayQueue: available
16:50:48.768 [main] DEBUG reactor.netty.resources.PooledConnectionProvider - Creating a new [http] client pool [PoolFactory{evictionInterval=PT0S, leasingStrategy=fifo, maxConnections=500, maxIdleTime=-1, maxLifeTime=-1, metricsEnabled=false, pendingAcquireMaxCount=1000, pendingAcquireTimeout=45000}] for [stream.binance.com/<unresolved>:9443]
16:50:48.798 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.processId: 27223 (auto-detected)
16:50:48.799 [main] DEBUG io.netty.channel.DefaultChannelId - -Dio.netty.machineId: 28:16:ad:ff:fe:2b:7c:b7 (auto-detected)
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numHeapArenas: 16
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.numDirectArenas: 16
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.pageSize: 8192
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxOrder: 11
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.chunkSize: 16777216
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.smallCacheSize: 256
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.normalCacheSize: 64
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedBufferCapacity: 32768
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimInterval: 8192
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.cacheTrimIntervalMillis: 0
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.useCacheForAllThreads: true
16:50:48.809 [main] DEBUG io.netty.buffer.PooledByteBufAllocator - -Dio.netty.allocator.maxCachedByteBuffersPerChunk: 1023
16:50:48.813 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.allocator.type: pooled
16:50:48.813 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.threadLocalDirectBufferSize: 0
16:50:48.814 [main] DEBUG io.netty.buffer.ByteBufUtil - -Dio.netty.maxThreadLocalCharBufferSize: 16384
16:50:48.828 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:d962b126] Created a new pooled channel, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
16:50:48.845 [reactor-http-epoll-2] DEBUG reactor.netty.tcp.SslProvider - [id:d962b126] SSL enabled using engine sun.security.ssl.SSLEngineImpl@55608030 and SNI stream.binance.com/<unresolved>:9443
16:50:48.852 [reactor-http-epoll-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true
16:50:48.853 [reactor-http-epoll-2] DEBUG io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true
16:50:48.853 [reactor-http-epoll-2] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@3ba51dc6
16:50:48.854 [reactor-http-epoll-2] DEBUG reactor.netty.transport.TransportConfig - [id:d962b126] Initialized pipeline DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.sslReader = reactor.netty.tcp.SslProvider$SslReadHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:50:48.866 [reactor-http-epoll-1] DEBUG io.netty.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@1fb356c5
16:50:48.867 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsQueryContext - [id: 0xdd7103d7] WRITE: UDP, [11524: /127.0.0.53:53], DefaultDnsQuestion(stream.binance.com. IN A)
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8
16:50:48.869 [reactor-http-epoll-1] DEBUG io.netty.util.Recycler - -Dio.netty.recycler.delayedQueue.ratio: 8
16:50:48.878 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsQueryContext - [id: 0xdd7103d7] WRITE: UDP, [33872: /127.0.0.53:53], DefaultDnsQuestion(stream.binance.com. IN AAAA)
16:50:48.904 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsNameResolver - [id: 0xdd7103d7] RECEIVED: UDP [11524: /127.0.0.53:53], DatagramDnsResponse(from: /127.0.0.53:53, 11524, QUERY(0), NoError(0), RD RA)
    DefaultDnsQuestion(stream.binance.com. IN A)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(stream.binance.com. 12 IN A 4B)
    DefaultDnsRawRecord(OPT flags:0 udp:65494 0B)
16:50:48.907 [reactor-http-epoll-2] DEBUG reactor.netty.transport.TransportConnector - [id:d962b126] Connecting to [stream.binance.com/52.199.12.133:9443].
16:50:48.907 [reactor-http-epoll-1] DEBUG io.netty.resolver.dns.DnsNameResolver - [id: 0xdd7103d7] RECEIVED: UDP [33872: /127.0.0.53:53], DatagramDnsResponse(from: /127.0.0.53:53, 33872, QUERY(0), NoError(0), RD RA)
    DefaultDnsQuestion(stream.binance.com. IN AAAA)
    DefaultDnsRawRecord(OPT flags:0 udp:65494 0B)
16:50:49.162 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Registering pool release on close event for channel
16:50:49.163 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Channel connected, now: 1 active connections, 0 inactive connections and 0 pending acquire requests.
16:50:49.807 [reactor-http-epoll-2] DEBUG io.netty.handler.ssl.SslHandler - [id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] HANDSHAKEN: protocol:TLSv1.2 cipher suite:TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256
16:50:49.808 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(PooledConnection{channel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443]}, [connected])
16:50:49.826 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(GET{uri=/, connection=PooledConnection{channel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443]}}, [configured])
16:50:49.826 [reactor-http-epoll-2] DEBUG reactor.netty.http.client.HttpClientConnect - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Handler is being applied: {uri=wss://stream.binance.com:9443/ws, method=GET}
16:50:49.830 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(GET{uri=/ws, connection=PooledConnection{channel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443]}}, [request_prepared])
16:50:49.839 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Added encoder [reactor.left.httpAggregator] at the beginning of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.httpCodec, reactor.left.httpAggregator, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
16:50:49.839 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Non Removed handler: reactor.left.httpMetricsHandler, context: null, pipeline: DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (reactor.left.httpCodec = io.netty.handler.codec.http.HttpClientCodec), (reactor.left.httpAggregator = io.netty.handler.codec.http.HttpObjectAggregator), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
16:50:49.840 [reactor-http-epoll-2] DEBUG reactor.netty.http.client.HttpClientOperations - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Attempting to perform websocket handshake with wss://stream.binance.com:9443/ws
16:50:49.846 [reactor-http-epoll-2] DEBUG io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker13 - WebSocket version 13 client handshake key: 7FNVb427OHllyiM2Clg//g==, expected response: iTvQFIKtv7xyyXvmEAooh8NZPVw=
16:50:50.122 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] onStateChange(ws{uri=/ws, connection=PooledConnection{channel=[id: 0xd962b126, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443]}}, [response_received])
16:50:50.135 [reactor-http-epoll-2] DEBUG org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketSession - [36eb4d6b] Session id "36eb4d6b" for wss://stream.binance.com:9443/ws
16:50:50.135 [reactor-http-epoll-2] DEBUG org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient - Started session '36eb4d6b' for wss://stream.binance.com:9443/ws
16:50:50.147 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] Added decoder [reactor.left.wsFrameAggregator] at the end of the user pipeline, full pipeline: [reactor.left.sslHandler, reactor.left.httpCodec, ws-decoder, ws-encoder, reactor.left.wsFrameAggregator, reactor.right.reactiveBridge, DefaultChannelPipeline$TailContext#0]
16:50:50.149 [reactor-http-epoll-2] DEBUG reactor.netty.channel.FluxReceive - [id:d962b126-1, L:/192.168.1.5:44690 - R:stream.binance.com/52.199.12.133:9443] FluxReceive{pending=0, cancelled=false, inboundDone=false, inboundError=null}: subscribing inbound receiver
16:50:50.150 [reactor-http-epoll-2] INFO TRACE - onSubscribe(FluxMap.MapSubscriber)
16:50:50.150 [reactor-http-epoll-2] INFO TRACE - request(256)
16:50:50.411 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
16:50:50.413 [reactor-http-epoll-2] INFO TRACE - request(1)
...
16:52:16.652 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
16:52:16.652 [reactor-http-epoll-2] INFO TRACE - request(1)
16:52:17.168 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:d962b126-1, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] Channel closed, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
16:52:17.169 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:d962b126-1, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] Non Removed handler: reactor.left.httpAggregator, context: null, pipeline: DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (ws-decoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder), (ws-encoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder), (reactor.left.wsFrameAggregator = io.netty.handler.codec.http.websocketx.WebSocketFrameAggregator), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
A completed
A terminated
16:52:17.172 [reactor-http-epoll-2] INFO TRACE - onComplete()
B completed
B terminated
C success
C terminated
16:52:17.177 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] onStateChange(ws{uri=/ws, connection=PooledConnection{channel=[id: 0xd962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443]}}, [response_completed])
16:52:17.177 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:d962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443] onStateChange(ws{uri=/ws, connection=PooledConnection{channel=[id: 0xd962b126, L:/192.168.1.5:44690 ! R:stream.binance.com/52.199.12.133:9443]}}, [disconnecting])

I tried to reproduce the issue using another technology such as JavaScript, and everything runs fine there. It seems that the channel is being closed, so I tried to tune the ChannelOptions at the TcpClient level... still no luck!

TcpClient wsTcp = TcpClient.create();
wsTcp.option(ChannelOption.AUTO_CLOSE, Boolean.FALSE);
wsTcp.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.MAX_VALUE);
wsTcp.option(ChannelOption.AUTO_READ, Boolean.TRUE);
wsTcp.option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE);
wsTcp.option(ChannelOption.SO_TIMEOUT, Integer.MAX_VALUE);
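
(Note that Reactor Netty's TcpClient is immutable: each option(...) call returns a new instance rather than mutating the original, so for these options to take effect the calls would need to be chained and the resulting client actually used. A minimal sketch of the chained form, same options, shown only for illustration:)

TcpClient wsTcp = TcpClient.create()
        .option(ChannelOption.AUTO_CLOSE, Boolean.FALSE)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.MAX_VALUE)
        .option(ChannelOption.AUTO_READ, Boolean.TRUE)
        .option(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)
        .option(ChannelOption.SO_TIMEOUT, Integer.MAX_VALUE); // each call returns a new, fully configured TcpClient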

I provided a Java code sample to reproduce the issue:

package test;

import java.net.URI;
import java.util.concurrent.CountDownLatch;

import org.springframework.web.reactive.socket.client.ReactorNettyWebSocketClient;

import reactor.core.publisher.EmitterProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class wsTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        ReactorNettyWebSocketClient wsclient = new ReactorNettyWebSocketClient();
        wsclient.setMaxFramePayloadLength(Integer.MAX_VALUE);
        EmitterProcessor<String> output = EmitterProcessor.create();
        Mono<Void> execMono = wsclient.execute(URI.create("wss://stream.binance.com:9443/ws"),
                session -> session.send(Flux.just(session.textMessage("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}")))
                        .thenMany(session
                                .receive()
                                .doOnCancel(() -> System.out.println("A cancelled"))
                                .doOnComplete(() -> System.out.println("A completed"))
                                .doOnTerminate(() -> System.out.println("A terminated"))
                                .map(x -> "evt")
                                .log("TRACE")
                                .subscribeWith(output).then())
                        .then());

        output.doOnCancel(() -> System.out.println("B cancelled"))
                .doOnComplete(() -> System.out.println("B completed"))
                .doOnTerminate(() -> System.out.println("B terminated"))
                .doOnSubscribe(s -> execMono
                        .doOnCancel(() -> System.out.println("C cancelled"))
                        .doOnSuccess(x -> System.out.println("C success"))
                        .doOnTerminate(() -> System.out.println("C terminated"))
                        .subscribe())
                .subscribe();

        latch.await();
    }
}

I don't understand why I get completed/terminated events from the ReactorNettyWebSocketClient WebSocketHandler.

I also posted my issue on Stack Overflow: https://stackoverflow.com/questions/68792765/reactor-netty-websocket-channel-closed-prematurely

Thank you for your help,

violetagg commented 3 years ago

@greg007r Instead of wss://stream.binance.com:9443/ws, can you try with wss://echo.websocket.org or a similar server that just echoes the incoming data, e.g. https://github.com/reactor/reactor-netty/blob/7608571fb944a173481828b077d25776e08dd0eb/reactor-netty-examples/src/main/java/reactor/netty/examples/documentation/http/server/routing/Application.java#L34-L35

Also, can you take a TCP dump (for example with Wireshark) in order to see which peer closes the connection.

OlegDokuka commented 3 years ago

@greg007r According to the Binance WebSocket API docs, you have to send a PONG message back for every incoming PING. If that is not done, Binance closes the connection prematurely.

Also, it feels like this is a question that would be better suited to Gitter or Stack Overflow. As mentioned in the README, we prefer to use GitHub issues only for bugs and enhancements. Feel free to update this issue with a link to the re-posted question (so that other people can find it), or add some more details if you feel this is a genuine bug.
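
(For illustration of the PING/PONG handling mentioned above: with the raw Reactor Netty API, replying to PING frames by hand could look roughly like the sketch below. It assumes handlePing(true), i.e. automatic PONG replies are disabled so the PING frames actually reach the handler; with the default handlePing(false), Reactor Netty answers them itself. The frame classes come from io.netty.handler.codec.http.websocketx.)

HttpClient.create()
        .websocket(WebsocketClientSpec.builder().handlePing(true).build()) // PING frames are forwarded to the handler
        .uri("wss://stream.binance.com:9443/ws")
        .handle((in, out) -> out.sendObject(
                in.receiveFrames()
                  .filter(frame -> frame instanceof PingWebSocketFrame)
                  .map(frame -> new PongWebSocketFrame(frame.content().retain())))) // echo the PING payload back as a PONG
        .subscribe();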

greg007r commented 3 years ago

Thank you for your feedback. I already investigated the PING/PONG frames, which should be managed transparently by the ReactorNettyWebSocketClient when wsclient.setHandlePing(Boolean.FALSE) is configured. So, as far as I know, I should not have to take care of this, and I don't think it is the root cause. If I choose another service with a smaller payload over time, the connection is kept for 10 min instead of 3 min. I will give the echo websocket a try, but there the messages are not delivered continuously as they are for the price ticker, unless the client keeps sending further messages.

OlegDokuka commented 3 years ago

@greg007r I would suggest doing a TCP dump to see who is closing the connection.

Also, I would try replacing EmitterProcessor with DirectProcessor, just to ensure that the problem is not related to backpressure.
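
(A sketch of the swap being suggested, assuming the rest of the reproduction code stays unchanged; DirectProcessor does not buffer against backpressure, which takes EmitterProcessor's demand management out of the picture:)

// was: EmitterProcessor<String> output = EmitterProcessor.create();
DirectProcessor<String> output = DirectProcessor.create(); // no backpressure buffering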

greg007r commented 3 years ago

I replaced EmitterProcessor with DirectProcessor as suggested and took a Wireshark TCP dump (screenshot from 2021-08-16 18-58-02). I don't understand why I'm not facing the same issue using a JavaScript WebSocket or the plain Java 14 java.net.http.WebSocket; those two websocket client implementations run perfectly for hours.
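
(The java.net.http.WebSocket comparison client was not posted; a minimal sketch of that kind of plain-JDK client, with the same subscribe message assumed, might look like this:)

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;

public class JdkWebSocketTest {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        HttpClient.newHttpClient()
                .newWebSocketBuilder()
                .buildAsync(URI.create("wss://stream.binance.com:9443/ws"), new WebSocket.Listener() {
                    @Override
                    public void onOpen(WebSocket webSocket) {
                        webSocket.sendText("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}", true);
                        WebSocket.Listener.super.onOpen(webSocket); // default implementation requests the first message
                    }

                    @Override
                    public CompletionStage<?> onText(WebSocket webSocket, CharSequence data, boolean last) {
                        System.out.println("evt");
                        return WebSocket.Listener.super.onText(webSocket, data, last); // requests the next message
                    }
                })
                .join();
        latch.await();
    }
}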

greg007r commented 3 years ago

Also, I adapted the way I initialize the ReactorNettyWebSocketClient, using the new WebsocketClientSpec builder, to be sure the ping handling is properly initialized:

WebsocketSpec.Builder builder = WebsocketClientSpec.builder()
        .handlePing(false)
        .maxFramePayloadLength(Integer.MAX_VALUE);
ReactorNettyWebSocketClient wsclient = new ReactorNettyWebSocketClient(httpClient, builder);

violetagg commented 3 years ago

@greg007r Can you try the code below? I tried to write the example with Reactor Netty only.

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        EmitterProcessor<String> output = EmitterProcessor.create();
        Mono<Void> execMono =
                HttpClient.create()
                        .websocket(WebsocketClientSpec.builder().maxFramePayloadLength(Integer.MAX_VALUE).build())
                        .uri(URI.create("wss://stream.binance.com:9443/ws"))
                        .handle((in, out) -> out.sendObject(Flux.just(new TextWebSocketFrame("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}")))
                                .then(in.receive()
                                        .doOnCancel(() -> System.out.println("A cancelled"))
                                        .doOnComplete(() -> System.out.println("A completed"))
                                        .doOnTerminate(() -> System.out.println("A terminated"))
                                        .map(x -> "evt")
                                        .log("TRACE")
                                        .subscribeWith(output)
                                        .then()))
                        .then();

        output.doOnCancel(() -> System.out.println("B cancelled"))
                .doOnComplete(() -> System.out.println("B completed"))
                .doOnTerminate(() -> System.out.println("B terminated"))
                .doOnSubscribe(s -> execMono
                        .doOnCancel(() -> System.out.println("C cancelled"))
                        .doOnSuccess(x -> System.out.println("C success"))
                        .doOnTerminate(() -> System.out.println("C terminated"))
                        .subscribe())
                .subscribe();

        latch.await();
    }

greg007r commented 3 years ago

@violetagg I tested it and the channel got closed after 2 minutes:

11:44:06.217 [reactor-http-epoll-2] DEBUG reactor.netty.resources.PooledConnectionProvider - [id:17cab690-1, L:/192.168.1.5:50178 ! R:stream.binance.com/52.193.213.21:9443] Channel closed, now: 0 active connections, 0 inactive connections and 0 pending acquire requests.
11:44:06.218 [reactor-http-epoll-2] DEBUG reactor.netty.ReactorNetty - [id:17cab690-1, L:/192.168.1.5:50178 ! R:stream.binance.com/52.193.213.21:9443] Non Removed handler: reactor.left.httpAggregator, context: null, pipeline: DefaultChannelPipeline{(reactor.left.sslHandler = io.netty.handler.ssl.SslHandler), (ws-decoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameDecoder), (ws-encoder = io.netty.handler.codec.http.websocketx.WebSocket13FrameEncoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}
A completed
A terminated
11:44:06.218 [reactor-http-epoll-2] INFO TRACE - onComplete()
B completed
B terminated
C success
C terminated
11:44:06.219 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:17cab690, L:/192.168.1.5:50178 ! R:stream.binance.com/52.193.213.21:9443] onStateChange(ws{uri=/ws, connection=PooledConnection{channel=[id: 0x17cab690, L:/192.168.1.5:50178 ! R:stream.binance.com/52.193.213.21:9443]}}, [response_completed])
11:44:06.219 [reactor-http-epoll-2] DEBUG reactor.netty.resources.DefaultPooledConnectionProvider - [id:17cab690, L:/192.168.1.5:50178 ! R:stream.binance.com/52.193.213.21:9443] onStateChange(ws{uri=/ws, connection=PooledConnection{channel=[id: 0x17cab690, L:/192.168.1.5:50178 ! R:stream.binance.com/52.193.213.21:9443]}}, [disconnecting])

I also tested a third websocket implementation, in Python 2.3.7 with binance-python; it also runs fine for several hours. Here is the sample code:

import asyncio
from binance import AsyncClient, BinanceSocketManager
from datetime import datetime

async def main():
    client = await AsyncClient.create()
    bm = BinanceSocketManager(client)
    # start any sockets here, i.e a trade socket
    ts = bm.multiplex_socket(['!ticker@arr'])
    # then start receiving messages
    async with ts as tscm:
        while True:
            res = await tscm.recv()
            print("evt ", datetime.now())

    await client.close_connection()

if __name__ == "__main__":

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

I'm really disappointed because I implemented my project in an async way, end to end, with WebFlux. As I have three different implementations (JavaScript, native Java, Python) working for several hours, I'm more and more convinced that I'm facing a bug in Reactor Netty.

I am trying to isolate the issue as much as possible; all the code samples I provided are easily replayable. I will also test with a different provider such as Jetty and then start debugging from the sources, as I have no other option.

violetagg commented 3 years ago

@simonbasle Can you recommend the best way to avoid EmitterProcessor here:

        EmitterProcessor<String> output = EmitterProcessor.create();
        Mono<Void> execMono = wsclient.execute(URI.create("wss://stream.binance.com:9443/ws"),
                session -> session.send(Flux.just(session.textMessage("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}")))
                        .thenMany(session
                                .receive()
                                .doOnCancel(() -> System.out.println("A cancelled"))
                                .doOnComplete(() -> System.out.println("A completed"))
                                .doOnTerminate(() -> System.out.println("A terminated"))
                                .map(x -> "evt")
                                .log("TRACE")
                                .subscribeWith(output).then())
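
(For reference, one way to avoid the processor entirely is the Sinks API that replaces these processors in Reactor 3.4; the sketch below is illustrative only and is not the change that was ultimately made in this thread:)

Sinks.Many<String> output = Sinks.many().multicast().onBackpressureBuffer();
Mono<Void> execMono = wsclient.execute(URI.create("wss://stream.binance.com:9443/ws"),
        session -> session.send(Flux.just(session.textMessage("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}")))
                .thenMany(session.receive()
                        .map(x -> "evt")
                        .doOnNext(output::tryEmitNext)        // push each event into the sink
                        .doOnComplete(output::tryEmitComplete))
                .then());

output.asFlux().subscribe(evt -> System.out.println("received " + evt));
execMono.subscribe();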

simonbasle commented 3 years ago

@violetagg @greg007r I don't understand the intent behind this EmitterProcessor, other than complicating the code...

@greg007r you have it subscribe to execMono, then subscribe to the processor, THEN when said processor is subscribed to you subscribe to execMono again in a doOnSubscribe?? This makes absolutely no sense to me, except for seeking complication for the sake of complexity...

greg007r commented 3 years ago

My bad... you are right, so now the issue is reproducible with this simple code:

public class SocketTest {
       public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        new ReactorNettyWebSocketClient(HttpClient.create(), 
                WebsocketClientSpec.builder()
                .handlePing(false)
                .maxFramePayloadLength(Integer.MAX_VALUE))
                .execute(URI.create("wss://stream.binance.com:9443/ws"),
                session -> session.send(Flux.just(session.textMessage("{\"method\": \"SUBSCRIBE\",\"params\":[\"!ticker@arr\"],\"id\": 1}")))
                        .thenMany(session
                                .receive()
                                .doOnCancel(() -> System.out.println("A cancelled"))
                                .doOnComplete(() -> System.out.println("A completed"))
                                .doOnTerminate(() -> System.out.println("A terminated"))
                                .map(x -> "evt")
                                .log("TRACE").then())
                        .then()).subscribe();
        latch.await();
    }
}

violetagg commented 3 years ago

@greg007r Can you also specify your Java version and vendor, because somehow I cannot reproduce this... neither on macOS nor on Ubuntu.

greg007r commented 3 years ago

@violetagg Do you mean that the websocket keeps running after 10 min for you? This is my Linux Mint setup:

Linux 4.15.0-153-generic #160-Ubuntu x86_64 x86_64 x86_64 GNU/Linux
openjdk version "14" 2020-03-17
OpenJDK Runtime Environment AdoptOpenJDK (build 14+36)
OpenJDK 64-Bit Server VM AdoptOpenJDK (build 14+36, mixed mode, sharing)

greg007r commented 3 years ago

@violetagg I recompiled and executed on JDK 11... the issue is still there.

violetagg commented 3 years ago

@greg007r I need to find a way to reproduce this... Currently I cannot reproduce it on Ubuntu:

Linux 5.8.0-55-generic #62~20.04.1-Ubuntu SMP Wed Jun 2 08:55:04 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
Java: adopt-openjdk-14.0.2 and adopt-openjdk-11.0.11

15:52:35.768 [reactor-http-epoll-2] INFO TRACE - onSubscribe(FluxMap.MapSubscriber)
15:52:35.770 [reactor-http-epoll-2] INFO TRACE - request(unbounded)
15:52:36.089 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
15:52:38.343 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
15:52:39.142 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
...
16:05:46.213 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
16:05:47.234 [reactor-http-epoll-2] INFO TRACE - onNext(evt)
16:05:48.257 [reactor-http-epoll-2] INFO TRACE - onNext(evt)

greg007r commented 3 years ago

@violetagg Thank you so much for your support, and indeed it seems to work for you. I switched to Ubuntu and Java 11 to stick to your happy scenario, but it is still not working for me :-( Now having:

Linux greg-P7xxDM2-G 5.11.0-31-generic #33-Ubuntu SMP Wed Aug 11 13:19:04 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
openjdk version "11.0.11" 2021-04-20
OpenJDK Runtime Environment (build 11.0.11+9-Ubuntu-0ubuntu2)
OpenJDK 64-Bit Server VM (build 11.0.11+9-Ubuntu-0ubuntu2, mixed mode, sharing)

May I lastly ask which versions you used of the following components?

spring-boot-starter-parent 2.5.3
netty-transport-native-epoll-4.1.66.Final-linux-x86_64.jar
reactor-netty-http-1.0.9.jar
reactor-core-3.4.8.jar
reactive-streams-1.0.3.jar

I would like to thank you for your premium support, Violeta; that was really kind of you. Anyway, I will continue to investigate on my side.

Gregory

greg007r commented 3 years ago

I finally managed to find the root cause. After some investigation, I got the returned close code 1006, meaning the connection was closed abnormally by the client, as documented in the RFC: https://datatracker.ietf.org/doc/html/rfc6455#section-7.4.1

1006 is a reserved value and MUST NOT be set as a status code in a
      Close control frame by an endpoint.  It is designated for use in
      applications expecting a status code to indicate that the
      connection was closed abnormally, e.g., without sending or
      receiving a Close control frame.

At that point, I switched from a WiFi connection to a LAN connection and the issue vanished immediately. My WiFi router was not able to handle the huge payload correctly. You can close the issue, and once again I would like to warmly thank you for your time @violetagg. Kind regards, Gregory
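
(For readers hitting similar closures: the close status can be observed directly. A sketch with the raw Reactor Netty API, using WebsocketInbound.receiveCloseStatus(); what exactly is emitted for an abnormal closure depends on how the channel is torn down. On the Spring side, WebSocketSession#closeStatus() exposes similar information in Spring Framework 5.3+.)

HttpClient.create()
        .websocket()
        .uri("wss://stream.binance.com:9443/ws")
        .handle((in, out) -> {
            in.receiveCloseStatus()
              .subscribe(status -> System.out.println("close status: " + status.code() + " " + status.reasonText()));
            return in.receive().then(); // consume the inbound data until the connection terminates
        })
        .subscribe();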

violetagg commented 3 years ago

@greg007r nice that you found the issue!