vislee / leevis.com

Blog
87 stars 13 forks source link

quic-go 服务端代码跟踪 #194

Open vislee opened 1 year ago

vislee commented 1 year ago

概述

QUIC 协议是一个基于 UDP 实现的应用层通用传输协议,有点类似于 TCP,具备流量控制并保证数据的完整性;同时又规避了 TCP 的一系列缺陷,例如队头阻塞、慢启动、3 次握手等。

代码跟踪

先从一个例子看起:

// echoServer starts a QUIC listener on addr, accepts one connection and one
// stream on it, and echoes everything read from that stream back to the peer
// through a loggingWriter.
//
// It returns the first error encountered while listening, accepting the
// connection, accepting the stream, or copying.
func echoServer() error {
    listener, err := quic.ListenAddr(addr, generateTLSConfig(), nil)
    if err != nil {
        return err
    }
    conn, err := listener.Accept(context.Background())
    if err != nil {
        return err
    }
    stream, err := conn.AcceptStream(context.Background())
    if err != nil {
        // Report the failure to the caller like every other step does,
        // instead of crashing the whole process.
        return err
    }
    // Echo through the loggingWriter
    _, err = io.Copy(loggingWriter{stream}, stream)
    return err
}

server.go 中定义了:

// ListenAddr opens a UDP socket on addr via listenUDP, wraps it in a Transport
// flagged createdConn and isSingleUse, and starts a QUIC listener on it.
//
// tlsConf and config are passed to Transport.Listen unchanged. If Listen fails,
// the socket opened here is closed before returning: the caller never receives
// a handle to it, so nobody else could release it.
func ListenAddr(addr string, tlsConf *tls.Config, config *Config) (*Listener, error) {
    conn, err := listenUDP(addr)
    if err != nil {
        return nil, err
    }
    tr := &Transport{
        Conn:        conn,
        createdConn: true,
        isSingleUse: true,
    }
    ln, err := tr.Listen(tlsConf, config)
    if err != nil {
        // Best-effort cleanup; the Listen error is the one worth reporting,
        // so a failure to close is deliberately ignored.
        _ = conn.Close()
        return nil, err
    }
    return ln, nil
}

transport.go中定义了:

// Listen attaches a QUIC server to this Transport and returns its Listener.
//
// A nil tlsConf, or a conf rejected by validateConfig, yields an error before
// the mutex is taken. With t.mutex held, the call fails with
// errListenerAlreadySet if a server is already attached; otherwise the config
// is populated, the transport is initialised, and a new server is created and
// stored in t.server.
func (t *Transport) Listen(tlsConf *tls.Config, conf *Config) (*Listener, error) {
    if tlsConf == nil {
        return nil, errors.New("quic: tls.Config not set")
    }
    validationErr := validateConfig(conf)
    if validationErr != nil {
        return nil, validationErr
    }

    t.mutex.Lock()
    defer t.mutex.Unlock()

    if t.server != nil {
        return nil, errListenerAlreadySet
    }
    serverConf := populateServerConfig(conf)

    // Per the original walkthrough note, the important effect of init is that
    // it starts `go t.listen(conn)` (init itself is not shown here).
    initErr := t.init(true)
    if initErr != nil {
        return nil, initErr
    }
    srv, err := newServer(t.conn, t.handlerMap, t.connIDGenerator, tlsConf, serverConf, t.Tracer, t.closeServer, false)
    if err != nil {
        return nil, err
    }
    t.server = srv
    return &Listener{baseServer: srv}, nil
}

// listen is the Transport's receive loop: it tries to adjust the socket's
// receive and send buffers, then reads packets from conn and hands each one to
// t.handlePacket until a non-temporary read error occurs or the transport is
// found closed after a temporary error.
//
// On exit it closes t.listening and removes t.Conn from the multiplexer.
func (t *Transport) listen(conn rawConn) {
    defer close(t.listening)
    defer getMultiplexer().RemoveConn(t.Conn)

    // warnBufferSize reports a failed buffer-size adjustment. Errors caused by
    // an already closed connection are ignored, the message is emitted through
    // setBufferWarningOnce, and it can be silenced with the
    // QUIC_GO_DISABLE_RECEIVE_BUFFER_WARNING environment variable.
    warnBufferSize := func(err error) {
        if strings.Contains(err.Error(), "use of closed network connection") {
            return
        }
        setBufferWarningOnce.Do(func() {
            // A value that does not parse as a bool leaves the warning enabled,
            // so the ParseBool error is intentionally discarded.
            if disable, _ := strconv.ParseBool(os.Getenv("QUIC_GO_DISABLE_RECEIVE_BUFFER_WARNING")); disable {
                return
            }
            log.Printf("%s. See https://github.com/quic-go/quic-go/wiki/UDP-Receive-Buffer-Size for details.", err)
        })
    }
    if err := setReceiveBuffer(t.Conn, t.logger); err != nil {
        warnBufferSize(err)
    }
    if err := setSendBuffer(t.Conn, t.logger); err != nil {
        warnBufferSize(err)
    }

    for {
        // Read the next UDP datagram.
        p, err := conn.ReadPacket()
        //nolint:staticcheck // SA1019 ignore this!
        // TODO: This code is used to ignore wsa errors on Windows.
        // Since net.Error.Temporary is deprecated as of Go 1.18, we should find a better solution.
        // See https://github.com/quic-go/quic-go/issues/1737 for details.
        if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
            t.mutex.Lock()
            closed := t.closed
            t.mutex.Unlock()
            if closed {
                return
            }
            // %w is only meaningful to fmt.Errorf; a printf-style logger would
            // render it as "%!w(...)", so format the error with %s.
            t.logger.Debugf("Temporary error reading from conn: %s", err)
            continue
        }
        if err != nil {
            t.close(err)
            return
        }
        // Dispatch the packet.
        t.handlePacket(p)
    }
}

// handlePacket routes one received packet.
//
// The destination connection ID is parsed first; on failure the packet is
// logged, traced as dropped and its buffer is released. Otherwise the packet is
// offered, in order, to the stateless-reset check, to the handler registered
// for its connection ID, to the stateless-reset sender (short-header packets
// only), and finally to the attached server, if there is one.
func (t *Transport) handlePacket(p *receivedPacket) {
    connID, err := wire.ParseConnectionID(p.data, t.connIDLen)
    if err != nil {
        t.logger.Debugf("error parsing connection ID on packet from %s: %s", p.remoteAddr, err)
        if t.Tracer != nil {
            t.Tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropHeaderParseError)
        }
        p.buffer.MaybeRelease()
        return
    }

    if t.maybeHandleStatelessReset(p.data) {
        return
    }

    // A known connection ID goes straight to its handler.
    handler, known := t.handlerMap.Get(connID)
    if known {
        handler.handlePacket(p)
        return
    }

    // Unknown connection ID on a short-header packet.
    if isLongHeader := wire.IsLongHeaderPacket(p.data[0]); !isLongHeader {
        t.maybeSendStatelessReset(p)
        return
    }

    // Unknown connection ID on a long-header packet: only a server can use it.
    t.mutex.Lock()
    defer t.mutex.Unlock()
    if t.server != nil {
        t.server.handlePacket(p)
        return
    }
    // no server set
    t.logger.Debugf("received a packet with an unexpected connection ID %s", connID)
}

server.go中定义了:

// handlePacket queues p on s.receivedPackets without blocking. Per the original
// walkthrough note, this queue is what leads to new connections being created.
// If the queue is full, the packet is logged and traced as dropped.
func (s *baseServer) handlePacket(p *receivedPacket) {
    select {
    case s.receivedPackets <- p:
        return
    default:
    }

    // The queue had no room.
    s.logger.Debugf("Dropping packet from %s (%d bytes). Server receive queue full.", p.remoteAddr, p.Size())
    if s.tracer == nil {
        return
    }
    s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropDOSPrevention)
}

// newServer builds a baseServer around conn and starts its two background
// goroutines (run and runSendQueue).
//
// It fails only if the token generator cannot be created. When acceptEarly is
// set, the map of 0-RTT queues is allocated as well.
func newServer(
    conn rawConn,
    connHandler packetHandlerManager,
    connIDGenerator ConnectionIDGenerator,
    tlsConf *tls.Config,
    config *Config,
    tracer logging.Tracer,
    onClose func(),
    acceptEarly bool,
) (*baseServer, error) {
    tokenGen, err := handshake.NewTokenGenerator(rand.Reader)
    if err != nil {
        return nil, err
    }

    srv := &baseServer{
        conn:                    conn,
        tlsConf:                 tlsConf,
        config:                  config,
        tokenGenerator:          tokenGen,
        connIDGenerator:         connIDGenerator,
        connHandler:             connHandler,
        connQueue:               make(chan quicConn),
        errorChan:               make(chan struct{}),
        running:                 make(chan struct{}),
        receivedPackets:         make(chan *receivedPacket, protocol.MaxServerUnprocessedPackets),
        versionNegotiationQueue: make(chan *receivedPacket, 4),
        invalidTokenQueue:       make(chan *receivedPacket, 4),
        newConn:                 newConnection, // defined in connection.go
        tracer:                  tracer,
        logger:                  utils.DefaultLogger.WithPrefix("server"),
        acceptEarlyConns:        acceptEarly,
        onClose:                 onClose,
    }
    if acceptEarly {
        srv.zeroRTTQueues = map[protocol.ConnectionID]*zeroRTTQueue{}
    }

    go srv.run()
    go srv.runSendQueue()

    srv.logger.Debugf("Listening for %s connections on %s", conn.LocalAddr().Network(), conn.LocalAddr().String())
    return srv, nil
}

// run drains s.receivedPackets until s.errorChan fires, and closes s.running
// when it returns. Each packet goes through handlePacketImpl; the packet buffer
// is released here unless handlePacketImpl reports that it is still in use.
func (s *baseServer) run() {
    defer close(s.running)
    for {
        // Non-blocking check first, so a pending shutdown wins over a pending
        // packet.
        select {
        case <-s.errorChan:
            return
        default:
        }

        select {
        case <-s.errorChan:
            return
        case pkt := <-s.receivedPackets:
            // Packets arriving here are candidates for a new connection.
            if s.handlePacketImpl(pkt) {
                continue
            }
            pkt.buffer.Release()
        }
    }
}

// handlePacketImpl inspects a packet that did not match any existing
// connection and decides what to do with it. The return value tells the caller
// whether the packet buffer is still in use (true) or may be released (false).
//
// Packets are dropped (return false) when they are Version Negotiation packets,
// have an unparsable version, are too small, are 0-RTT while early connections
// are not accepted, fail header parsing, or are long-header packets other than
// Initial. Unsupported versions are handed to enqueueVersionNegotiationPacket,
// 0-RTT packets to handle0RTTPacket, and Initial packets to handleInitialImpl.
// A short-header packet reaching this function is a routing bug and panics.
func (s *baseServer) handlePacketImpl(p *receivedPacket) bool /* is the buffer still in use? */ {
    if cleanupDue := !s.nextZeroRTTCleanup.IsZero() && p.rcvTime.After(s.nextZeroRTTCleanup); cleanupDue {
        defer s.cleanupZeroRTTQueues(p.rcvTime)
    }

    if wire.IsVersionNegotiationPacket(p.data) {
        s.logger.Debugf("Dropping Version Negotiation packet.")
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeVersionNegotiation, p.Size(), logging.PacketDropUnexpectedPacket)
        }
        return false
    }
    // Short header packets should never end up here in the first place
    if isLongHeader := wire.IsLongHeaderPacket(p.data[0]); !isLongHeader {
        panic(fmt.Sprintf("misrouted packet: %#v", p.data))
    }

    // Parse the version field of the long header;
    // drop the packet if we failed to parse the protocol version.
    version, versionErr := wire.ParseVersion(p.data)
    if versionErr != nil {
        s.logger.Debugf("Dropping a packet with an unknown version")
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropUnexpectedPacket)
        }
        return false
    }

    // Version negotiation: send a Version Negotiation Packet if the client is
    // speaking a different protocol version.
    if supported := protocol.IsSupportedVersion(s.config.Versions, version); !supported {
        switch {
        case s.config.DisableVersionNegotiationPackets:
            return false
        case p.Size() < protocol.MinUnknownVersionPacketSize:
            s.logger.Debugf("Dropping a packet with an unsupported version number %d that is too small (%d bytes)", version, p.Size())
            if s.tracer != nil {
                s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropUnexpectedPacket)
            }
            return false
        default:
            return s.enqueueVersionNegotiationPacket(p)
        }
    }

    // 0-RTT packet? Per the original walkthrough note: after a 1-RTT handshake
    // the server sends a New Session Ticket, and only later handshakes can use
    // 0-RTT.
    if wire.Is0RTTPacket(p.data) {
        if s.acceptEarlyConns {
            return s.handle0RTTPacket(p)
        }
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketType0RTT, p.Size(), logging.PacketDropUnexpectedPacket)
        }
        return false
    }

    // If we're creating a new connection, the packet will be passed to the connection.
    // The header will then be parsed again.
    hdr, _, _, err := wire.ParsePacket(p.data)
    if err != nil {
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropHeaderParseError)
        }
        s.logger.Debugf("Error parsing packet: %s", err)
        return false
    }

    isInitial := hdr.Type == protocol.PacketTypeInitial
    if isInitial && p.Size() < protocol.MinInitialPacketSize {
        s.logger.Debugf("Dropping a packet that is too small to be a valid Initial (%d bytes)", p.Size())
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeInitial, p.Size(), logging.PacketDropUnexpectedPacket)
        }
        return false
    }
    if !isInitial {
        // Drop long header packets.
        // There's little point in sending a Stateless Reset, since the client
        // might not have received the token yet.
        s.logger.Debugf("Dropping long header packet of type %s (%d bytes)", hdr.Type, len(p.data))
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeFromHeader(hdr), p.Size(), logging.PacketDropUnexpectedPacket)
        }
        return false
    }

    s.logger.Debugf("<- Received Initial packet.")

    if initialErr := s.handleInitialImpl(p, hdr); initialErr != nil {
        s.logger.Errorf("Error occurred handling initial packet: %s", initialErr)
    }
    // Don't put the packet buffer back.
    // handleInitialImpl deals with the buffer.
    return true
}

// handleInitialImpl processes an Initial packet that may open a new connection.
// It takes over responsibility for p's buffer from the caller.
//
// Depending on the packet and the server state it will:
//   - reject the packet if it has no token and a too short destination
//     connection ID (returns an error);
//   - forward it to a connection that was registered for the destination
//     connection ID in the meantime;
//   - enqueue an invalid-token response for an invalid Retry token;
//   - send a Retry when address validation is required and no usable token is
//     present;
//   - refuse the connection when the accept queue is full or the connection
//     could not be registered;
//   - otherwise create the connection, feed it this packet plus any queued
//     0-RTT packets, and start its run loop and accept handling.
//
// The only other error returned is a failure to generate a connection ID.
func (s *baseServer) handleInitialImpl(p *receivedPacket, hdr *wire.Header) error {
    if len(hdr.Token) == 0 && hdr.DestConnectionID.Len() < protocol.MinConnectionIDLenInitial {
        p.buffer.Release()
        if s.tracer != nil {
            s.tracer.DroppedPacket(p.remoteAddr, logging.PacketTypeInitial, p.Size(), logging.PacketDropUnexpectedPacket)
        }
        return errors.New("too short connection ID")
    }

    // The server queues packets for a while, and we might already have established a connection by now.
    // This results in a second check in the connection map.
    // That's ok since it's not the hot path (it's only taken by some Initial and 0-RTT packets).
    if handler, ok := s.connHandler.Get(hdr.DestConnectionID); ok {
        handler.handlePacket(p)
        return nil
    }

    var (
        token          *handshake.Token
        retrySrcConnID *protocol.ConnectionID
    )
    origDestConnID := hdr.DestConnectionID
    if len(hdr.Token) > 0 {
        // A token that fails to decode is treated like no token at all.
        tok, err := s.tokenGenerator.DecodeToken(hdr.Token)
        if err == nil {
            if tok.IsRetryToken {
                origDestConnID = tok.OriginalDestConnectionID
                retrySrcConnID = &tok.RetrySrcConnectionID
            }
            token = tok
        }
    }

    clientAddrIsValid := s.validateToken(token, p.remoteAddr)
    if token != nil && !clientAddrIsValid {
        // For invalid and expired non-retry tokens, we don't send an INVALID_TOKEN error.
        // We just ignore them, and act as if there was no token on this packet at all.
        // This also means we might send a Retry later.
        if !token.IsRetryToken {
            token = nil
        } else {
            // For Retry tokens, we send an INVALID_ERROR if
            // * the token is too old, or
            // * the token is invalid, in case of a retry token.
            s.enqueueInvalidToken(p)
            return nil
        }
    }
    if token == nil && s.config.RequireAddressValidation(p.remoteAddr) {
        // Retry invalidates all 0-RTT packets sent.
        delete(s.zeroRTTQueues, hdr.DestConnectionID)
        go func() {
            defer p.buffer.Release()
            if err := s.sendRetry(p.remoteAddr, hdr, p.info); err != nil {
                s.logger.Debugf("Error sending Retry: %s", err)
            }
        }()
        return nil
    }

    if queueLen := atomic.LoadInt32(&s.connQueueLen); queueLen >= protocol.MaxAcceptQueueSize {
        s.logger.Debugf("Rejecting new connection. Server currently busy. Accept queue length: %d (max %d)", queueLen, protocol.MaxAcceptQueueSize)
        go func() {
            defer p.buffer.Release()
            if err := s.sendConnectionRefused(p.remoteAddr, hdr, p.info); err != nil {
                s.logger.Debugf("Error rejecting connection: %s", err)
            }
        }()
        return nil
    }

    connID, err := s.connIDGenerator.GenerateConnectionID()
    if err != nil {
        return err
    }
    s.logger.Debugf("Changing connection ID to %s.", connID)
    var conn quicConn
    tracingID := nextConnTracingID()
    // The callback passed to AddWithConnID constructs the connection and
    // assigns it to conn.
    if added := s.connHandler.AddWithConnID(hdr.DestConnectionID, connID, func() (packetHandler, bool) {
        config := s.config
        if s.config.GetConfigForClient != nil {
            conf, err := s.config.GetConfigForClient(&ClientHelloInfo{RemoteAddr: p.remoteAddr})
            if err != nil {
                s.logger.Debugf("Rejecting new connection due to GetConfigForClient callback")
                return nil, false
            }
            config = populateConfig(conf)
        }
        var tracer logging.ConnectionTracer
        if config.Tracer != nil {
            // Use the same connection ID that is passed to the client's GetLogWriter callback.
            connID := hdr.DestConnectionID
            if origDestConnID.Len() > 0 {
                connID = origDestConnID
            }
            tracer = config.Tracer(context.WithValue(context.Background(), ConnectionTracingKey, tracingID), protocol.PerspectiveServer, connID)
        }
        conn = s.newConn(
            newSendConn(s.conn, p.remoteAddr, p.info),
            s.connHandler,
            origDestConnID,
            retrySrcConnID,
            hdr.DestConnectionID,
            hdr.SrcConnectionID,
            connID,
            s.connIDGenerator,
            s.connHandler.GetStatelessResetToken(connID),
            config,
            s.tlsConf,
            s.tokenGenerator,
            clientAddrIsValid,
            tracer,
            tracingID,
            s.logger,
            hdr.Version,
        )
        conn.handlePacket(p)

        // Replay 0-RTT packets that were queued before this Initial arrived.
        if q, ok := s.zeroRTTQueues[hdr.DestConnectionID]; ok {
            for _, p := range q.packets {
                conn.handlePacket(p)
            }
            delete(s.zeroRTTQueues, hdr.DestConnectionID)
        }

        return conn, true
    }); !added {
        go func() {
            defer p.buffer.Release()
            if err := s.sendConnectionRefused(p.remoteAddr, hdr, p.info); err != nil {
                s.logger.Debugf("Error rejecting connection: %s", err)
            }
        }()
        return nil
    }
    // Guard before use: evaluating conn.run on a nil interface value would
    // panic in this goroutine, so the check must precede the go statements.
    if conn == nil {
        p.buffer.Release()
        return nil
    }
    go conn.run()
    go s.handleNewConn(conn)
    return nil
}

// handleNewConn waits until conn is ready to be handed out and then offers it
// on s.connQueue.
//
// "Ready" means the early connection is ready when s.acceptEarlyConns is set,
// and the handshake is complete otherwise. If the connection's context is done
// first, the connection is never queued. s.connQueueLen is incremented before
// the connection is offered and decremented again if the context finishes
// before the connection is taken from the queue.
func (s *baseServer) handleNewConn(conn quicConn) {
    ctx := conn.Context()

    if !s.acceptEarlyConns {
        // wait until the handshake is complete (or fails)
        select {
        case <-conn.HandshakeComplete():
        case <-ctx.Done():
            return
        }
    } else {
        // wait until the early connection is ready (or the handshake fails)
        select {
        case <-conn.earlyConnReady():
        case <-ctx.Done():
            return
        }
    }

    atomic.AddInt32(&s.connQueueLen, 1)
    select {
    case s.connQueue <- conn:
        // blocks until the connection is accepted
        // Per the original walkthrough note, this is the conn that
        // listener.Accept(context.Background()) returns in the echo example.
    case <-ctx.Done():
        // don't pass connections that were already closed to Accept()
        atomic.AddInt32(&s.connQueueLen, -1)
    }
}

connection.go 中定义了:


// newConnection constructs the server-side connection object for a newly
// accepted QUIC connection (perspective is fixed to protocol.PerspectiveServer).
//
// It is a package-level function variable; newServer stores it in
// baseServer.newConn.
//
// The function wires together, in this order: the connection struct, the
// connection ID manager and generator (both registered with runner), the
// context carrying tracingID, the ACK handlers, the transport parameters that
// are advertised to the peer, the server crypto setup, and finally the packet
// packer, unpacker and crypto stream manager. It only builds state; it does not
// start the connection's run loop.
var newConnection = func(
    conn sendConn,
    runner connRunner,
    origDestConnID protocol.ConnectionID,
    retrySrcConnID *protocol.ConnectionID,
    clientDestConnID protocol.ConnectionID,
    destConnID protocol.ConnectionID,
    srcConnID protocol.ConnectionID,
    connIDGenerator ConnectionIDGenerator,
    statelessResetToken protocol.StatelessResetToken,
    conf *Config,
    tlsConf *tls.Config,
    tokenGenerator *handshake.TokenGenerator,
    clientAddressValidated bool,
    tracer logging.ConnectionTracer,
    tracingID uint64,
    logger utils.Logger,
    v protocol.VersionNumber,
) quicConn {
    s := &connection{
        conn:                  conn,
        config:                conf,
        handshakeDestConnID:   destConnID,
        srcConnIDLen:          srcConnID.Len(),
        tokenGenerator:        tokenGenerator,
        oneRTTStream:          newCryptoStream(),
        perspective:           protocol.PerspectiveServer,
        handshakeCompleteChan: make(chan struct{}),
        tracer:                tracer,
        logger:                logger,
        version:               v,
    }
    // Log under the original destination connection ID when one is known,
    // otherwise under destConnID.
    if origDestConnID.Len() > 0 {
        s.logID = origDestConnID.String()
    } else {
        s.logID = destConnID.String()
    }
    s.connIDManager = newConnIDManager(
        destConnID,
        func(token protocol.StatelessResetToken) { runner.AddResetToken(token, s) },
        runner.RemoveResetToken,
        s.queueControlFrame,
    )
    s.connIDGenerator = newConnIDGenerator(
        srcConnID,
        &clientDestConnID,
        func(connID protocol.ConnectionID) { runner.Add(connID, s) },
        runner.GetStatelessResetToken,
        runner.Remove,
        runner.Retire,
        runner.ReplaceWithClosed,
        s.queueControlFrame,
        connIDGenerator,
    )
    // NOTE(review): preSetup is not shown here; fields used below that are not
    // set in this function (s.rttStats, s.framer, s.retransmissionQueue,
    // s.datagramQueue) are presumably initialised by it — confirm.
    s.preSetup()
    // The connection context carries tracingID under ConnectionTracingKey.
    s.ctx, s.ctxCancel = context.WithCancel(context.WithValue(context.Background(), ConnectionTracingKey, tracingID))
    s.sentPacketHandler, s.receivedPacketHandler = ackhandler.NewAckHandler(
        0,
        getMaxPacketSize(s.conn.RemoteAddr()),
        s.rttStats,
        clientAddressValidated,
        s.perspective,
        s.tracer,
        s.logger,
    )
    initialStream := newCryptoStream()
    handshakeStream := newCryptoStream()
    // Transport parameters advertised to the peer, derived from the config.
    params := &wire.TransportParameters{
        InitialMaxStreamDataBidiLocal:   protocol.ByteCount(s.config.InitialStreamReceiveWindow),
        InitialMaxStreamDataBidiRemote:  protocol.ByteCount(s.config.InitialStreamReceiveWindow),
        InitialMaxStreamDataUni:         protocol.ByteCount(s.config.InitialStreamReceiveWindow),
        InitialMaxData:                  protocol.ByteCount(s.config.InitialConnectionReceiveWindow),
        MaxIdleTimeout:                  s.config.MaxIdleTimeout,
        MaxBidiStreamNum:                protocol.StreamNum(s.config.MaxIncomingStreams),
        MaxUniStreamNum:                 protocol.StreamNum(s.config.MaxIncomingUniStreams),
        MaxAckDelay:                     protocol.MaxAckDelayInclGranularity,
        AckDelayExponent:                protocol.AckDelayExponent,
        DisableActiveMigration:          true,
        StatelessResetToken:             &statelessResetToken,
        OriginalDestinationConnectionID: origDestConnID,
        // For interoperability with quic-go versions before May 2023, this value must be set to a value
        // different from protocol.DefaultActiveConnectionIDLimit.
        // If set to the default value, it will be omitted from the transport parameters, which will make
        // old quic-go versions interpret it as 0, instead of the default value of 2.
        // See https://github.com/quic-go/quic-go/pull/3806.
        ActiveConnectionIDLimit:   protocol.MaxActiveConnectionIDs,
        InitialSourceConnectionID: srcConnID,
        RetrySourceConnectionID:   retrySrcConnID,
    }
    // Datagram support is advertised only when enabled in the config.
    if s.config.EnableDatagrams {
        params.MaxDatagramFrameSize = protocol.MaxDatagramFrameSize
    } else {
        params.MaxDatagramFrameSize = protocol.InvalidByteCount
    }
    if s.tracer != nil {
        s.tracer.SentTransportParameters(params)
    }
    cs := handshake.NewCryptoSetupServer(
        initialStream,
        handshakeStream,
        clientDestConnID,
        conn.LocalAddr(),
        conn.RemoteAddr(),
        params,
        &handshakeRunner{
            onReceivedParams: s.handleTransportParameters,
            onError:          s.closeLocal,
            dropKeys:         s.dropEncryptionLevel,
            // On handshake completion, retire the client-chosen destination
            // connection ID and signal the run loop by closing the channel.
            onHandshakeComplete: func() {
                runner.Retire(clientDestConnID)
                close(s.handshakeCompleteChan)
            },
        },
        tlsConf,
        conf.Allow0RTT,
        s.rttStats,
        tracer,
        logger,
        s.version,
    )
    s.cryptoStreamHandler = cs
    s.packer = newPacketPacker(srcConnID, s.connIDManager.Get, initialStream, handshakeStream, s.sentPacketHandler, s.retransmissionQueue, s.RemoteAddr(), cs, s.framer, s.receivedPacketHandler, s.datagramQueue, s.perspective)
    s.unpacker = newPacketUnpacker(cs, s.srcConnIDLen)
    s.cryptoStreamManager = newCryptoStreamManager(cs, initialStream, handshakeStream, s.oneRTTStream)
    return s
}

// handlePacket is called by the server with a new packet.
//
// The packet is queued on s.receivedPackets without blocking. Once that channel
// is full (its size is protocol.MaxConnUnprocessedPackets, according to the
// original comment), further packets are discarded and reported to the tracer.
func (s *connection) handlePacket(p *receivedPacket) {
    select {
    case s.receivedPackets <- p:
        return
    default:
    }

    // No room in the queue: discard.
    if s.tracer == nil {
        return
    }
    s.tracer.DroppedPacket(logging.PacketTypeNotDetermined, p.Size(), logging.PacketDropDOSPrevention)
}
// run is the connection's main loop.
//
// It starts the handshake and the send queue in their own goroutines and then
// loops: handle close requests and handshake completion, reset the timer,
// process queued or newly received packets, check the loss-detection,
// keep-alive, handshake and idle timeouts, and send packets when the send queue
// has room. The loop ends when a close error arrives on s.closeChan; run then
// shuts down the crypto setup, waits for the handshake goroutine, handles the
// close error, and returns it. The connection context is cancelled on return.
func (s *connection) run() error {
    defer s.ctxCancel()

    s.timer = *newTimer()

    // handshaking is closed once RunHandshake has returned.
    handshaking := make(chan struct{})
    go func() {
        defer close(handshaking)
        s.cryptoStreamHandler.RunHandshake()
    }()
    go func() {
        if err := s.sendQueue.Run(); err != nil {
            s.destroyImpl(err)
        }
    }()

    // Client only: wait for the ClientHello to be written (or for a close
    // request) before entering the loop.
    if s.perspective == protocol.PerspectiveClient {
        select {
        case zeroRTTParams := <-s.clientHelloWritten:
            s.scheduleSending()
            if zeroRTTParams != nil {
                s.restoreTransportParameters(zeroRTTParams)
                close(s.earlyConnReadyChan)
            }
        case closeErr := <-s.closeChan:
            // put the close error back into the channel, so that the run loop can receive it
            s.closeChan <- closeErr
        }
    }

    var (
        closeErr           closeError
        // sendQueueAvailable is nil (never ready in a select) unless the send
        // queue reported that it would block.
        sendQueueAvailable <-chan struct{}
    )

runLoop:
    for {
        // Close immediately if requested
        select {
        case closeErr = <-s.closeChan:
            break runLoop
        case <-s.handshakeCompleteChan:
            s.handleHandshakeComplete()
        default:
        }

        s.maybeResetTimer()

        var processedUndecryptablePacket bool
        if len(s.undecryptablePacketsToProcess) > 0 {
            // Take ownership of the queue and clear the field before processing.
            queue := s.undecryptablePacketsToProcess
            s.undecryptablePacketsToProcess = nil
            for _, p := range queue {
                if processed := s.handlePacketImpl(p); processed {
                    processedUndecryptablePacket = true
                }
                // Don't set timers and send packets if the packet made us close the connection.
                select {
                case closeErr = <-s.closeChan:
                    break runLoop
                default:
                }
            }
        }
        // If we processed any undecryptable packets, jump to the resetting of the timers directly.
        if !processedUndecryptablePacket {
            // Block until there is something to do.
            select {
            case closeErr = <-s.closeChan:
                break runLoop
            case <-s.timer.Chan():
                s.timer.SetRead()
                // We do all the interesting stuff after the select statement, so
                // nothing to see here.
            case <-s.sendingScheduled:
                // We do all the interesting stuff after the select statement, so
                // nothing to see here.
            case <-sendQueueAvailable:
                // The send queue has room again; fall through to sending below.
            case firstPacket := <-s.receivedPackets:
                wasProcessed := s.handlePacketImpl(firstPacket)
                // Don't set timers and send packets if the packet made us close the connection.
                select {
                case closeErr = <-s.closeChan:
                    break runLoop
                default:
                }
                if s.handshakeComplete {
                    // Now process all packets in the receivedPackets channel.
                    // Limit the number of packets to the length of the receivedPackets channel,
                    // so we eventually get a chance to send out an ACK when receiving a lot of packets.
                    numPackets := len(s.receivedPackets)
                receiveLoop:
                    for i := 0; i < numPackets; i++ {
                        select {
                        case p := <-s.receivedPackets:
                            if processed := s.handlePacketImpl(p); processed {
                                wasProcessed = true
                            }
                            select {
                            case closeErr = <-s.closeChan:
                                break runLoop
                            default:
                            }
                        default:
                            // The channel drained earlier than expected.
                            break receiveLoop
                        }
                    }
                }
                // Only reset the timers if this packet was actually processed.
                // This avoids modifying any state when handling undecryptable packets,
                // which could be injected by an attacker.
                if !wasProcessed {
                    continue
                }
            case <-s.handshakeCompleteChan:
                s.handleHandshakeComplete()
            }
        }

        now := time.Now()
        if timeout := s.sentPacketHandler.GetLossDetectionTimeout(); !timeout.IsZero() && timeout.Before(now) {
            // This could cause packets to be retransmitted.
            // Check it before trying to send packets.
            if err := s.sentPacketHandler.OnLossDetectionTimeout(); err != nil {
                s.closeLocal(err)
            }
        }

        // Keep-alive takes precedence; otherwise check the handshake timeout,
        // then the idle timeout (a different limit applies before and after
        // handshake completion). After destroyImpl the loop continues so the
        // select at the top of the loop can pick up a close error.
        if keepAliveTime := s.nextKeepAliveTime(); !keepAliveTime.IsZero() && !now.Before(keepAliveTime) {
            // send a PING frame since there is no activity in the connection
            s.logger.Debugf("Sending a keep-alive PING to keep the connection alive.")
            s.framer.QueueControlFrame(&wire.PingFrame{})
            s.keepAlivePingSent = true
        } else if !s.handshakeComplete && now.Sub(s.creationTime) >= s.config.handshakeTimeout() {
            s.destroyImpl(qerr.ErrHandshakeTimeout)
            continue
        } else {
            idleTimeoutStartTime := s.idleTimeoutStartTime()
            if (!s.handshakeComplete && now.Sub(idleTimeoutStartTime) >= s.config.HandshakeIdleTimeout) ||
                (s.handshakeComplete && now.Sub(idleTimeoutStartTime) >= s.idleTimeout) {
                s.destroyImpl(qerr.ErrIdleTimeout)
                continue
            }
        }

        if s.sendQueue.WouldBlock() {
            // The send queue is still busy sending out packets.
            // Wait until there's space to enqueue new packets.
            sendQueueAvailable = s.sendQueue.Available()
            continue
        }
        if err := s.sendPackets(); err != nil {
            s.closeLocal(err)
        }
        // Arm or disarm the "send queue available" wake-up for the next iteration.
        if s.sendQueue.WouldBlock() {
            sendQueueAvailable = s.sendQueue.Available()
        } else {
            sendQueueAvailable = nil
        }
    }

    // Shutdown: stop the crypto setup and wait for the handshake goroutine
    // before handling the close error.
    s.cryptoStreamHandler.Close()
    <-handshaking
    s.handleCloseError(&closeErr)
    // The tracer is closed unless the close error is an errCloseForRecreating.
    if e := (&errCloseForRecreating{}); !errors.As(closeErr.err, &e) && s.tracer != nil {
        s.tracer.Close()
    }
    s.logger.Infof("Connection %s closed.", s.logID)
    s.sendQueue.Close()
    s.timer.Stop()
    return closeErr.err
}