silenceper / pool

🚌 A golang general network connection poolction pool
MIT License
829 stars 212 forks source link

全部连接被关闭后导致hang(阻塞) #32

Open upwell opened 3 years ago

upwell commented 3 years ago

现象:

当有较多的goroutine在等待从Pool中Get连接,同时没有空闲连接时,有种情况,当被使用的连接,同时出错并被调用者Close掉时,等待获取连接的goroutine会一直阻塞等待从connReq这个channel获取连接,而所有的连接都已经被关闭了,不会再有可用的连接,导致一直等待,程序hang住。

原因

Get方法中,第133行检查完已创建的连接数达到了最大值后,会等待从connReq的channel等待其它goroutine Put回来的连接。 在Close方法中,会把openingConns--,但已经没法触发Get方法去新建连接了,Get方法都被阻塞在从connReq的channel拿连接了。

Get方法133行 https://github.com/silenceper/pool/blob/58e025e48bf97061c9f2638564fd6857092caeb4/channel.go#L133

https://github.com/silenceper/pool/blob/58e025e48bf97061c9f2638564fd6857092caeb4/channel.go#L130-L140

Close方法 https://github.com/silenceper/pool/blob/58e025e48bf97061c9f2638564fd6857092caeb4/channel.go#L202-L213

silenceper commented 3 years ago

多谢反馈。 这里的问题应该是原本Get的请求会被阻塞住而导致一直无法获得连接,而如果是新建的Get方法,还是会通过新建连接返回。

这里应该在这里等待加一个超时返回,你觉得呢? @upwell

upwell commented 3 years ago

@silenceper 之前有考虑解法,有两个思路:

超时重试可以解决后续可能出现的类似问题,但超时时间要斟酌一下,设的太短,容易误触发重试,设的太长,可能会影响业务逻辑。

我用了解法2改了个版本,相对简单些,就只是解决这个特定问题了。

Close方法

type connReq struct {
    idleConn *idleConn
    reCheck  bool
}

func (c *channelPool) Close(conn interface{}) error {
    if conn == nil {
        return errors.New("connection is nil. rejecting")
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.close == nil {
        return nil
    }
    c.openingConns--

    if l := len(c.connReqs); l > 0 {
        req := c.connReqs[0]
        copy(c.connReqs, c.connReqs[1:])
        c.connReqs = c.connReqs[:l-1]
        req <- connReq{
            idleConn: nil,
            reCheck:  true,
        }
    }

    return c.close(conn)
}

Get方法

  if c.openingConns >= c.maxActive {                                                                                                                                                                              
      req := make(chan connReq, 1)                                                                    
      c.connReqs = append(c.connReqs, req)                                                            
      c.mu.Unlock()                                                                                   
      ret, ok := <-req                                                                                
      if !ok {                                                                                        
          return nil, ErrMaxActiveConnReached                                                         
      }                                                                                               
      if ret.reCheck {                                                                                
          continue                                                                                    
      }                                                                                               

      if timeout := c.idleTimeout; timeout > 0 {                                                      
          if ret.idleConn.t.Add(timeout).Before(time.Now()) {                                         
              //丢弃并关闭该连接                                                                      
              c.Close(ret.idleConn.conn)                                                              
              continue                                                                                
          }                                                                                           
      }                                                                                               
      return ret.idleConn.conn, nil                                                                   
  }    
silenceper commented 3 years ago

看了下database/sql中提供的连接池,通过使用者传入context包,让调用方进行控制

upwell commented 3 years ago

@silenceper 昨天仔细追了下database/sql的代码,发现其实是在finalClose方法里面做了判断,对于需要关闭的连接,根据情况判断是否需要开一个新的连接,这个思路挺好的,可以借鉴过来。

    dc.db.maybeOpenNewConnections()
func (dc *driverConn) finalClose() error {
    var err error

    // Each *driverStmt has a lock to the dc. Copy the list out of the dc
    // before calling close on each stmt.
    var openStmt []*driverStmt
    withLock(dc, func() {
        openStmt = make([]*driverStmt, 0, len(dc.openStmt))
        for ds := range dc.openStmt {
            openStmt = append(openStmt, ds)
        }
        dc.openStmt = nil
    })
    for _, ds := range openStmt {
        ds.Close()
    }
    withLock(dc, func() {
        dc.finalClosed = true
        err = dc.ci.Close()
        dc.ci = nil
    })

    dc.db.mu.Lock()
    dc.db.numOpen--
    dc.db.maybeOpenNewConnections()
    dc.db.mu.Unlock()

    atomic.AddUint64(&dc.db.numClosed, 1)
    return err
}

https://github.com/golang/go/blob/master/src/database/sql/sql.go#L574-L603

silenceper commented 3 years ago

这个可以用,我找时间把这块优化下

ansoda commented 3 years ago

我也遇到了同样的close后hang住的问题,看能否更新一个版本优化下

upwell commented 3 years ago

提了一个 pull request来修复这个问题,参考了 database/sql 的代码。 @silenceper 我放在自己的项目里面跑了一段时间,暂时没有发现问题。

bli22ard commented 6 months ago

提了一个 pull request来修复这个问题,参考了 database/sql 的代码。 @silenceper 我放在自己的项目里面跑了一段时间,暂时没有发现问题。

我看了下你修复方法, 大概思路是,在关闭的时候创建连接,用来激活 ret, ok := <-req , 这种方式还会存在一些问题,如果createConnection一直返回失败,阻塞会一直进行下去,实际业务中, 你可能需要给前端返回系统错误,而不是后端一直处于等待状态,这种可能出现的无限等待不是很合理。 我觉得根本问题是获取连接没有超时时间,如果有超时时间,这些问题都迎刃而解