coleifer / peewee

a small, expressive orm -- supports postgresql, mysql, sqlite and cockroachdb
http://docs.peewee-orm.com/
MIT License

Performance bottleneck due to locking operations in PooledMySQLDatabase #2894

Closed. Vinshare closed this issue 4 months ago

Vinshare commented 4 months ago

Hello,

I'm using Peewee 3.13.3 in my project. Recently I ran into a performance issue, and while troubleshooting I read through the code of PooledMySQLDatabase and found a potential bottleneck in the connect and close methods.

To ensure thread safety, connect and close acquire a lock, and the creation and closing of the underlying database connection objects also happen while that lock is held.

I've set max_connections to 100 and stale_timeout to 180. This means that every 180 seconds, all connections in my connection pool expire. The next attempt to connect then pops and closes those 100 connection objects one at a time, and because of the lock this cleanup runs in a single thread while every other thread waits. Unfortunately, this coincided with network issues on my side, so each close was slow and the delay propagated through the entire service.
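For reference, the pool is set up roughly like this (host, user, and password are placeholders):

from playhouse.pool import PooledMySQLDatabase

# Approximate pool configuration; connection details are
# placeholders. All connections get created at around the same
# time, so they all reach the 180-second stale_timeout together.
db = PooledMySQLDatabase(
    'my_app',
    max_connections=100,
    stale_timeout=180,  # seconds
    host='mysql.internal',
    user='app',
    password='...')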

Of course, this doesn't mean Peewee is bad; I'm just seeking help. Is there a better way to configure this?

Related code here:

class PooledDatabase(object):
    # ...
    def _connect(self):
        while True:
            try:
                # Remove the oldest connection from the heap.
                ts, conn = heapq.heappop(self._connections)
                key = self.conn_key(conn)
            except IndexError:
                ts = conn = None
                logger.debug('No connection available in pool.')
                break
            else:
                if self._is_closed(conn):
                    # This connection was closed, but since it was not stale
                    # it got added back to the queue of available conns. We
                    # then closed it and marked it as explicitly closed, so
                    # it's safe to throw it away now.
                    # (Because Database.close() calls Database._close()).
                    logger.debug('Connection %s was closed.', key)
                    ts = conn = None
                elif self._stale_timeout and self._is_stale(ts):
                    # If we are attempting to check out a stale connection,
                    # then close it. We don't need to mark it in the "closed"
                    # set, because it is not in the list of available conns
                    # anymore.
                    logger.debug('Connection %s was stale, closing.', key)
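                    # NOTE: each stale connection is closed here one
                    # at a time, while the caller (Database.connect)
                    # is still holding self._lock.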
                    self._close(conn, True)
                    ts = conn = None
                else:
                    break

        if conn is None:
            if self._max_connections and (
                    len(self._in_use) >= self._max_connections):
                raise MaxConnectionsExceeded('Exceeded maximum connections.')
            conn = super(PooledDatabase, self)._connect()
            ts = time.time() - random.random() / 1000
            key = self.conn_key(conn)
            logger.debug('Created new connection %s.', key)

        self._in_use[key] = PoolConnection(ts, conn, time.time())
        return conn
    # ...
    def _close(self, conn, close_conn=False):
        key = self.conn_key(conn)
        if close_conn:
            super(PooledDatabase, self)._close(conn)
        elif key in self._in_use:
            pool_conn = self._in_use.pop(key)
            if self._stale_timeout and self._is_stale(pool_conn.timestamp):
                logger.debug('Closing stale connection %s.', key)
                super(PooledDatabase, self)._close(conn)
            elif self._can_reuse(conn):
                logger.debug('Returning %s to pool.', key)
                heapq.heappush(self._connections, (pool_conn.timestamp, conn))
            else:
                logger.debug('Closed %s.', key)

class Database(_callable_context_manager):
    # ...
    def connect(self, reuse_if_open=False):
        with self._lock:
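            # The lock is held for the entire connect() call,
            # including the pool's stale-connection cleanup in
            # PooledDatabase._connect() above.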
            if self.deferred:
                raise InterfaceError('Error, database must be initialized '
                                     'before opening a connection.')
            if not self._state.closed:
                if reuse_if_open:
                    return False
                raise OperationalError('Connection already opened.')

            self._state.reset()
            with __exception_wrapper__:
                self._state.set_connection(self._connect())
                if self.server_version is None:
                    self._set_server_version(self._state.conn)
                self._initialize_connection(self._state.conn)
        return True
    # ...
    def close(self):
        with self._lock:
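            # close() serializes on the same lock, so a slow
            # network-level close of a single connection blocks every
            # other thread trying to connect or close.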
            if self.deferred:
                raise InterfaceError('Error, database must be initialized '
                                     'before opening a connection.')
            if self.in_transaction():
                raise OperationalError('Attempting to close database while '
                                       'transaction is open.')
            is_open = not self._state.closed
            try:
                if is_open:
                    with __exception_wrapper__:
                        self._close(self._state.conn)
            finally:
                self._state.reset()
            return is_open
coleifer commented 4 months ago

I've set max_connections to 100 and stale_timeout to 180. This means that every 180 seconds, all connections in my connection pool expire.

This stood out to me, as it implies you're opening 100 connections at approximately the same time. That will absolutely exacerbate any kind of latency issue if your db server is slow to accept connections.

You can try increasing the MySQL server's idle timeout and bumping up the stale timeout, but this sounds like an issue with your connection-management strategy. The pool implementation is fairly simple, so you can also write your own version tailored to your usage if you prefer to go that route; one idea is sketched below.
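As a starting point, here is a minimal sketch of one way to do that: stagger the expirations so the whole pool does not go stale at once. JitteredPooledMySQLDatabase is just an illustrative name; _is_stale is the internal hook in playhouse.pool (visible in the code quoted above) that decides whether a connection is too old.

import random
import time

from playhouse.pool import PooledMySQLDatabase

class JitteredPooledMySQLDatabase(PooledMySQLDatabase):
    """Illustrative sketch: stagger expirations so connections
    created together do not all go stale at the same instant."""

    def _is_stale(self, timestamp):
        # Stretch the stale cutoff by a random amount (up to +25%)
        # on each check, so a batch of same-aged connections gets
        # closed across many connect() calls instead of in one.
        jitter = random.uniform(0, 0.25) * self._stale_timeout
        return (time.time() - timestamp) > (self._stale_timeout + jitter)

Note this only spreads the close() calls out; it does not move them off the lock, so if every close is slow you would still want to look at the server-side settings as well.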