netty / netty-incubator-codec-quic

How to keep the `QuicChannel` alive? #132

Closed chenzhiguo closed 3 years ago

chenzhiguo commented 3 years ago

When creating a QuicChannel, I set maxIdleTimeout to 5 seconds. Does that mean the QuicChannel (QuicheQuicChannel) will only survive for 5 seconds without data transmission? If so, what should I do to keep the QuicChannel alive? Is the only option to leave this parameter unset (i.e. use the max value)?

normanmaurer commented 3 years ago

Usually you would build a "heartbeat" into the protocol that sits on top of QUIC. It's the same as you are used to with TCP, which may time out after some period of time as well. The maxIdleTimeout is basically just there to ensure connections will be closed at some point if not explicitly closed before.
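
To make the relationship between the heartbeat interval and maxIdleTimeout concrete, here is a minimal, JDK-only sketch. It is a simplified model, not Netty or quiche code: the class `IdleTimeoutSketch`, the `IdleTimer` type, and the `survives` helper are hypothetical names invented for illustration, and the real QUIC idle timer has additional rules. The only point it shows is that a heartbeat helps when its interval is shorter than the idle timeout.

```java
// Simplified model of an idle timeout (assumption: any exchanged packet
// resets the timer). Not Netty or quiche code.
final class IdleTimeoutSketch {

    /** Tracks the last activity and reports whether the idle timeout has passed. */
    static final class IdleTimer {
        private final long timeoutMillis;
        private long lastActivityMillis;

        IdleTimer(long timeoutMillis, long nowMillis) {
            this.timeoutMillis = timeoutMillis;
            this.lastActivityMillis = nowMillis;
        }

        /** Call whenever a packet is exchanged; this pushes the deadline forward. */
        void onActivity(long nowMillis) {
            lastActivityMillis = nowMillis;
        }

        boolean isExpired(long nowMillis) {
            return nowMillis - lastActivityMillis >= timeoutMillis;
        }
    }

    /**
     * Simulates a connection that carries no application data and reports whether
     * it is still alive at endMillis. A heartbeatMillis of 0 disables the heartbeat.
     */
    static boolean survives(long timeoutMillis, long heartbeatMillis, long endMillis) {
        IdleTimer timer = new IdleTimer(timeoutMillis, 0);
        for (long now = 1; now <= endMillis; now++) {
            if (timer.isExpired(now)) {
                return false;
            }
            if (heartbeatMillis > 0 && now % heartbeatMillis == 0) {
                timer.onActivity(now); // the heartbeat counts as activity
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(survives(5_000, 3_000, 60_000)); // heartbeat every 3 s: true
        System.out.println(survives(5_000, 0, 60_000));     // no heartbeat: false
        System.out.println(survives(5_000, 6_000, 60_000)); // heartbeat too slow: false
    }
}
```

With a 5-second timeout, a 3-second heartbeat keeps the modelled connection alive, while no heartbeat (or one slower than the timeout) lets it expire.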

chenzhiguo commented 3 years ago

I tried to send data from the client side to the server using IdleStateHandler, as follows:

            QuicChannel quicChannel = QuicChannel.newBootstrap(channel)
                    .handler(new ChannelInitializer<QuicChannel>() {
                        @Override
                        protected void initChannel(QuicChannel channel) throws Exception {
                            channel.pipeline()
                                    .addLast("clientIdleCheckHandler", new ClientIdleCheckHandler(3, (channelHandlerContext, idleStateEvent) -> {
                                        QuicChannel quicChan = (QuicChannel) channelHandlerContext.channel();
                                        quicChan.writeAndFlush("PING");
                                        log.info("PING->{}", quicChan);
                                    }))
                                    .addLast("clientConnectionHandler", new ClientConnectionHandler(QuicBrokerClient.this::disconnect));
                        }
                    })
                    .streamHandler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline
                                    .addLast("commandEncoder", commandEncoder)
                                    .addLast("commandDecoder", commandDecoder)
                                    .addLast("clientConnectionHandler", new ClientConnectionHandler(QuicBrokerClient.this::disconnect))
                                    .addLast("responseCommandHandler", responseCommandHandler);
                        }

                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            // As we did not allow any remote initiated streams we will never see this method called.
                            // That said just let us keep it here to demonstrate that this handle would be called
                            // for each remote initiated stream.
                            ctx.close();
                        }
                    })
                    .remoteAddress(ChannelUtil.string2SocketAddress(address))
                    .connect()
                    .get();

However, the server still periodically disconnects the QuicChannel. Is my approach wrong?

normanmaurer commented 3 years ago

This needs to happen in a stream.

chenzhiguo commented 3 years ago

Isn't it? Data is being sent via QuicStreamChannel all the time, but the server still closes the connection periodically. That's weird.

This is what I observed: the server receives the data regularly, and then, once the maxIdleTimeout is reached, it disconnects!

normanmaurer commented 3 years ago

Enable debug logging to see what happens at the QUIC level.

chenzhiguo commented 3 years ago

Part of the logs:

2021-01-07 18:26:05.970  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.h.ServerConnectionHandler    : [Server]Remote Broker connect success!channel=[id: 0x92ff0fcb, L:QuicStreamAddress{streamId=36} - R:QuicStreamAddress{streamId=36}]
2021-01-07 18:26:05.971  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : parent channel state:open=true, active=true, writable=true, channel=(ea3393b77a7c3b5bc463bae76543d50833ecdebd)[id: 0x547deb19, L:/0:0:0:0:0:0:0:0:22000 - R:/127.0.0.1:62911]
2021-01-07 18:26:05.971  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : channel state:open=true, active=true, writable=true, channel=[id: 0x92ff0fcb, L:QuicStreamAddress{streamId=36} - R:QuicStreamAddress{streamId=36}]
2021-01-07 18:26:05.971 DEBUG 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : [###CommandDecoder###]Decode compression=0, inputStream=io.netty.buffer.ByteBufInputStream
2021-01-07 18:26:05.971 DEBUG 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : [###CommandDecoder###]Decode success, command={"commandBody":"请求次数:8","commandName":"disconnectCommand"}, type=com.jd.jdd.gateway.core.command.DisconnectCommand
2021-01-07 18:26:05.971  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.handler.BaseCommandHandler   : [BaseCommand]Received a command from other brokers, command={"commandBody":"请求次数:8","commandName":"disconnectCommand"}
2021-01-07 18:26:05.971 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd tx pkt Short dcid=c3bd5c41b7188107cf92f18d0163a12ee611438d key_phase=false len=22 pn=10
2021-01-07 18:26:05.971 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd tx frm ACK delay=112 blocks=[0..10]
2021-01-07 18:26:05.971 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche::recovery: ea3393b77a7c3b5bc463bae76543d50833ecdebd timer=none latest_rtt=1.676105ms srtt=Some(3.62405ms) min_rtt=1.578633ms rttvar=1.576655ms loss_time=[None, None, None] loss_probes=[0, 0, 0] cwnd=10164 ssthresh=10164 bytes_in_flight=0 app_limited=true congestion_recovery_start_time=Some(Instant { t: 1787656243961688 }) delivered=3217 delivered_time=5.009715825s recent_delivered_packet_sent_time=5.011392286s app_limited_at_pkt=0  hystart=window_end=None last_round_min_rtt=None current_round_min_rtt=None rtt_sample_count=0 lss_start_time=None  
2021-01-07 18:26:05.971 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd rx pkt Short dcid=ea3393b77a7c3b5bc463bae76543d50833ecdebd key_phase=false len=100 pn=11
2021-01-07 18:26:05.972 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd rx frm STREAM id=40 off=0 len=78 fin=true
2021-01-07 18:26:05.972  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.h.ServerConnectionHandler    : [Server]Remote Broker connect success!channel=[id: 0x07b41649, L:QuicStreamAddress{streamId=40} - R:QuicStreamAddress{streamId=40}]
2021-01-07 18:26:05.972  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : parent channel state:open=true, active=true, writable=true, channel=(ea3393b77a7c3b5bc463bae76543d50833ecdebd)[id: 0x547deb19, L:/0:0:0:0:0:0:0:0:22000 - R:/127.0.0.1:62911]
2021-01-07 18:26:05.972  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : channel state:open=true, active=true, writable=true, channel=[id: 0x07b41649, L:QuicStreamAddress{streamId=40} - R:QuicStreamAddress{streamId=40}]
2021-01-07 18:26:05.972 DEBUG 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : [###CommandDecoder###]Decode compression=0, inputStream=io.netty.buffer.ByteBufInputStream
2021-01-07 18:26:05.972 DEBUG 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : [###CommandDecoder###]Decode success, command={"commandBody":"请求次数:9","commandName":"disconnectCommand"}, type=com.jd.jdd.gateway.core.command.DisconnectCommand
2021-01-07 18:26:05.972  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.handler.BaseCommandHandler   : [BaseCommand]Received a command from other brokers, command={"commandBody":"请求次数:9","commandName":"disconnectCommand"}
2021-01-07 18:26:05.972 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd tx pkt Short dcid=c3bd5c41b7188107cf92f18d0163a12ee611438d key_phase=false len=22 pn=11
2021-01-07 18:26:05.972 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd tx frm ACK delay=102 blocks=[0..11]
2021-01-07 18:26:05.972 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche::recovery: ea3393b77a7c3b5bc463bae76543d50833ecdebd timer=none latest_rtt=1.676105ms srtt=Some(3.62405ms) min_rtt=1.578633ms rttvar=1.576655ms loss_time=[None, None, None] loss_probes=[0, 0, 0] cwnd=10164 ssthresh=10164 bytes_in_flight=0 app_limited=true congestion_recovery_start_time=Some(Instant { t: 1787656243961688 }) delivered=3217 delivered_time=5.010877178s recent_delivered_packet_sent_time=5.012553649s app_limited_at_pkt=0  hystart=window_end=None last_round_min_rtt=None current_round_min_rtt=None rtt_sample_count=0 lss_start_time=None  
2021-01-07 18:26:06.202 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd rx pkt Short dcid=ea3393b77a7c3b5bc463bae76543d50833ecdebd key_phase=false len=102 pn=12
2021-01-07 18:26:06.202 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd rx frm STREAM id=44 off=0 len=80 fin=true
2021-01-07 18:26:06.202  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.h.ServerConnectionHandler    : [Server]Remote Broker connect success!channel=[id: 0x0b9dc9a1, L:QuicStreamAddress{streamId=44} - R:QuicStreamAddress{streamId=44}]
2021-01-07 18:26:06.202  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : parent channel state:open=true, active=true, writable=true, channel=(ea3393b77a7c3b5bc463bae76543d50833ecdebd)[id: 0x547deb19, L:/0:0:0:0:0:0:0:0:22000 - R:/127.0.0.1:62911]
2021-01-07 18:26:06.202  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : channel state:open=true, active=true, writable=true, channel=[id: 0x0b9dc9a1, L:QuicStreamAddress{streamId=44} - R:QuicStreamAddress{streamId=44}]
2021-01-07 18:26:06.202 DEBUG 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : [###CommandDecoder###]Decode compression=0, inputStream=io.netty.buffer.ByteBufInputStream
2021-01-07 18:26:06.202 DEBUG 1046 --- [tGroup-thread-1] c.j.j.g.c.t.codec.CommandDecoder         : [###CommandDecoder###]Decode success, command={"commandBody":"请求次数:10","commandName":"disconnectCommand"}, type=com.jd.jdd.gateway.core.command.DisconnectCommand
2021-01-07 18:26:06.202  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.handler.BaseCommandHandler   : [BaseCommand]Received a command from other brokers, command={"commandBody":"请求次数:10","commandName":"disconnectCommand"}
2021-01-07 18:26:06.203 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd tx pkt Short dcid=c3bd5c41b7188107cf92f18d0163a12ee611438d key_phase=false len=22 pn=12
2021-01-07 18:26:06.203 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd tx frm ACK delay=106 blocks=[0..12]
2021-01-07 18:26:06.203 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche::recovery: ea3393b77a7c3b5bc463bae76543d50833ecdebd timer=none latest_rtt=1.676105ms srtt=Some(3.62405ms) min_rtt=1.578633ms rttvar=1.576655ms loss_time=[None, None, None] loss_probes=[0, 0, 0] cwnd=10164 ssthresh=10164 bytes_in_flight=0 app_limited=true congestion_recovery_start_time=Some(Instant { t: 1787656243961688 }) delivered=3217 delivered_time=5.241151535s recent_delivered_packet_sent_time=5.242827966s app_limited_at_pkt=0  hystart=window_end=None last_round_min_rtt=None current_round_min_rtt=None rtt_sample_count=0 lss_start_time=None  
2021-01-07 18:26:11.203 DEBUG 1046 --- [tGroup-thread-1] io.netty.incubator.codec.quic.Quiche     : quiche: ea3393b77a7c3b5bc463bae76543d50833ecdebd idle timeout expired
2021-01-07 18:26:11.205  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.h.ServerConnectionHandler    : [Server]BrokerClient disconnect!ClientAddress=127.0.0.1:62911,channel=(ea3393b77a7c3b5bc463bae76543d50833ecdebd)[id: 0x547deb19, L:/0:0:0:0:0:0:0:0:22000 ! R:/127.0.0.1:62911]
2021-01-07 18:26:11.205  INFO 1046 --- [tGroup-thread-1] c.j.j.g.c.c.h.ServerConnectionHandler    : [Server]BrokerClient disconnect!ClientAddress=QuicStreamAddress{streamId=0},channel=[id: 0x41a31bf6, L:QuicStreamAddress{streamId=0} ! R:QuicStreamAddress{streamId=0}]
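
One way to read the excerpt above is to measure the gap between the last quiche line before the pause (18:26:06.203) and the "idle timeout expired" line (18:26:11.203). The sketch below does that with `java.time`; the class `LogGapSketch` and the `gap` helper are hypothetical names, and the two strings in `main` are shortened stand-ins that keep only the timestamps from the log.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

final class LogGapSketch {

    // Matches the 23-character timestamp prefix used in the log above.
    private static final DateTimeFormatter LOG_TIME =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    /** Returns the time elapsed between two log lines, based on their timestamp prefix. */
    static Duration gap(String earlierLine, String laterLine) {
        LocalDateTime earlier = LocalDateTime.parse(earlierLine.substring(0, 23), LOG_TIME);
        LocalDateTime later = LocalDateTime.parse(laterLine.substring(0, 23), LOG_TIME);
        return Duration.between(earlier, later);
    }

    public static void main(String[] args) {
        // Shortened stand-ins for the two log lines; only the timestamps matter here.
        String lastPacket = "2021-01-07 18:26:06.203 DEBUG (last quiche line before the pause)";
        String expired = "2021-01-07 18:26:11.203 DEBUG (idle timeout expired)";
        System.out.println(gap(lastPacket, expired).toMillis() + " ms"); // prints "5000 ms"
    }
}
```

The gap is exactly 5000 ms, which matches a 5-second idle timeout firing after the last logged packet exchange.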

chenzhiguo commented 3 years ago

I found that when I commented out maxIdleTimeout, created a StreamChannel from the client multiple times, and sent data on each, the server didn't receive all of it, even though the connection state was OK.

normanmaurer commented 3 years ago

Did you check whether the future failed or not?

chenzhiguo commented 3 years ago

Yes. Modified QuicClientExample.java:

            for (int i = 0; i < 20; i++) {
                QuicStreamChannel streamChannel = getStreamChannel(quicChannel);
                System.out.println("Send message:" + i + "; streamChannel:" + streamChannel);
                System.out.println(quicChannel);
                // Write the data and send the FIN. After this it's not possible to write any more data.
                streamChannel.writeAndFlush(Unpooled.copiedBuffer("GET /\r\n", CharsetUtil.US_ASCII))
                        .addListener(future -> {
                            if (future.isSuccess()) {
                                System.out.println("success! streamChannel:" + streamChannel);
                            } else {
                                System.out.println("fail! streamChannel:" + streamChannel);
                            }
                        })
                        .addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
                streamChannel.closeFuture();//.sync();
                Thread.sleep(300);
            }

It prints "success" 20 times, but "Hello World!" is received only 4 times.

normanmaurer commented 3 years ago

Can you please share a full reproducer that I can run?

chenzhiguo commented 3 years ago

OK~
QuicServerExample.java

package io.netty.incubator.codec.quic.example;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.incubator.codec.quic.InsecureQuicTokenHandler;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicServerCodecBuilder;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.internal.logging.InternalLogger;
import io.netty.util.internal.logging.InternalLoggerFactory;

import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

public final class QuicServerExample {

    private static final InternalLogger LOGGER = InternalLoggerFactory.getInstance(QuicServerExample.class);

    private QuicServerExample() { }

    public static void main(String[] args) throws Exception {
        // We just want to support HTTP 0.9 as application protocol
        byte[] proto = new byte[] {
                0x08, 'h', 't', 't', 'p', '/', '0', '.', '9'
        };

        NioEventLoopGroup group = new NioEventLoopGroup(1);
        ChannelHandler codec = new QuicServerCodecBuilder()
                .certificateChain("./src/test/resources/cert.crt")
                .privateKey("./src/test/resources/cert.key")
                .applicationProtocols(proto)
                // .maxIdleTimeout(5000, TimeUnit.MILLISECONDS)
                // Configure some limits for the maximal number of streams (and the data) that we want to handle.
                .initialMaxData(10000000)
                .initialMaxStreamDataBidirectionalLocal(1000000)
                .initialMaxStreamDataBidirectionalRemote(1000000)
                .initialMaxStreamsBidirectional(100)
                .initialMaxStreamsUnidirectional(100)

                // Setup a token handler. In a production system you would want to implement and provide your custom
                // one.
                .tokenHandler(InsecureQuicTokenHandler.INSTANCE)
                // ChannelHandler that is added into QuicChannel pipeline.
                .handler(new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelActive(ChannelHandlerContext ctx) {
                        QuicChannel channel = (QuicChannel) ctx.channel();
                        // Create streams etc..
                    }

                    @Override
                    public void channelInactive(ChannelHandlerContext ctx) {
                        ((QuicChannel) ctx.channel()).collectStats().addListener(f -> {
                            if (f.isSuccess()) {
                                LOGGER.info("Connection closed: {}", f.getNow());
                            }
                        });
                    }

                    @Override
                    public boolean isSharable() {
                        return true;
                    }
                })
                .streamHandler(new ChannelInitializer<QuicStreamChannel>() {
                    @Override
                    protected void initChannel(QuicStreamChannel ch)  {
                        // Add a LineBasedFrameDecoder here as we just want to do some simple HTTP 0.9 handling.
                        ch.pipeline().addLast(new LineBasedFrameDecoder(1024))
                                .addLast(new ChannelInboundHandlerAdapter() {
                            @Override
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                ByteBuf byteBuf = (ByteBuf) msg;
                                try {
                                    System.out.println("Recv: "+byteBuf.toString(CharsetUtil.US_ASCII).trim());
                                    if (byteBuf.toString(CharsetUtil.US_ASCII).trim().equals("GET /")) {
                                        ByteBuf buffer = ctx.alloc().directBuffer();
                                        buffer.writeCharSequence("Hello World!\r\n", CharsetUtil.US_ASCII);
                                        // Write the buffer and shutdown the output by writing a FIN.
                                        ctx.writeAndFlush(buffer).addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
                                    }
                                } finally {
                                    byteBuf.release();
                                }
                            }
                        });
                    }
                }).build();
        try {
            Bootstrap bs = new Bootstrap();
            Channel channel = bs.group(group)
                    .channel(NioDatagramChannel.class)
                    .handler(codec)
                    .bind(new InetSocketAddress(9999)).sync().channel();
            channel.closeFuture().sync();
        } finally {
            group.shutdownGracefully();
        }
    }
}

QuicClientExample.java

package io.netty.incubator.codec.quic.example;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ChannelInputShutdownReadComplete;
import io.netty.channel.socket.nio.NioDatagramChannel;
import io.netty.incubator.codec.quic.QuicChannel;
import io.netty.incubator.codec.quic.QuicClientCodecBuilder;
import io.netty.incubator.codec.quic.QuicStreamChannel;
import io.netty.incubator.codec.quic.QuicStreamType;
import io.netty.util.CharsetUtil;
import io.netty.util.NetUtil;

import java.net.InetSocketAddress;

public final class QuicClientExample {

    private QuicClientExample() { }

    public static void main(String[] args) throws Exception {
        // We just want to support HTTP 0.9 as application protocol
        byte[] proto = new byte[] {
                0x08, 'h', 't', 't', 'p', '/', '0', '.', '9'
        };

        NioEventLoopGroup group = new NioEventLoopGroup(1);
        try {
            ChannelHandler codec = new QuicClientCodecBuilder()
                    .applicationProtocols(proto)
                    // .maxIdleTimeout(5000, TimeUnit.MILLISECONDS)
                    .initialMaxData(10000000)
                    // As we don't want to support remote initiated streams just setup the limit for local initiated
                    // streams in this example.
                    .initialMaxStreamDataBidirectionalLocal(1000000)
                    .build();

            Bootstrap bs = new Bootstrap();
            Channel channel = bs.group(group)
                    .channel(NioDatagramChannel.class)
                    .handler(codec)
                    .bind(0).sync().channel();

            QuicChannel quicChannel = QuicChannel.newBootstrap(channel)
                    .streamHandler(new ChannelInboundHandlerAdapter() {
                        @Override
                        public void channelActive(ChannelHandlerContext ctx) {
                            // As we did not allow any remote initiated streams we will never see this method called.
                            // That said just let us keep it here to demonstrate that this handle would be called
                            // for each remote initiated stream.
                            ctx.close();
                        }
                    })
                    .remoteAddress(new InetSocketAddress(NetUtil.LOCALHOST4, 9999))
                    .connect()
                    .get();

            for (int i = 0; i < 20; i++) {
                QuicStreamChannel streamChannel = getStreamChannel(quicChannel);
                System.out.println("send message:" + i + "; streamChannel:" + streamChannel);
                System.out.println(quicChannel);
                // Write the data and send the FIN. After this it's not possible to write any more data.
                streamChannel.writeAndFlush(Unpooled.copiedBuffer("GET /\r\n", CharsetUtil.US_ASCII))
                        .addListener(future -> {
                            if (future.isSuccess()) {
                                System.out.println("success! streamChannel:" + streamChannel);
                            } else {
                                System.out.println("fail! streamChannel:" + streamChannel);
                            }
                        })
                        .addListener(QuicStreamChannel.SHUTDOWN_OUTPUT);
                // streamChannel.closeFuture().sync();
                Thread.sleep(300);
            }
            Thread.currentThread().join();
            // Wait for the stream channel and quic channel to be closed (this will happen after we received the FIN).
            // After this is done we will close the underlying datagram channel.
            // streamChannel.closeFuture().sync();
            // quicChannel.closeFuture().sync();
            // channel.close().sync();
        } finally {
            group.shutdownGracefully();
        }
    }

    private static QuicStreamChannel getStreamChannel(QuicChannel quicChannel) throws InterruptedException {
        return quicChannel.createStream(QuicStreamType.BIDIRECTIONAL,
                new ChannelInboundHandlerAdapter() {
                    @Override
                    public void channelRead(ChannelHandlerContext ctx, Object msg) {
                        ByteBuf byteBuf = (ByteBuf) msg;
                        System.err.println(byteBuf.toString(CharsetUtil.US_ASCII));
                        byteBuf.release();
                    }

                    @Override
                    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
                        if (evt == ChannelInputShutdownReadComplete.INSTANCE) {
                            // Close the connection once the remote peer did send the FIN for this stream.
                            // ((QuicChannel) ctx.channel().parent()).close(true, 0,
                            //         ctx.alloc().directBuffer(16)
                            //                 .writeBytes(new byte[]{'k', 't', 'h', 'x', 'b', 'y', 'e'}));
                        }
                    }
                }).sync().getNow();
    }
}
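
A note on the `proto` array used in both files: it appears to follow the length-prefixed ALPN wire format, where each protocol name is preceded by one byte holding its length (0x08 for the 8 characters of "http/0.9"). The JDK-only sketch below builds such an array from readable names; `AlpnSketch` and `encode` are hypothetical helpers for illustration and are not part of the Netty QUIC codec API.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class AlpnSketch {

    /** Encodes protocol names as a sequence of (1-byte length, ASCII name) entries. */
    static byte[] encode(String... protocols) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String protocol : protocols) {
            byte[] name = protocol.getBytes(StandardCharsets.US_ASCII);
            if (name.length == 0 || name.length > 255) {
                throw new IllegalArgumentException("protocol name must be 1..255 bytes: " + protocol);
            }
            out.write(name.length);          // length prefix
            out.write(name, 0, name.length); // protocol name
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] expected = {0x08, 'h', 't', 't', 'p', '/', '0', '.', '9'};
        // Compare the encoded name with the hand-written array from the examples.
        System.out.println(Arrays.equals(encode("http/0.9"), expected));
    }
}
```

Generating the array this way avoids miscounting the length byte by hand when the protocol name changes.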

chenzhiguo commented 3 years ago

From the above experiment, do you see any problems? @normanmaurer