yusank / yusank.github.io

Blog
https://yusank.github.io
2 stars 0 forks source link

posts/conn-pool/ #18

Open utterances-bot opened 2 years ago

utterances-bot commented 2 years ago

Go 语言实现连接池 - Yusank`s Site

https://yusank.space/posts/conn-pool/

abserari commented 2 years ago

链接池的设计会有什么缺点吗?

yusank commented 2 years ago

@abserari 设计上来说应该算是比较保守的做法,对外提供的接口不会很多花里胡哨的,尽量提供最基础最核心的用法。更多的能力可以通过实现接口的时候通过初始化参数的方式做到一些功能扩展, 如:

暂时想到这些

J1nDi commented 2 years ago

有上传这个项目吗?

yusank commented 2 years ago

@J1nDi 目前是用在一个项目内部 我创建一个单独项目上传GitHub 后艾特你

yusank commented 2 years ago

Hi @J1nDi, 项目在这里 https://github.com/yusank/conn-pool

J1nDi commented 2 years ago

非常感谢!但是看到初始化和调用的时候,在factory函数这里又卡住了,能麻烦你再写一下main函数调用一下吗?就像这个文章里https://www.cnblogs.com/jkko123/p/7235257.html 麻烦你了🙏

yusank commented 2 years ago

@J1nDi 使用案例是吧 我明天给你补上

其实也不难 factory 就是产生一个连接和关闭的方法。以普通的 tcp 连接为例哈

func (impl)Factory()(interface{},error) {
    c, err := net.Dial("tcp", addr)
    if err != nil {
    return nil, err 
    }

    return c, err
}

func (impl) Close(i interface{}) {
    conn := i.(net.Conn)
    conn.Close()
}

func (impl) Ping(i interface{}) {
    // tcp 层面不支持 ping 操作 所以这里不做任何操作
    // 如果是自定义协议或者 7 层协议就可以有 ping 的检查
}
J1nDi commented 2 years ago

请问netconf的链接可以用这个连接池吗?我用的netconf的包是github.com/Juniper/go-netconf/netconf 获取到链接的方式是session,err := netconf.DialSSH(ip, sshConfig) ssh用的包是golang.org/x/crypto/ssh sshConfig := &ssh.ClientConfig{User:"", Auth: []ssh.AuthMethod{ssh.Password()},HostKeyCallback: ssh.InsecureIgnoreHostKey(),}

yusank commented 2 years ago

@J1nDi 是个连接理论上都可以用的 Factory 设计的时候返回的就是 interface{} ,目的就是为了不限制使用场景。 不过如果你每次连接都是不一样的(比如每次连接的 ip 不一样)那你这个场景就不适合用连接池,你可能需要一个 map 结构去存 ip->conn 的结构

J1nDi commented 2 years ago

谢谢你的建议,昨天提到的调用案例也麻烦上传一下,感谢!刚接触go不久,不是太懂factory函数,想借此机会学习一下 func (impl)Factory()(interface{},error) { c, err := net.Dial("tcp", addr) if err != nil { return nil, err }

return c, err

} 还有这里的addr没有传参进来是怎么使用的

yusank commented 2 years ago

@J1nDi 嗯嗯 我看我晚上有时间补上吧。


Factory 不是 go 里的概念是 工厂模式 是一个编程模式与语言无关可以花时间了解一下。


在这个方法里 addr 肯定是实现这个接口的 impl 提供。比如:

type impl struct {
    addr string
    dialTimeout time.Duration
    // ... other needed fields
}
func (i *impl)Factory()(interface{},error) {
    c, err := net.Dial("tcp", i.addr)
    if err != nil {
    return nil, err 
    }

    return c, err
}
smelike commented 2 years ago

chanelPool 中的连接缓冲区   connReqs                 []chan connReq 这里的 []chan connReq  实则与 idleConn 相同的? 

idleTimeout, waitTimeOut time.Duration

smelike commented 2 years ago

Line 98: c.Release() // undefined

smelike commented 2 years ago

c.Release() 找到了,我还没看到后面的代码。

smelike commented 2 years ago

我现在用的是 1.18,在 func NewChannelPool 中,最后的 return c, nil 报错提示: can not use c as Pool value

yusank commented 2 years ago

@smelike connReqsidleConn 不太一样的,connReqs 更多是一个缓冲区,当有大量连接需要获取 & 释放时,可以通过这个缓冲区来暂存(而不是马上释放)放回去的连接然后有新的Get 请求时,直接从缓冲区返回一个可用连接,这样可以避免连接被大量释放和重新连接的过程。而 idleConn 就是确保至少有 N 个连接可用,这个概念很普遍就是最大控线连接

yusank commented 2 years ago

@smelike 看一下源码:https://github.com/yusank/conn-pool, 估计是漏了哪个方法 跟 go 版本没关系的

smelike commented 2 years ago

@yusank 我找到了原因。与 func (c *channelPool) Put(conn interface{}) error {} 相关。 但是根本原因是什么呢?我还是没看懂代码。 你可以稍微解释一下?感谢。

yusank commented 2 years ago

@smelike 也很简单,就是 channelPool 实现了 interface Pool 的定义,而下面这个方法的返回值就是 Pool 而不是 channelPool

func NewChannelPool(poolConfig *PoolConfig) (Pool, error) {

所以当 channelPool 没有实现 Pool 的时候 这个方法就会报错的

ZhangDahe commented 1 year ago

connReqs的设计会不会有问题? 1. 比如请求创建大量连接, 又没有放回去的连接. 导致 connReqs里存在了相当多的channel. 2. 先放回去连接, 执行了 req <- connReq{ idleConn: &idleConn{conn: conn, t: time.Now()},}, 然后又没有地方从这个req接收数据啊

yusank commented 1 year ago

@ZhangDahe 其实是有地方接受这个数据的 https://github.com/yusank/conn-pool/blob/34cda24dbefd3f4e88643f205b224ac7c1e9dcb2/pool.go#L141-L159

default:
    c.mu.Lock()
    log.Printf("openConn %v %v", c.openingConns, c.maxActive)
    if c.openingConns >= c.maxActive {
        req := make(chan connReq, 1)
                 // 正是这里 append 的 channel,在 put 方法里读取这个数组的第一个元素去写的 conn
        c.connReqs = append(c.connReqs, req)
        c.mu.Unlock()
        ret, ok := <-req
        if !ok {
            return nil, ErrMaxActiveConnReached
        }
        if timeout := c.idleTimeout; timeout > 0 {
            if ret.idleConn.t.Add(timeout).Before(time.Now()) {
                //丢弃并关闭该连接
                _ = c.Close(ret.idleConn.conn)
                continue
            }
        }
        return ret.idleConn.conn, nil
    }
lawlielt commented 1 year ago

demo测试发现几个问题,希望不吝赐教。

  1. Put和Close的锁是不是死锁了。
  2. 这种方式,感觉还是有些浪费,大于maxIdle的链接,建立之后,使用一次就释放了,是不是小于maxActive的部分,可以保留一段儿时间。
  3. 如果连接后续操作比较耗时的话,是不是可以考虑限制最大等待队列长度。
yusank commented 1 year ago

@lawlielt

  1. 不会
  2. 可以这么搞,但是这个时间就是实现者自己去把控了,可以做一个二层缓冲区,超过最大空闲连接的部分的连接在这里暂时托管,但是这个过程以及和其他空闲连接的关系管理可能会稍微麻烦些
  3. 可以的,factory 层面可以做一个超时管理 或者连接池加对应的逻辑,我觉得完全可以根据实际情况扩展或者二次修改,毕竟这个项目也不算很完善,我也只在一些数据库连接和 mq 连接数用了(live 环境)实际效果还可以,但是没有遇到你说的耗时长的情况
JanC238 commented 1 year ago

非常感谢大佬的详细教程,受益匪浅 我在学习过程中也发现 PutClose 的锁会死锁 通过测试发现是在 Put 方法末尾 return c.Close(conn) 时,return 的执行顺序先于 deferPut 方法里还未 defer 解锁,Close 方法就尝试加锁了

yusank commented 1 year ago

Hi @JanC238, 这块确实是有这个问题我一开始也是遇到这个问题了,后来我只是在内部版本里修复 博客这块我忘了更新(sorry for the trouble),具体解决方案是这样的, 将 Close 拆分成有锁和无锁两个方法,在 Put 这种已经拿到锁的方法里就是直接调用无锁的Close即可避免这个问题。