Druid数据库连接异常:connection holder is null

最近在做业务开发的时候,线上出现了定时任务实行失败的异常,查看日志,最主要的原因是提示: connection holder is null, 经过代码排查了,是由于druid的连接池导致,所以记录下这个原因排查详细过程。

环境配置

由于之前的项目是比较老的,所以对druid的版本相对要老一点,使用的是1.1.7版本,具体maven如下:

<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>druid</artifactId>
  <version>1.1.7</version>
</dependency>

示例师范

druid配置

示例采用的properties的配置方式, 具体配置如下:

url=jdbc:mysql://192.168.3.37:3306/learn
username=root
password=mysql
filters=config
maxActive=20
initialSize=1
maxWait=6000
minIdle=1
timeBetweenEvictionRunsMillis=60000
minEvictableIdleTimeMillis=30000
testWhileIdle=true
testOnBorrow=true
testOnReturn=true
poolPreparedStatements=true
maxOpenPreparedStatements=20
asyncInit=true
removeAbandoned=true

以上的配置是比较常规的配置信息, 其中跟此次实践有关的,主要为removeAbandoned=true配置信息, 将在后面讲解

示例代码

package com.druid;

import com.alibaba.druid.pool.DruidDataSourceFactory;

import javax.sql.DataSource;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class DruidDataSourceTest {

    public static void main(String[] args) {
        // 记载配置文件
        InputStream is = DruidDataSourceTest.class.getResourceAsStream("/druid.properties");
        Properties properties = new Properties();
        Connection connection = null;
        DataSource dataSource = null;
        try {
            properties.load(is);
            System.out.println("加载配置w完成...");

            // 创建factory
            dataSource = DruidDataSourceFactory.createDataSource(properties);
            System.out.println("连接池创建成功");

            // 获取Connection
            connection = dataSource.getConnection();
            connection.setAutoCommit(false);
            // 模拟处理业务逻辑
            System.out.println("执行业务逻辑..");
            PreparedStatement statement = connection.prepareStatement("select * from user");
            statement.execute();
            Thread.sleep(TimeUnit.MINUTES.toMillis(6));
            System.out.println("业务逻辑执行完成..");
            connection.commit();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {

            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (SQLException e) {
                // ignore
            }
        }

    }
}

这段代码使用了Thread.sleep的方式模拟了一个业务逻辑,跟遇到的处理任务处理批量任务比较类似,然后我们将任务跑起来,在任务执行完成后,将的到一下的错误信息:

java.sql.SQLException: connection holder is null
    at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1157)
    at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1148)
    at com.alibaba.druid.pool.DruidPooledConnection.commit(DruidPooledConnection.java:743)
    at com.druid.DruidDataSourceTest.main(DruidDataSourceTest.java:38)

源码分析

在以上的还原现场的情况下,和我们预期的到的很相似,因此我们通过查看源码的方式查看,为什么会的到这样的结果。

DruidDatasourceFactory

这个类看名字基本上就能够知道,作为工厂创建DruidDatasource对象,可以简单看一下代码:

public static DataSource createDataSource(Properties properties) throws Exception {
        return createDataSource((Map) properties);
    }

public static DataSource createDataSource(Map properties) throws Exception {
        DruidDataSource dataSource = new DruidDataSource();
        config(dataSource, properties);
        return dataSource;
    }
public static void config(DruidDataSource dataSource, Map<?, ?> properties) throws SQLException {
        ...  
        value = (String) properties.get(PROP_INIT);
        if ("true".equals(value)) {
            dataSource.init();
        }
}

在类中最重要的就是config方法,config方法主要是从map中读取各种配置,然后加载到Datasource对象中,便于直接使用Datasource获取数据库连接信息。在config最后一段代码中,可以看出,Datasource本身包含了初始化init()方法,该方法会在Datasource正式使用的时候执行必要的初始化操作,该方法调用可以通过配置init=true控制或者再获取connection的时候执行延迟初始化。

DruidDatasource

该类是作为核心类,因为我们使用最多的还是通过Datasource.getConnection()方法,因此直接查看getConnection()方法源码:

@Override
   public DruidPooledConnection getConnection() throws SQLException {
       return getConnection(maxWait);
   }

   public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException {
       init();

       if (filters.size() > 0) {
           FilterChainImpl filterChain = new FilterChainImpl(this);
           return filterChain.dataSource_connect(this, maxWaitMillis);
       } else {
           return getConnectionDirect(maxWaitMillis);
       }
   }

在获取Connection链接的时候,都会优先执行初始化的操作.

init()

Datasource正式投入使用的时候,都会执行init()方法,完成初始化操作。直接看init()源码:

public void init() throws SQLException {
        // 是否已经初始化完成, 如果是则直接返回
        if (inited) {
            return;
        }

        // 获取锁,保证只有线程执行初始化操作
        final ReentrantLock lock = this.lock;
        try {
            lock.lockInterruptibly();
        } catch (InterruptedException e) {
            throw new SQLException("interrupt", e);
        }

        boolean init = false;
        try {
            if (inited) {
                return;
            }

            initStackTrace = Utils.toString(Thread.currentThread().getStackTrace());

            this.id = DruidDriver.createDataSourceId();
            if (this.id > 1) {
                long delta = (this.id - 1) * 100000;
                this.connectionIdSeed.addAndGet(delta);
                this.statementIdSeed.addAndGet(delta);
                this.resultSetIdSeed.addAndGet(delta);
                this.transactionIdSeed.addAndGet(delta);
            }

            if (this.jdbcUrl != null) {
                this.jdbcUrl = this.jdbcUrl.trim();
                initFromWrapDriverUrl();
            }

            // 初始化filter对象
            for (Filter filter : filters) {
                filter.init(this);
            }

            // 获取数据库类型
            if (this.dbType == null || this.dbType.length() == 0) {
                this.dbType = JdbcUtils.getDbType(jdbcUrl, null);
            }

            if (JdbcConstants.MYSQL.equals(this.dbType)
                    || JdbcConstants.MARIADB.equals(this.dbType)
                    || JdbcConstants.ALIYUN_ADS.equals(this.dbType)) {
                boolean cacheServerConfigurationSet = false;
                if (this.connectProperties.containsKey("cacheServerConfiguration")) {
                    cacheServerConfigurationSet = true;
                } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) {
                    cacheServerConfigurationSet = true;
                }
                if (cacheServerConfigurationSet) {
                    this.connectProperties.put("cacheServerConfiguration", "true");
                }
            }

            ..... // 省略验证代码

            // 通过SPI的方式加载服务, 具体是什么,不在关注范围内
            initFromSPIServiceLoader();

            // 加载driver类
            if (this.driver == null) {
                if (this.driverClass == null || this.driverClass.isEmpty()) {
                    this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl);
                }

                if (MockDriver.class.getName().equals(driverClass)) {
                    driver = MockDriver.instance;
                } else {
                    driver = JdbcUtils.createDriver(driverClassLoader, driverClass);
                }
            } else {
                if (this.driverClass == null) {
                    this.driverClass = driver.getClass().getName();
                }
            }

            initCheck();

            initExceptionSorter();
            initValidConnectionChecker();
            validationQueryCheck();

            if (isUseGlobalDataSourceStat()) {
                dataSourceStat = JdbcDataSourceStat.getGlobal();
                if (dataSourceStat == null) {
                    dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType);
                    JdbcDataSourceStat.setGlobal(dataSourceStat);
                }
                if (dataSourceStat.getDbType() == null) {
                    dataSourceStat.setDbType(this.dbType);
                }
            } else {
                dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties);
            }
            dataSourceStat.setResetStatEnable(this.resetStatEnable);

            // 初始化连接池管理缓存,其实就是数组
            connections = new DruidConnectionHolder[maxActive];
            evictConnections = new DruidConnectionHolder[maxActive];
            keepAliveConnections = new DruidConnectionHolder[maxActive];

            SQLException connectError = null;

            boolean asyncInit = this.asyncInit && createScheduler == null;
            if (!asyncInit) {
                try {
                    // init connections
                    for (int i = 0, size = getInitialSize(); i < size; ++i) {
                        PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection();
                        DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo);
                        connections[poolingCount] = holder;
                        incrementPoolingCount();
                    }

                    if (poolingCount > 0) {
                        poolingPeak = poolingCount;
                        poolingPeakTime = System.currentTimeMillis();
                    }
                } catch (SQLException ex) {
                    LOG.error("init datasource error, url: " + this.getUrl(), ex);
                    connectError = ex;
                }
            }

            createAndLogThread();
            createAndStartCreatorThread();
            // 重点关注这里,连接池对connection链接的管理方式
            createAndStartDestroyThread();

            initedLatch.await();
            init = true;

            initedTime = new Date();
            // 注册监控mbean对象
            registerMbean();

            if (connectError != null && poolingCount == 0) {
                throw connectError;
            }

            // 如果是长连接,则通过线程池管理空闲链接
            if (keepAlive) {
                // async fill to minIdle
                if (createScheduler != null) {
                    for (int i = 0; i < minIdle; ++i) {
                        createTaskCount++;
                        CreateConnectionTask task = new CreateConnectionTask();
                        this.createSchedulerFuture = createScheduler.submit(task);
                    }
                } else {
                    this.emptySignal();
                }
            }
        } catch (SQLException e) {
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (InterruptedException e) {
            throw new SQLException(e.getMessage(), e);
        } catch (RuntimeException e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        } catch (Error e){
            LOG.error("{dataSource-" + this.getID() + "} init error", e);
            throw e;
        }
        finally {
            // 标记初始化完成
            inited = true;
            lock.unlock();

            if (init && LOG.isInfoEnabled()) {
                String msg = "{dataSource-" + this.getID();

                if (this.name != null && !this.name.isEmpty()) {
                    msg += ",";
                    msg += this.name;
                }

                msg += "} inited";

                LOG.info(msg);
            }
        }
    }

这里主要关注下createAndStartDestroyThread()方法,

createAndStartDestroyThread

protected void createAndStartDestroyThread() {
        // 创建链接销毁任务对象
        destroyTask = new DestroyTask();

        // 如果绑定了线程,则使用连接池执行任务
        if (destroyScheduler != null) {
            long period = timeBetweenEvictionRunsMillis;
            if (period <= 0) {
                period = 1000;
            }
            // 如果没有配置周期,则为1秒钟,在以上配置中,我们设置的为60秒
            destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period,
                                                                          TimeUnit.MILLISECONDS);
            initedLatch.countDown();
            return;
        }

        // 如果没有绑定线程池,则使用单独的线程执行,该类其实跟线程池实现类型都是一致的。都是在一定周期后执行链接移除任务
        String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this);
        destroyConnectionThread = new DestroyConnectionThread(threadName);
        destroyConnectionThread.start();
    }

通过上面分析可以得知,连接池对链接的管理是通过单据的线程周期的管理,这个执行周期如下:

  • 判断是否配置timeBetweenEvictionRunsMillis,如果配置,则该配置的时间作为周期轮询检测
  • 如果没有配置,默认为1秒一次对连接池链接管理销毁

DestroyTask

最终链接的销毁是通过该类来实现和完成的,具体代码如下:

public class DestroyTask implements Runnable {

        @Override
        public void run() {
            shrink(true, keepAlive);

            // 判断是否对连接池链接执行销毁, 依据removeAbandoned=true开启
            if (isRemoveAbandoned()) {
                removeAbandoned();
            }
        }

    }

在任务中其实需要removeAbandoned=true配置开启,然后才会执行removeAbandoned()方法,该方法其实在主类Datasource之中。

removeAbandoned()

public int removeAbandoned() {
        int removeCount = 0;
        // 获取当前时间
        long currrentNanos = System.nanoTime();

        List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>();

        activeConnectionLock.lock();
        try {
            Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator();

            for (; iter.hasNext();) {
                DruidPooledConnection pooledConnection = iter.next();

                if (pooledConnection.isRunning()) {
                    continue;
                }

                // 判断链接第一次获取时间倒现在经历的间隔时间
                long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000);

                // 如果间隔时间大于removeAbandonedTimeoutMillis的时间,默认为5分钟时,将链接从当前活跃链接池中移除,并加入销毁列表
                if (timeMillis >= removeAbandonedTimeoutMillis) {
                    iter.remove();
                    pooledConnection.setTraceEnable(false);
                    abandonedList.add(pooledConnection);
                }
            }
        } finally {
            activeConnectionLock.unlock();
        }

        if (abandonedList.size() > 0) {
            // 遍历需要销毁的链接对象
            for (DruidPooledConnection pooledConnection : abandonedList) {
                final ReentrantLock lock = pooledConnection.lock;
                lock.lock();
                try {
                    // 如果链接已经失效,则停止处理
                    if (pooledConnection.isDisable()) {
                        continue;
                    }
                } finally {
                    lock.unlock();
                }

                // 关闭链接。这里就是产生问题的根源,也就是说当超过了removeAbandonedTimeOutMillis时间的链接会被最终关闭
                JdbcUtils.close(pooledConnection);
                pooledConnection.abandond();
                removeAbandonedCount++;
                removeCount++;

                if (isLogAbandoned()) {
                    StringBuilder buf = new StringBuilder();
                    buf.append("abandon connection, owner thread: ");
                    buf.append(pooledConnection.getOwnerThread().getName());
                    buf.append(", connected at : ");
                    buf.append(pooledConnection.getConnectedTimeMillis());
                    buf.append(", open stackTrace\n");

                    StackTraceElement[] trace = pooledConnection.getConnectStackTrace();
                    for (int i = 0; i < trace.length; i++) {
                        buf.append("\tat ");
                        buf.append(trace[i].toString());
                        buf.append("\n");
                    }

                    buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState()
                               + ", current stackTrace\n");
                    trace = pooledConnection.getOwnerThread().getStackTrace();
                    for (int i = 0; i < trace.length; i++) {
                        buf.append("\tat ");
                        buf.append(trace[i].toString());
                        buf.append("\n");
                    }

                    LOG.error(buf.toString());
                }
            }
        }

        return removeCount;
    }

从该方法的分析,其实当链接的使用时间超过removeAbandonedTimeOutMillis时间的时候,将会从可用连接池中移除,然后被关闭。链接的关闭操作,也是比较简单的。

DruidPooledConnection

链接中的关闭操作,主要代码如下

@Override
    public void close() throws SQLException {
        if (this.disable) {
            return;
        }

        // 数据库连接持有对象
        DruidConnectionHolder holder = this.holder;
        if (holder == null) {
            if (dupCloseLogEnable) {
                LOG.error("dup close");
            }
            return;
        }

        DruidAbstractDataSource dataSource = holder.getDataSource();
        // 判断当前关闭线程和持有链接线程是否为同一个线程
        boolean isSameThread = this.getOwnerThread() == Thread.currentThread();
        
        if (!isSameThread) {
            // 如果不是,则异步关闭
            dataSource.setAsyncCloseConnectionEnable(true);
        }
        
        if (dataSource.isAsyncCloseConnectionEnable()) {
            // 执行关闭逻辑, 关闭逻辑其实都差不多, 都会判断是否有filter, 然后执行recycle()方法
            syncClose();
            return;
        }

        for (ConnectionEventListener listener : holder.getConnectionEventListeners()) {
            listener.connectionClosed(new ConnectionEvent(this));
        }

        
        List<Filter> filters = dataSource.getProxyFilters();
        if (filters.size() > 0) {
            FilterChainImpl filterChain = new FilterChainImpl(dataSource);
            filterChain.dataSource_recycle(this);
        } else {
            recycle();
        }

        this.disable = true;
    }
public void recycle() throws SQLException {
        if (this.disable) {
            return;
        }

        DruidConnectionHolder holder = this.holder;
        if (holder == null) {
            if (dupCloseLogEnable) {
                LOG.error("dup close");
            }
            return;
        }

        if (!this.abandoned) {
            DruidAbstractDataSource dataSource = holder.getDataSource();
            dataSource.recycle(this);
        }

        // 将holder设置为空
        this.holder = null;
        conn = null;
        transactionInfo = null;
        // 关闭状态置为true
        closed = true;
    }

因此这里就讲到了为什么会有holder为空的逻辑,这里代码分析就算完成了。我们再次回到开始的错误connection holder is null的提示,这其实是因为holder已经被设置为空,然后我再去commit()事务的时候,就会出现这样的问题:

private void checkStateInternal() throws SQLException {
        // 判断holder是否为空
        if (holder == null) {
            if (disableError != null) {
                throw new SQLException("connection holder is null", disableError);
            } else {
                throw new SQLException("connection holder is null");
            }
        }

        // 判断链接是否已经关闭
        if (closed) {
            if (disableError != null) {
                throw new SQLException("connection closed", disableError);
            } else {
                throw new SQLException("connection closed");
            }
        }

        // 判断链接是否已被禁用
        if (disable) {
            if (disableError != null) {
                throw new SQLException("connection disabled", disableError);
            } else {
                throw new SQLException("connection disabled");
            }
        }
    }

getConnection()

是否所有的连接都会受到连接池的管理呢?其实不是,在上面代码中,连接销毁任务都是针对activeConnections进行的,而这个链接,只有在removeAbandoned开始时才会在获取链接的时候加入到activeConnections列表中,具体代码如下:

public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException {
        int notFullTimeoutRetryCnt = 0;
        for (;;) {
            ..... 省略链接创建流程代码

            // 当removeAbandoned=true的时候,将创建好的链接加入到activeConnections中
            if (removeAbandoned) {
                StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
                poolableConnection.connectStackTrace = stackTrace;
                poolableConnection.setConnectedTimeNano();
                poolableConnection.traceEnable = true;

                activeConnectionLock.lock();
                try {
                    activeConnections.put(poolableConnection, PRESENT);
                } finally {
                    activeConnectionLock.unlock();
                }
            }

            if (!this.defaultAutoCommit) {
                poolableConnection.setAutoCommit(false);
            }

            return poolableConnection;
        }
    }

总结

因此通过上面的解析,我们不难看出,因此该问题的主要原因主要有两个:

  • 同一个数据库连接使用时间过长,导致连接池释放链接
  • removeAbandonedTimeOutMillis时间比业务处理时间短。

因此可以看出,要解决这个问题,也可以通过两个途径解决:

  • 控制数据库连接的使用时间,对处理过长业务逻辑进行优化,对批量任务,每个任务单独使用数据连接
  • removeAbandonedTimeOutMillis时间调长。这种方案我是不太建议,因为这样的话,可能会导致其他业务无法获取到连接的情况

其他版本

其实这个提示在druid比较高的版本已经被优化了,因为对数连接池释放了数据库连接之后,连接应该是处理关闭的,所以在较高版本的druid上,会提示connection closed.

 

 

 

 

 

Leave a Comment

Comments

No comments yet. Why don’t you start the discussion?

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注