最近在做业务开发的时候,线上出现了定时任务实行失败的异常,查看日志,最主要的原因是提示: connection holder is null
, 经过代码排查了,是由于druid的连接池导致,所以记录下这个原因排查详细过程。
环境配置
由于之前的项目是比较老的,所以对druid的版本相对要老一点,使用的是1.1.7
版本,具体maven
如下:
<dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.7</version> </dependency>
示例师范
druid配置
示例采用的properties
的配置方式, 具体配置如下:
url=jdbc:mysql://192.168.3.37:3306/learn username=root password=mysql filters=config maxActive=20 initialSize=1 maxWait=6000 minIdle=1 timeBetweenEvictionRunsMillis=60000 minEvictableIdleTimeMillis=30000 testWhileIdle=true testOnBorrow=true testOnReturn=true poolPreparedStatements=true maxOpenPreparedStatements=20 asyncInit=true removeAbandoned=true
以上的配置是比较常规的配置信息, 其中跟此次实践有关的,主要为removeAbandoned=true
配置信息, 将在后面讲解
示例代码
package com.druid; import com.alibaba.druid.pool.DruidDataSourceFactory; import javax.sql.DataSource; import java.io.InputStream; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.Properties; import java.util.concurrent.TimeUnit; public class DruidDataSourceTest { public static void main(String[] args) { // 记载配置文件 InputStream is = DruidDataSourceTest.class.getResourceAsStream("/druid.properties"); Properties properties = new Properties(); Connection connection = null; DataSource dataSource = null; try { properties.load(is); System.out.println("加载配置w完成..."); // 创建factory dataSource = DruidDataSourceFactory.createDataSource(properties); System.out.println("连接池创建成功"); // 获取Connection connection = dataSource.getConnection(); connection.setAutoCommit(false); // 模拟处理业务逻辑 System.out.println("执行业务逻辑.."); PreparedStatement statement = connection.prepareStatement("select * from user"); statement.execute(); Thread.sleep(TimeUnit.MINUTES.toMillis(6)); System.out.println("业务逻辑执行完成.."); connection.commit(); } catch (Exception e) { e.printStackTrace(); } finally { try { if (connection != null) { connection.close(); } } catch (SQLException e) { // ignore } } } }
这段代码使用了Thread.sleep
的方式模拟了一个业务逻辑,跟遇到的处理任务处理批量任务比较类似,然后我们将任务跑起来,在任务执行完成后,将的到一下的错误信息:
java.sql.SQLException: connection holder is null at com.alibaba.druid.pool.DruidPooledConnection.checkStateInternal(DruidPooledConnection.java:1157) at com.alibaba.druid.pool.DruidPooledConnection.checkState(DruidPooledConnection.java:1148) at com.alibaba.druid.pool.DruidPooledConnection.commit(DruidPooledConnection.java:743) at com.druid.DruidDataSourceTest.main(DruidDataSourceTest.java:38)
源码分析
在以上的还原现场的情况下,和我们预期的到的很相似,因此我们通过查看源码的方式查看,为什么会的到这样的结果。
DruidDatasourceFactory
这个类看名字基本上就能够知道,作为工厂创建DruidDatasource
对象,可以简单看一下代码:
public static DataSource createDataSource(Properties properties) throws Exception { return createDataSource((Map) properties); } public static DataSource createDataSource(Map properties) throws Exception { DruidDataSource dataSource = new DruidDataSource(); config(dataSource, properties); return dataSource; } public static void config(DruidDataSource dataSource, Map<?, ?> properties) throws SQLException { ... value = (String) properties.get(PROP_INIT); if ("true".equals(value)) { dataSource.init(); } }
在类中最重要的就是config
方法,config
方法主要是从map
中读取各种配置,然后加载到Datasource
对象中,便于直接使用Datasource
获取数据库连接信息。在config
最后一段代码中,可以看出,Datasource
本身包含了初始化init()
方法,该方法会在Datasource
正式使用的时候执行必要的初始化操作,该方法调用可以通过配置init=true
控制或者再获取connection
的时候执行延迟初始化。
DruidDatasource
该类是作为核心类,因为我们使用最多的还是通过Datasource.getConnection()
方法,因此直接查看getConnection()
方法源码:
@Override public DruidPooledConnection getConnection() throws SQLException { return getConnection(maxWait); } public DruidPooledConnection getConnection(long maxWaitMillis) throws SQLException { init(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(this); return filterChain.dataSource_connect(this, maxWaitMillis); } else { return getConnectionDirect(maxWaitMillis); } }
在获取Connection
链接的时候,都会优先执行初始化的操作.
init()
在Datasource
正式投入使用的时候,都会执行init()
方法,完成初始化操作。直接看init()
源码:
public void init() throws SQLException { // 是否已经初始化完成, 如果是则直接返回 if (inited) { return; } // 获取锁,保证只有线程执行初始化操作 final ReentrantLock lock = this.lock; try { lock.lockInterruptibly(); } catch (InterruptedException e) { throw new SQLException("interrupt", e); } boolean init = false; try { if (inited) { return; } initStackTrace = Utils.toString(Thread.currentThread().getStackTrace()); this.id = DruidDriver.createDataSourceId(); if (this.id > 1) { long delta = (this.id - 1) * 100000; this.connectionIdSeed.addAndGet(delta); this.statementIdSeed.addAndGet(delta); this.resultSetIdSeed.addAndGet(delta); this.transactionIdSeed.addAndGet(delta); } if (this.jdbcUrl != null) { this.jdbcUrl = this.jdbcUrl.trim(); initFromWrapDriverUrl(); } // 初始化filter对象 for (Filter filter : filters) { filter.init(this); } // 获取数据库类型 if (this.dbType == null || this.dbType.length() == 0) { this.dbType = JdbcUtils.getDbType(jdbcUrl, null); } if (JdbcConstants.MYSQL.equals(this.dbType) || JdbcConstants.MARIADB.equals(this.dbType) || JdbcConstants.ALIYUN_ADS.equals(this.dbType)) { boolean cacheServerConfigurationSet = false; if (this.connectProperties.containsKey("cacheServerConfiguration")) { cacheServerConfigurationSet = true; } else if (this.jdbcUrl.indexOf("cacheServerConfiguration") != -1) { cacheServerConfigurationSet = true; } if (cacheServerConfigurationSet) { this.connectProperties.put("cacheServerConfiguration", "true"); } } ..... // 省略验证代码 // 通过SPI的方式加载服务, 具体是什么,不在关注范围内 initFromSPIServiceLoader(); // 加载driver类 if (this.driver == null) { if (this.driverClass == null || this.driverClass.isEmpty()) { this.driverClass = JdbcUtils.getDriverClassName(this.jdbcUrl); } if (MockDriver.class.getName().equals(driverClass)) { driver = MockDriver.instance; } else { driver = JdbcUtils.createDriver(driverClassLoader, driverClass); } } else { if (this.driverClass == null) { this.driverClass = driver.getClass().getName(); } } initCheck(); initExceptionSorter(); initValidConnectionChecker(); validationQueryCheck(); if (isUseGlobalDataSourceStat()) { dataSourceStat = JdbcDataSourceStat.getGlobal(); if (dataSourceStat == null) { dataSourceStat = new JdbcDataSourceStat("Global", "Global", this.dbType); JdbcDataSourceStat.setGlobal(dataSourceStat); } if (dataSourceStat.getDbType() == null) { dataSourceStat.setDbType(this.dbType); } } else { dataSourceStat = new JdbcDataSourceStat(this.name, this.jdbcUrl, this.dbType, this.connectProperties); } dataSourceStat.setResetStatEnable(this.resetStatEnable); // 初始化连接池管理缓存,其实就是数组 connections = new DruidConnectionHolder[maxActive]; evictConnections = new DruidConnectionHolder[maxActive]; keepAliveConnections = new DruidConnectionHolder[maxActive]; SQLException connectError = null; boolean asyncInit = this.asyncInit && createScheduler == null; if (!asyncInit) { try { // init connections for (int i = 0, size = getInitialSize(); i < size; ++i) { PhysicalConnectionInfo pyConnectInfo = createPhysicalConnection(); DruidConnectionHolder holder = new DruidConnectionHolder(this, pyConnectInfo); connections[poolingCount] = holder; incrementPoolingCount(); } if (poolingCount > 0) { poolingPeak = poolingCount; poolingPeakTime = System.currentTimeMillis(); } } catch (SQLException ex) { LOG.error("init datasource error, url: " + this.getUrl(), ex); connectError = ex; } } createAndLogThread(); createAndStartCreatorThread(); // 重点关注这里,连接池对connection链接的管理方式 createAndStartDestroyThread(); initedLatch.await(); init = true; initedTime = new Date(); // 注册监控mbean对象 registerMbean(); if (connectError != null && poolingCount == 0) { throw connectError; } // 如果是长连接,则通过线程池管理空闲链接 if (keepAlive) { // async fill to minIdle if (createScheduler != null) { for (int i = 0; i < minIdle; ++i) { createTaskCount++; CreateConnectionTask task = new CreateConnectionTask(); this.createSchedulerFuture = createScheduler.submit(task); } } else { this.emptySignal(); } } } catch (SQLException e) { LOG.error("{dataSource-" + this.getID() + "} init error", e); throw e; } catch (InterruptedException e) { throw new SQLException(e.getMessage(), e); } catch (RuntimeException e){ LOG.error("{dataSource-" + this.getID() + "} init error", e); throw e; } catch (Error e){ LOG.error("{dataSource-" + this.getID() + "} init error", e); throw e; } finally { // 标记初始化完成 inited = true; lock.unlock(); if (init && LOG.isInfoEnabled()) { String msg = "{dataSource-" + this.getID(); if (this.name != null && !this.name.isEmpty()) { msg += ","; msg += this.name; } msg += "} inited"; LOG.info(msg); } } }
这里主要关注下createAndStartDestroyThread()方法,
createAndStartDestroyThread
protected void createAndStartDestroyThread() { // 创建链接销毁任务对象 destroyTask = new DestroyTask(); // 如果绑定了线程,则使用连接池执行任务 if (destroyScheduler != null) { long period = timeBetweenEvictionRunsMillis; if (period <= 0) { period = 1000; } // 如果没有配置周期,则为1秒钟,在以上配置中,我们设置的为60秒 destroySchedulerFuture = destroyScheduler.scheduleAtFixedRate(destroyTask, period, period, TimeUnit.MILLISECONDS); initedLatch.countDown(); return; } // 如果没有绑定线程池,则使用单独的线程执行,该类其实跟线程池实现类型都是一致的。都是在一定周期后执行链接移除任务 String threadName = "Druid-ConnectionPool-Destroy-" + System.identityHashCode(this); destroyConnectionThread = new DestroyConnectionThread(threadName); destroyConnectionThread.start(); }
通过上面分析可以得知,连接池对链接的管理是通过单据的线程周期的管理,这个执行周期如下:
- 判断是否配置
timeBetweenEvictionRunsMillis
,如果配置,则该配置的时间作为周期轮询检测 - 如果没有配置,默认为
1秒
一次对连接池链接管理销毁
DestroyTask
最终链接的销毁是通过该类来实现和完成的,具体代码如下:
public class DestroyTask implements Runnable { @Override public void run() { shrink(true, keepAlive); // 判断是否对连接池链接执行销毁, 依据removeAbandoned=true开启 if (isRemoveAbandoned()) { removeAbandoned(); } } }
在任务中其实需要removeAbandoned=true
配置开启,然后才会执行removeAbandoned()
方法,该方法其实在主类Datasource
之中。
removeAbandoned()
public int removeAbandoned() { int removeCount = 0; // 获取当前时间 long currrentNanos = System.nanoTime(); List<DruidPooledConnection> abandonedList = new ArrayList<DruidPooledConnection>(); activeConnectionLock.lock(); try { Iterator<DruidPooledConnection> iter = activeConnections.keySet().iterator(); for (; iter.hasNext();) { DruidPooledConnection pooledConnection = iter.next(); if (pooledConnection.isRunning()) { continue; } // 判断链接第一次获取时间倒现在经历的间隔时间 long timeMillis = (currrentNanos - pooledConnection.getConnectedTimeNano()) / (1000 * 1000); // 如果间隔时间大于removeAbandonedTimeoutMillis的时间,默认为5分钟时,将链接从当前活跃链接池中移除,并加入销毁列表 if (timeMillis >= removeAbandonedTimeoutMillis) { iter.remove(); pooledConnection.setTraceEnable(false); abandonedList.add(pooledConnection); } } } finally { activeConnectionLock.unlock(); } if (abandonedList.size() > 0) { // 遍历需要销毁的链接对象 for (DruidPooledConnection pooledConnection : abandonedList) { final ReentrantLock lock = pooledConnection.lock; lock.lock(); try { // 如果链接已经失效,则停止处理 if (pooledConnection.isDisable()) { continue; } } finally { lock.unlock(); } // 关闭链接。这里就是产生问题的根源,也就是说当超过了removeAbandonedTimeOutMillis时间的链接会被最终关闭 JdbcUtils.close(pooledConnection); pooledConnection.abandond(); removeAbandonedCount++; removeCount++; if (isLogAbandoned()) { StringBuilder buf = new StringBuilder(); buf.append("abandon connection, owner thread: "); buf.append(pooledConnection.getOwnerThread().getName()); buf.append(", connected at : "); buf.append(pooledConnection.getConnectedTimeMillis()); buf.append(", open stackTrace\n"); StackTraceElement[] trace = pooledConnection.getConnectStackTrace(); for (int i = 0; i < trace.length; i++) { buf.append("\tat "); buf.append(trace[i].toString()); buf.append("\n"); } buf.append("ownerThread current state is " + pooledConnection.getOwnerThread().getState() + ", current stackTrace\n"); trace = pooledConnection.getOwnerThread().getStackTrace(); for (int i = 0; i < trace.length; i++) { buf.append("\tat "); buf.append(trace[i].toString()); buf.append("\n"); } LOG.error(buf.toString()); } } } return removeCount; }
从该方法的分析,其实当链接的使用时间超过removeAbandonedTimeOutMillis
时间的时候,将会从可用连接池中移除,然后被关闭。链接的关闭操作,也是比较简单的。
DruidPooledConnection
链接中的关闭操作,主要代码如下
@Override public void close() throws SQLException { if (this.disable) { return; } // 数据库连接持有对象 DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } DruidAbstractDataSource dataSource = holder.getDataSource(); // 判断当前关闭线程和持有链接线程是否为同一个线程 boolean isSameThread = this.getOwnerThread() == Thread.currentThread(); if (!isSameThread) { // 如果不是,则异步关闭 dataSource.setAsyncCloseConnectionEnable(true); } if (dataSource.isAsyncCloseConnectionEnable()) { // 执行关闭逻辑, 关闭逻辑其实都差不多, 都会判断是否有filter, 然后执行recycle()方法 syncClose(); return; } for (ConnectionEventListener listener : holder.getConnectionEventListeners()) { listener.connectionClosed(new ConnectionEvent(this)); } List<Filter> filters = dataSource.getProxyFilters(); if (filters.size() > 0) { FilterChainImpl filterChain = new FilterChainImpl(dataSource); filterChain.dataSource_recycle(this); } else { recycle(); } this.disable = true; }
public void recycle() throws SQLException { if (this.disable) { return; } DruidConnectionHolder holder = this.holder; if (holder == null) { if (dupCloseLogEnable) { LOG.error("dup close"); } return; } if (!this.abandoned) { DruidAbstractDataSource dataSource = holder.getDataSource(); dataSource.recycle(this); } // 将holder设置为空 this.holder = null; conn = null; transactionInfo = null; // 关闭状态置为true closed = true; }
因此这里就讲到了为什么会有holder
为空的逻辑,这里代码分析就算完成了。我们再次回到开始的错误connection holder is null
的提示,这其实是因为holder
已经被设置为空,然后我再去commit()
事务的时候,就会出现这样的问题:
private void checkStateInternal() throws SQLException { // 判断holder是否为空 if (holder == null) { if (disableError != null) { throw new SQLException("connection holder is null", disableError); } else { throw new SQLException("connection holder is null"); } } // 判断链接是否已经关闭 if (closed) { if (disableError != null) { throw new SQLException("connection closed", disableError); } else { throw new SQLException("connection closed"); } } // 判断链接是否已被禁用 if (disable) { if (disableError != null) { throw new SQLException("connection disabled", disableError); } else { throw new SQLException("connection disabled"); } } }
getConnection()
是否所有的连接都会受到连接池的管理呢?其实不是,在上面代码中,连接销毁任务都是针对activeConnections
进行的,而这个链接,只有在removeAbandoned
开始时才会在获取链接的时候加入到activeConnections
列表中,具体代码如下:
public DruidPooledConnection getConnectionDirect(long maxWaitMillis) throws SQLException { int notFullTimeoutRetryCnt = 0; for (;;) { ..... 省略链接创建流程代码 // 当removeAbandoned=true的时候,将创建好的链接加入到activeConnections中 if (removeAbandoned) { StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace(); poolableConnection.connectStackTrace = stackTrace; poolableConnection.setConnectedTimeNano(); poolableConnection.traceEnable = true; activeConnectionLock.lock(); try { activeConnections.put(poolableConnection, PRESENT); } finally { activeConnectionLock.unlock(); } } if (!this.defaultAutoCommit) { poolableConnection.setAutoCommit(false); } return poolableConnection; } }
总结
因此通过上面的解析,我们不难看出,因此该问题的主要原因主要有两个:
- 同一个数据库连接使用时间过长,导致连接池释放链接
removeAbandonedTimeOutMillis
时间比业务处理时间短。
因此可以看出,要解决这个问题,也可以通过两个途径解决:
- 控制数据库连接的使用时间,对处理过长业务逻辑进行优化,对批量任务,每个任务单独使用数据连接
- 将
removeAbandonedTimeOutMillis
时间调长。这种方案我是不太建议,因为这样的话,可能会导致其他业务无法获取到连接的情况
其他版本
其实这个提示在druid
比较高的版本已经被优化了,因为对数连接池释放了数据库连接之后,连接应该是处理关闭的,所以在较高版本的druid
上,会提示connection closed
.