TFdream / blog

个人技术博客,博文写在 Issues 里。
Apache License 2.0
129 stars 18 forks source link

阿里巴巴Druid经常出现连接不可用的问题 #362

Open TFdream opened 3 years ago

TFdream commented 3 years ago

我司后端以前遗留的Spring Boot服务数据库连接池都用的阿里巴巴Druid,最近隔三差五会收到异常报警:

org.springframework.dao.RecoverableDataAccessException: 
### Error querying database.  Cause: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet successfully received from the server was 59,911 milliseconds ago. The last packet sent successfully to the server was 59,912 milliseconds ago.
### The error may exist in URL [jar:file:/data/app.jar!/BOOT-INF/classes!/mapper/eweishop/OrderGoodsMapper.xml]
### The error may involve com.renzhenmall.oms.storage.mapper.eweishop.OrderGoodsMapper.selectByPrimaryKey-Inline
### The error occurred while setting parameters
### SQL: select            id, uid, shop_id, order_id, goods_id, status, member_id, price, price_unit, price_original,      price_discount, price_change, credit, credit_unit, credit_original, coupon_price,      member_discount_price, total, title, option_id, option_title, thumb, goods_code,      unit, weight, volume, deduct_credit, add_credit, form_data, refund_status, refund_type,      refund_id, package_id, package_cancel_reason, comment_status, activity_package, create_time,      dispatch, is_join_member_price, member_price, source, commission_self_money, cost_price,      parent_shop_id, parent_order_id         from es_shop_order_goods     where id = ?
### Cause: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet successfully received from the server was 59,911 milliseconds ago. The last packet sent successfully to the server was 59,912 milliseconds ago.
; Communications link failure

The last packet successfully received from the server was 59,911 milliseconds ago. The last packet sent successfully to the server was 59,912 milliseconds ago.; nested exception is com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet successfully received from the server was 59,911 milliseconds ago. The last packet sent successfully to the server was 59,912 milliseconds ago.

这个错误的原因很明显,使用了一个已经失效的连接去做db操作,就会返回这个错误。也就是说db操作的时候从数据库连接池里拿到了一个不可用的连接。

但是正常来说这种情况是不会发生的,因为我们是配置了time-between-eviction-runs-millis的,理论上应该是会定时对连接池中的连接做检查的,但是实际结果显然并没有。

spring.datasource.eweishop.master.initial-size = 3
spring.datasource.eweishop.master.max-active = 100
spring.datasource.eweishop.master.min-idle = 1
spring.datasource.eweishop.master.max-wait = 20000
spring.datasource.eweishop.master.test-on-borrow = false
spring.datasource.eweishop.master.test-on-return = false
spring.datasource.eweishop.master.test-while-idle = true
spring.datasource.eweishop.master.validation-query = SELECT 'x'
spring.datasource.eweishop.master.time-between-eviction-runs-millis = 60000
spring.datasource.eweishop.master.min-evictable-idle-time-millis = 300000

这也是官方文档上推荐的配置,那么问题到底出在哪儿了?

注:依赖的druid-spring-boot-starter版本为1.1.22

<dependency>
   <groupId>com.alibaba</groupId>
   <artifactId>druid-spring-boot-starter</artifactId>
   <version>1.1.22</version>
</dependency>

只能去翻druid源码了,DruidDataSourceWrapper 它继承了com.alibaba.druid.pool.DruidDataSource


public class DruidDataSource extends DruidAbstractDataSource implements DruidDataSourceMBean, ManagedDataSource, Referenceable, Closeable, Cloneable, ConnectionPoolDataSource, MBeanRegistration {

    public DruidDataSource(){
        this(false);
    }

    public DruidDataSource(boolean fairLock){
        super(fairLock);

        configFromPropety(System.getProperties());
    }

}

在核心类DruidDataSource中有个init方法:

    public void init() throws SQLException {
        if (inited) {
            return;
        }

        // bug fixed for dead lock, for issue #2980
        DruidDriver.getInstance();

        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
            if (inited) {
                return;
            }
            createAndLogThread();
            createAndStartCreatorThread();
            createAndStartDestroyThread();

            initedLatch.await();
            init = true;
        } catch (SQLException e) {
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } finally {
            inited = true;
            lock.unlock();
            }
        }
}

在 init方法中会执行 createAndStartDestroyThread方法来创建一个定时任务来执行DestroyTask:

    // threads
    private volatile ScheduledFuture<?>      destroySchedulerFuture;
    private DestroyTask                      destroyTask;

    protected void createAndStartDestroyThread() {
        destroyTask = new DestroyTask();

        if (destroyScheduler != null) {
            long period = timeBetweenEvictionRunsMillis;
            if (period <= 0) {
                period = 1000;
            }
            destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                                                                          TimeUnit.MILLISECONDS);
            initedLatch.countDown();
            return;
        }

        String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
        destroyConnectionThread = new DestroyConnectionThread(threadName);
        destroyConnectionThread.start();
    }

这里 你可能会好奇 init方法是在哪里被调用的,我们来看看 DruidDataSource#getConnection 方法:

    @Override
    public DruidPooledConnection getConnection() throws SQLException {
        return getConnection(maxWait);
    }

    public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
        init();

        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(this);
            return filterChain.dataSource_connect(this, maxWaitMillis);
        } else {
            return getConnectionDirect(maxWaitMillis);
        }
    }

在getConnection方法第一行就会调用 init方法来执行初始化。

DestroyTask

接着,我们来看DestroyTask 都做了些什么:


    public class DestroyTask implements Runnable {
        public DestroyTask() {

        }

        @Override
        public void run() {
            shrink(true, keepAlive);

            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }

    }

其中isRemoveAbandoned()没有配置,默认是false的。所以,这个task就执行了一句话:shrink(true, keepAlive)

在shrink方法里面的最后我们可以看到有这段代码:


    public void shrink(boolean checkTime, boolean keepAlive) {
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            return;
        }

        boolean needFill = false;
        int evictCount = 0;
        int keepAliveCount = 0;

       //省略...

        if (keepAliveCount > 0) {
            // keep order
            for (int i = keepAliveCount - 1; i >= 0; --i) {
                DruidConnectionHolder holer = keepAliveConnections[i];
                Connection connection = holer.getConnection();
                holer.incrementKeepAliveCheckCount();

                boolean validate = false;
                try {
                    this.validateConnection(connection);  //重点
                    validate = true;
                } catch (Throwable error) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("keepAliveErr", error);
                    }
                    // skip
                }

                boolean discard = !validate;
                if (validate) {
                    holer.lastKeepTimeMillis = System.currentTimeMillis();
                    boolean putOk = put(holer, 0L);
                    if (!putOk) {
                        discard = true;
                    }
                }

                if (discard) {
                    try {
                        connection.close();
                    } catch (Exception e) {
                        // skip
                    }

                    lock.lock();
                    try {
                        discardCount++;

                        if (activeCount + poolingCount <= minIdle) {
                            emptySignal();
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            }
            this.getDataSourceStat().addKeepAliveCheckCount(keepAliveCount);
            Arrays.fill(keepAliveConnections, null);
        }
}

我们重点关注 validateConnection 方法:


    public void validateConnection(Connection conn) throws SQLException {
        String query = getValidationQuery();
        if (conn.isClosed()) {
            throw new SQLException("validateConnection: connection closed");
        }

        if (validConnectionChecker != null) {
            boolean result = true;
            Exception error = null;
            try {
                result = validConnectionChecker.isValidConnection(conn, validationQuery, validationQueryTimeout);

                if (result && onFatalError) {
                    lock.lock();
                    try {
                        if (onFatalError) {
                            onFatalError = false;
                        }
                    } finally {
                        lock.unlock();
                    }
                }
            } catch (SQLException ex) {
                throw ex;
            } catch (Exception ex) {
                error = ex;
            }

            if (!result) {
                SQLException sqlError = error != null ? //
                    new SQLException("validateConnection false", error) //
                    : new SQLException("validateConnection false");
                throw sqlError;
            }
            return;
        }

        if (null != query) {
            Statement stmt = null;
            ResultSet rs = null;
            try {
                stmt = conn.createStatement();
                if (getValidationQueryTimeout() > 0) {
                    stmt.setQueryTimeout(getValidationQueryTimeout());
                }
                rs = stmt.executeQuery(query);
                if (!rs.next()) {
                    throw new SQLException("validationQuery didn't return a row");
                }

                if (onFatalError) {
                    lock.lock();
                    try {
                        if (onFatalError) {
                            onFatalError = false;
                        }
                    }
                    finally {
                        lock.unlock();
                    }
                }
            } finally {
                JdbcUtils.close(rs);
                JdbcUtils.close(stmt);
            }
        }
    }

validConnectionChecker 初始化逻辑在 DruidDataSource#initValidConnectionChecker()方法中:


    private void initValidConnectionChecker() {
        if (this.validConnectionChecker != null) {
            return;
        }

        String realDriverClassName = driver.getClass().getName();
        if (JdbcUtils.isMySqlDriver(realDriverClassName)) {
            this.validConnectionChecker = new MySqlValidConnectionChecker();

        } else if (realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER)
                || realDriverClassName.equals(JdbcConstants.ORACLE_DRIVER2)) {
            this.validConnectionChecker = new OracleValidConnectionChecker();

        } else if (realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER)
                   || realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_SQLJDBC4)
                   || realDriverClassName.equals(JdbcConstants.SQL_SERVER_DRIVER_JTDS)) {
            this.validConnectionChecker = new MSSQLValidConnectionChecker();

        } else if (realDriverClassName.equals(JdbcConstants.POSTGRESQL_DRIVER)
                || realDriverClassName.equals(JdbcConstants.ENTERPRISEDB_DRIVER)
                || realDriverClassName.equals(JdbcConstants.POLARDB_DRIVER)) {
            this.validConnectionChecker = new PGValidConnectionChecker();
        }
    }

由于 我们使用的MySQL数据库,所以直接看 MySqlValidConnectionChecker类:

public class MySqlValidConnectionChecker extends ValidConnectionCheckerAdapter implements ValidConnectionChecker, Serializable {

    public static final int DEFAULT_VALIDATION_QUERY_TIMEOUT = 1;
    public static final String DEFAULT_VALIDATION_QUERY = "SELECT 1";

    public boolean isValidConnection(Connection conn, String validateQuery, int validationQueryTimeout) throws Exception {
        if (conn.isClosed()) {
            return false;
        }

        String query = validateQuery;
        if (validateQuery == null || validateQuery.isEmpty()) {
            query = DEFAULT_VALIDATION_QUERY;
        }

        Statement stmt = null;
        ResultSet rs = null;
        try {
            stmt = conn.createStatement();
            if (validationQueryTimeout > 0) {
                stmt.setQueryTimeout(validationQueryTimeout);
            }
            rs = stmt.executeQuery(query);
            return true;
        } finally {
            JdbcUtils.close(rs);
            JdbcUtils.close(stmt);
        }

    }
}

解决办法

从代码中可以看出来,keepAlive的默认值是false,这样就会导致每次shrink之后的keepAliveCount==0,也就进不到最后检查存活连接的逻辑中。并且会导致minIdle的配置失效,连接池最后会被回收至只剩0个连接。所以,我们还差一下配置,keepAlive必须配置为true。