什么是数据库连接池

数据库连接池负责分配、管理和释放数据库连接,它允许应用程序重复使用一个现有的数据库连接,而不是再重新建立一个;释放空闲时间超过最大空闲时间的数据库连接来避免因为没有释放数据库连接而引起的数据库连接遗漏。这项技术能明显提高对数据库操作的性能。

在使用 Mybatis 的时候,数据库的连接一般都会使用第三方的数据源组件,如 C3P0,DBCP 和 Druid 等,其实 Mybatis 也有自己的数据源实现,可以连接数据库,还有连接池的功能,下面就来看看 Mybatis 自己实现的数据源头和连接池的一个实现原理。

类结构

Mybatis 数据源的实现主要是在 datasource 包:

img

我们常见的数据库连接池都需要实现javax.sql.DataSource接口mybatis提供了UnpooledDataSource和PooledDataSource的实现, 一个使用连接池,一个不使用连接池此外,对于这两个类,Mybatis 还提供了两个工厂类进行创建对象,是工厂方法模式的一个应用,首先来看下它们的一个类图:

img

我们先看下工厂类

DataSourceFactory

数据源连接池工厂类接口

该类是 JndiDataSourceFactory 和 UnpooledDataSourceFactory 两个工厂类的顶层接口,只定义了两个方法

/**
 * 数据源连接池 接口
 * @author Clinton Begin
 */
public interface DataSourceFactory {
  /**
   * 设置属性
   * @param props
   */
  void setProperties(Properties props);

  /**
   * 获取数据源连接池
   * @return
   */
  DataSource getDataSource();

}

UnpooledDataSourceFactory

UnpooledDataSourceFactory 主要用来创建 UnpooledDataSource 对象,它会在构造方法中初始化 UnpooledDataSource 对象,并在 setProperties 方法中完成对 UnpooledDataSource 对象的配置

/**
 * UnpooledDataSource 连接工厂
 *
 * @author Clinton Begin
 */
public class UnpooledDataSourceFactory implements DataSourceFactory {
  // 数据库驱动前缀
  private static final String DRIVER_PROPERTY_PREFIX = "driver.";
  private static final int DRIVER_PROPERTY_PREFIX_LENGTH = DRIVER_PROPERTY_PREFIX.length();

  // 对应的数据源,即 UnpooledDataSource
  protected DataSource dataSource;

  //构造方法
  public UnpooledDataSourceFactory() {
    this.dataSource = new UnpooledDataSource();
  }

  /**
   * 对数据源 UnpooledDataSource 进行配置
   *
   * @param properties
   */
  @Override
  public void setProperties(Properties properties) {
    //数据源属性文件
    Properties driverProperties = new Properties();
    // 创建 DataSource 相应的 MetaObject
    MetaObject metaDataSource = SystemMetaObject.forObject(dataSource);
    // 遍历 properties 集合,该集合中存放了数据源需要的信息
    for (Object key : properties.keySet()) {
      String propertyName = (String) key;
      // 以 "driver." 开头的配置项是对 DataSource 的配置,记录到 driverProperties  中
      if (propertyName.startsWith(DRIVER_PROPERTY_PREFIX)) {
        String value = properties.getProperty(propertyName);
        //设置连接的属性和值
        driverProperties.setProperty(propertyName.substring(DRIVER_PROPERTY_PREFIX_LENGTH), value);
        // 该属性是否有 set 方法
      } else if (metaDataSource.hasSetter(propertyName)) {
        // 获取对应的属性值
        String value = (String) properties.get(propertyName);
        // 根据属性类型进行类型的转换,主要是 Integer, Long, Boolean 三种类型的转换
        Object convertedValue = convertValue(metaDataSource, propertyName, value);
        // 设置DataSource 的相关属性值
        metaDataSource.setValue(propertyName, convertedValue);
      } else {
        throw new DataSourceException("Unknown DataSource property: " + propertyName);
      }
    }
    // 设置 DataSource.driverProerties 属性值
    if (driverProperties.size() > 0) {
      metaDataSource.setValue("driverProperties", driverProperties);
    }
  }

  // 返回数据源
  @Override
  public DataSource getDataSource() {
    return dataSource;
  }

  // 类型转
  private Object convertValue(MetaObject metaDataSource, String propertyName, String value) {
    Object convertedValue = value;
    Class<?> targetType = metaDataSource.getSetterType(propertyName);
    if (targetType == Integer.class || targetType == int.class) {
      convertedValue = Integer.valueOf(value);
    } else if (targetType == Long.class || targetType == long.class) {
      convertedValue = Long.valueOf(value);
    } else if (targetType == Boolean.class || targetType == boolean.class) {
      convertedValue = Boolean.valueOf(value);
    }
    return convertedValue;
  }

}

JndiDataSourceFactory 依赖 JNDI 服务器中获取用户配置的 DataSource,这里可以不看。

UnpooledDataSourceFactory工厂类主要的作用是对数据源进行配置。

PooledDataSourceFactory

PooledDataSourceFactory 主要用来创建 PooledDataSource 对象,它继承了 UnpooledDataSource 类,设置 DataSource 参数的方法复用UnpooledDataSource 中的 setProperties 方法,只是数据源返回的是 PooledDataSource 对象而已。

/**
 * 连接池工厂
 * @author Clinton Begin
 */
public class PooledDataSourceFactory extends UnpooledDataSourceFactory {
  /**
   * 创建连接池对象
   */
  public PooledDataSourceFactory() {
    this.dataSource = new PooledDataSource();
  }

}

以上这些就是 Mybatis 用来创建数据源的工厂类,下面就来看下数据源的主要实现。

UnpooledDataSource

UnpooledDataSource 不使用连接池来创建数据库连接,每次获取数据库连接时都会创建一个新的连接进行返回;

初始化过程

在static静态块中进行驱动注册

/**
 * 未使用连接池的类
 *
 * @author Clinton Begin
 * @author Eduardo Macarron
 */
public class UnpooledDataSource implements DataSource {
  // 加载 Driver 类的类加载器
  private ClassLoader driverClassLoader;
  // 数据库连接驱动的相关配置
  private Properties driverProperties;
  // 缓存所有已注册的数据库连接驱动
  private static Map<String, Driver> registeredDrivers = new ConcurrentHashMap<>();
  //数据源配置相关
  private String driver;
  private String url;
  private String username;
  private String password;
  // 是否自动提交
  private Boolean autoCommit;
  //默认事务隔离级别
  private Integer defaultTransactionIsolationLevel;
  //默认超时时间
  private Integer defaultNetworkTimeout;

  /**
   *  静态块,在初始化的时候,从 DriverManager 中获取所有的已注册的驱动信息,并缓存到该类的 registeredDrivers集合中
   */
  static {
    //获取驱动信息
    Enumeration<Driver> drivers = DriverManager.getDrivers();
    while (drivers.hasMoreElements()) {
      Driver driver = drivers.nextElement();
      //注册进集合中
      registeredDrivers.put(driver.getClass().getName(), driver);
    }
  }

  /**
   * 无参构造
   */
  public UnpooledDataSource() {
  }

  public UnpooledDataSource(String driver, String url, String username, String password) {
    this.driver = driver;
    this.url = url;
    this.username = username;
    this.password = password;
  }

  public UnpooledDataSource(String driver, String url, Properties driverProperties) {
    this.driver = driver;
    this.url = url;
    this.driverProperties = driverProperties;
  }
}

获取一个连接

这里面没有提供连接销毁的方法,在连接池中有销毁连接的方法

/**
 * 获取一个连接
 *
 * @return 连接对象
 * @throws SQLException
 */
@Override
public Connection getConnection() throws SQLException {
  return doGetConnection(username, password);
}

@Override
public Connection getConnection(String username, String password) throws SQLException {
  return doGetConnection(username, password);
}
/**
 * 创建一个数据库连接
 *
 * @param username
 * @param password
 * @return
 * @throws SQLException
 */
private Connection doGetConnection(String username, String password) throws SQLException {
  Properties props = new Properties();
  if (driverProperties != null) {
    props.putAll(driverProperties);
  }
  //设置用户
  if (username != null) {
    props.setProperty("user", username);
  }
  //设置密码
  if (password != null) {
    props.setProperty("password", password);
  }
  return doGetConnection(props);
}

/**
 * 创建一个连接对象
 *
 * @param properties 数据库属性文件
 * @return 数据库连接
 * @throws SQLException
 */
private Connection doGetConnection(Properties properties) throws SQLException {
  //初始化驱动信息
  initializeDriver();
  // 通过 DriverManager 来获取一个数据库连接
  Connection connection = DriverManager.getConnection(url, properties);
  //配置连接信息
  configureConnection(connection);
  return connection;
}

/**
 * 初始化驱动信息
 *
 * @throws SQLException
 */
private synchronized void initializeDriver() throws SQLException {
  // 如果当前的驱动还没有注册,则进行注册
  if (!registeredDrivers.containsKey(driver)) {
    Class<?> driverType;
    try {
      //驱动的类加载器不为空
      if (driverClassLoader != null) {
        //获取驱动类型
        driverType = Class.forName(driver, true, driverClassLoader);
      } else {
        driverType = Resources.classForName(driver);
      }
      // DriverManager requires the driver to be loaded via the system ClassLoader.
      // http://www.kfu.com/~nsayer/Java/dyn-jdbc.html
      //创建驱动
      Driver driverInstance = (Driver) driverType.newInstance();
      //向JDBC注册驱动
      DriverManager.registerDriver(new DriverProxy(driverInstance));
      //向registeredDrivers注册驱动
      registeredDrivers.put(driver, driverInstance);
    } catch (Exception e) {
      throw new SQLException("Error setting driver on UnpooledDataSource. Cause: " + e);
    }
  }
}

/**
 * 配置数据库连接信息
 * @param conn
 * @throws SQLException
 */
private void configureConnection(Connection conn) throws SQLException {
  //设置超时时间
  if (defaultNetworkTimeout != null) {
    conn.setNetworkTimeout(Executors.newSingleThreadExecutor(), defaultNetworkTimeout);
  }
  //设置是否自动提交
  if (autoCommit != null && autoCommit != conn.getAutoCommit()) {
    conn.setAutoCommit(autoCommit);
  }
  //设置事务隔离等级
  if (defaultTransactionIsolationLevel != null) {
    conn.setTransactionIsolation(defaultTransactionIsolationLevel);
  }
}

以上代码就是 UnpooledDataSource 类的主要实现逻辑,每次获取连接都是从数据库新创建一个连接进行返回,又因为,数据库连接的创建是一个耗时的操作,且数据库连接是非常珍贵的资源,如果每次获取连接都创建一个,则可能会造成系统的瓶颈,拖垮响应速度等,这时就需要数据库连接池了,Mybatis 也提供了自己数据库连接池的实现,就是 PooledDataSource 类。

PooledDataSource

PooledDataSource 是一个比较复杂的类,PooledDataSource 新创建数据库连接是使用 UnpooledDataSource 来实现的,且 PooledDataSource 并不会管理 java.sql.Connection 对象,而是管理 PooledConnection 对象,在 PooledConnection 中封装了真正的数据库连接对象和其代理对象;此外,由于它是一个连接池,所以还需要管理连接池的状态,比如有多少连接是空闲的,还可以创建多少连接,此时,就需要一个类来管理连接池的对象,即 PoolState 对象;先来看下 PooledDataSource 的一个 UML 图:

img

PooledConnection

它主要是用来管理数据库连接的,它是一个代理类,实现了 InvocationHandler 接口

主体方法

/**
 * mybatis 连接池对象 使用的是 动态代理
 *
 * @author Clinton Begin
 */
class PooledConnection implements InvocationHandler {
  //关闭方法
  private static final String CLOSE = "close";
  private static final Class<?>[] IFACES = new Class<?>[]{Connection.class};

  private final int hashCode;
  // 记录当前的 PooledConnection 对象所在的 PooledDataSource 对象,该 PooledConnection 对象是从 PooledDataSource 对象中获取的,当调用 close 方法时会将 PooledConnection 放回该 PooledDataSource 中去
  private final PooledDataSource dataSource;
  //真实连接
  private final Connection realConnection;
  // 数据库连接的代理对象
  private final Connection proxyConnection;
  // 从连接池中取出该连接的时间戳
  private long checkoutTimestamp;
  // 该连接创建的时间戳
  private long createdTimestamp;
  // 该连接最后一次被使用的时间戳
  private long lastUsedTimestamp;
  // 用于标识该连接所在的连接池,由URL+username+password 计算出来的hash值
  private int connectionTypeCode;
  // 该连接是否有效
  private boolean valid;

  /**
   * Constructor for SimplePooledConnection that uses the Connection and PooledDataSource passed in.
   * 创建连接
   *
   * @param connection - the connection that is to be presented as a pooled connection
   * @param dataSource - the dataSource that the connection is from
   */
  public PooledConnection(Connection connection, PooledDataSource dataSource) {
    this.hashCode = connection.hashCode();
    this.realConnection = connection;
    this.dataSource = dataSource;
    this.createdTimestamp = System.currentTimeMillis();
    this.lastUsedTimestamp = System.currentTimeMillis();
    this.valid = true;
    this.proxyConnection = (Connection) Proxy.newProxyInstance(Connection.class.getClassLoader(), IFACES, this);
  }

  /**
   * Invalidates the connection.
   *   废弃该连接 设置连接未不可用
   */
  public void invalidate() {
    valid = false;
  }
      /**
   * Method to see if the connection is usable.
   * 检查连接是否可用 连接可用并且能够被ping,
   *   1.判断 valid 字段
   *   2.向数据库中发送检测测试的SQL,查看真正的连接还是否有效
   * @return True if the connection is usable
   */
  public boolean isValid() {
    return valid && realConnection != null && dataSource.pingConnection(this);
  }
    
  /**
   * 检查连接
   * @throws SQLException
   */
  private void checkConnection() throws SQLException {
    if (!valid) {
      throw new SQLException("Error accessing PooledConnection. Connection is invalid.");
    }
  }
}

invoke代理方法

该方法是 proxyConnection 这个连接代理对象的真正代理逻辑,它会对 close 方法进行代理,并且在调用真正的连接之前对连接进行检测。

/**
   * Required for InvocationHandler implementation.
   * 动态代理的入口方法
   *
   * @param proxy  - not used
   * @param method - the method to be executed
   * @param args   - the parameters to be passed to the method
   * @see java.lang.reflect.InvocationHandler#invoke(Object, java.lang.reflect.Method, Object[])
   */
  @Override
  public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    String methodName = method.getName();
    //如果关闭连接 将当前连接放回连接池 资源复用
    if (CLOSE.hashCode() == methodName.hashCode() && CLOSE.equals(methodName)) {
      dataSource.pushConnection(this);
      return null;
    }
    try {
      //不是Object对象的一些方法例如 toString等方法
      if (!Object.class.equals(method.getDeclaringClass())) {
        // issue #579 toString() should never fail
        // throw an SQLException instead of a Runtime
        // 执行之前,需要进行连接的检测
        checkConnection();
      }
      // 调用数据库真正的连接进行执行
      return method.invoke(realConnection, args);
    } catch (Throwable t) {
      throw ExceptionUtil.unwrapThrowable(t);
    }
  }

PoolState

PoolState 类主要是用来管理连接池的状态,比如哪些连接是空闲的,哪些是活动的,还可以创建多少连接等。该类中只是定义了一些属性来进行控制连接池的状态,并没有任何的方法。

/**
 * 连接池状态
 * @author Clinton Begin
 */
public class PoolState {
  // 该 PoolState 属于哪个 PooledDataSource
  protected PooledDataSource dataSource;
  // 来用存放空闲的 pooledConnection 连接
  protected final List<PooledConnection> idleConnections = new ArrayList<>();
  // 用来存放活跃的 PooledConnection 连接
  protected final List<PooledConnection> activeConnections = new ArrayList<>();
  // 请求数据库连接的次数
  protected long requestCount = 0;
  // 请求数据库连接的次数
  protected long accumulatedRequestTime = 0;
  // checkoutTime 表示从连接池中获取连接到归还连接的时间
  // accumulatedCheckoutTime 记录了所有连接的累计 checkoutTime 时长
  protected long accumulatedCheckoutTime = 0;
  // 连接超时的连接个数
  protected long claimedOverdueConnectionCount = 0;
  // 累计超时时间
  protected long accumulatedCheckoutTimeOfOverdueConnections = 0;
  // 累计等待时间
  protected long accumulatedWaitTime = 0;
  // 等待次数
  protected long hadToWaitCount = 0;
  // 无效的连接数
  protected long badConnectionCount = 0;

  /**
   * 构造方法
   * @param dataSource
   */
  public PoolState(PooledDataSource dataSource) {
    this.dataSource = dataSource;
  }
}

PooledDataSource

终于等来了我们的主角,PooledDataSource 它是一个简单的,同步的,线程安全的数据库连接池

知道了 UnpooledDataSource 用来创建数据库新的连接,PooledConnection 用来管理连接池中的连接,PoolState 用来管理连接池的状态之后,来看下 PooledDataSource 的一个逻辑,该类中主要有以下几个方法:获取数据库连接的方法 popConnection,把连接放回连接池的方法 pushConnection,检测数据库连接是否有效的方法 pingConnection ,还有 关闭连接池中所有连接的方法 forceCloseAll

主体方法

/**
 * This is a simple, synchronous, thread-safe database connection pool.
 * <p>
 * 简单的 带有锁的 线程安全的 数据库连接池
 *
 * @author Clinton Begin
 */
public class PooledDataSource implements DataSource {

  private static final Log log = LogFactory.getLog(PooledDataSource.class);

  /**
   * 数据库连接池状态
   */
  private final PoolState state = new PoolState(this);

  // 用来创建真正的数据库连接对象
  private final UnpooledDataSource dataSource;

  // OPTIONAL CONFIGURATION FIELDS
  // 最大活跃的连接数,默认为 10
  protected int poolMaximumActiveConnections = 10;
  // 最大空闲连接数,默认为 5
  protected int poolMaximumIdleConnections = 5;
  // 最大获取连接的时长
  protected int poolMaximumCheckoutTime = 20000;
  // 在无法获取到连接时,最大等待的时间
  protected int poolTimeToWait = 20000;
  //脸是失败重试次数
  protected int poolMaximumLocalBadConnectionTolerance = 3;
  // 在检测一个连接是否可用时,会向数据库发送一个测试 SQL
  protected String poolPingQuery = "NO PING QUERY SET";
  //是否启动连接池ping
  protected boolean poolPingEnabled;
  // 当连接超过 poolPingConnectionsNotUsedFor 毫秒未使用时,会发送一次测试 SQL 语句,测试连接是否正常
  protected int poolPingConnectionsNotUsedFor;
  // 标志着当前的连接池,是 url+username+password 的 hash 值
  private int expectedConnectionTypeCode;

  /**
   * 创建连接池
   */
  public PooledDataSource() {
    dataSource = new UnpooledDataSource();
  }

  public PooledDataSource(UnpooledDataSource dataSource) {
    this.dataSource = dataSource;
  }
      /**
   *  生成 连接的hash 值
   * @param url
   * @param username
   * @param password
   * @return
   */
  private int assembleConnectionTypeCode(String url, String username, String password) {
    return ("" + url + username + password).hashCode();
  }
}

获取连接

从 连接池中获取连接的方法主要是在 popConnection 中实现的,先来看下它的一个流程图:

img

代码逻辑如下

  /**
   * 获取连接
   *
   * @return
   * @throws SQLException
   */
  @Override
  public Connection getConnection() throws SQLException {
    return popConnection(dataSource.getUsername(), dataSource.getPassword()).getProxyConnection();
  }

  /**
   * 获取连接
   * @param username
   * @param password
   * @return
   * @throws SQLException
   */
  @Override
  public Connection getConnection(String username, String password) throws SQLException {
    return popConnection(username, password).getProxyConnection();
  }

/**
   * 从连接池中获取连接
   * @param username
   * @param password
   * @return
   * @throws SQLException
   */
  private PooledConnection popConnection(String username, String password) throws SQLException {

    //等待个数
    boolean countedWait = false;
    // PooledConnection 对象
    PooledConnection conn = null;
    //耗时计算
    long t = System.currentTimeMillis();
    // 无效的连接个数
    int localBadConnectionCount = 0;
    //自旋
    while (conn == null) {
      synchronized (state) {
        // 检测是否还有空闲的连接
        if (!state.idleConnections.isEmpty()) {
          // Pool has available connection
          // 连接池中还有空闲的连接,则直接获取连接返回
          conn = state.idleConnections.remove(0);
          if (log.isDebugEnabled()) {
            log.debug("Checked out connection " + conn.getRealHashCode() + " from pool.");
          }
        } else {
          // 连接池中已经没有空闲连接了
          // Pool does not have available connection
          if (state.activeConnections.size() < poolMaximumActiveConnections) {
            // Can create new connection
            // 活跃的连接数没有达到最大值,则创建一个新的数据库连接
            conn = new PooledConnection(dataSource.getConnection(), this);
            if (log.isDebugEnabled()) {
              log.debug("Created connection " + conn.getRealHashCode() + ".");
            }

          } else {
            // Cannot create new connection
            // 如果活跃的连接数已经达到允许的最大值了,则不能创建新的数据库连接
            // 获取最先创建的那个活跃的连接
            PooledConnection oldestActiveConnection = state.activeConnections.get(0);
            long longestCheckoutTime = oldestActiveConnection.getCheckoutTime();
            // 检测该连接是否超时
            if (longestCheckoutTime > poolMaximumCheckoutTime) {
              // 如果该连接超时,则进行相应的统计
              // Can claim overdue connection
              //超时连接个数统计
              state.claimedOverdueConnectionCount++;
              //统计过期的连接超时时间
              state.accumulatedCheckoutTimeOfOverdueConnections += longestCheckoutTime;
              //统计超时时间
              state.accumulatedCheckoutTime += longestCheckoutTime;
              // 将超时连接移出 activeConnections 集合
              state.activeConnections.remove(oldestActiveConnection);
              //如果没有设置自动提交
              if (!oldestActiveConnection.getRealConnection().getAutoCommit()) {
                try {
                  // 如果超时未提交,则自动回滚
                  oldestActiveConnection.getRealConnection().rollback();
                } catch (SQLException e) {
                  /*
                     Just log a message for debug and continue to execute the following
                     statement like nothing happened.
                     Wrap the bad connection with a new PooledConnection, this will help
                     to not interrupt current executing thread and give current thread a
                     chance to join the next competition for another valid/good database
                     connection. At the end of this loop, bad {@link @conn} will be set as null.
                   */
                  log.debug("Bad connection. Could not roll back");
                }
              }
              // 创建新的 PooledConnection 对象,但是真正的数据库连接并没有创建
              //刚刚获取的真实连接 新瓶装旧酒
              conn = new PooledConnection(oldestActiveConnection.getRealConnection(), this);
              //设置创建时间
              conn.setCreatedTimestamp(oldestActiveConnection.getCreatedTimestamp());
              //设置最后使用时间
              conn.setLastUsedTimestamp(oldestActiveConnection.getLastUsedTimestamp());
              // 设置该超时的旧的连接为无效
              oldestActiveConnection.invalidate();
              if (log.isDebugEnabled()) {
                log.debug("Claimed overdue connection " + conn.getRealHashCode() + ".");
              }
            } else {
              //如果第一个连接还没有超时,说明其他的都没有超时,只能等待了
              // Must wait
              try {
                //等待个数统计
                if (!countedWait) {
                  //等待计数
                  state.hadToWaitCount++;
                  //下次不进行计数
                  countedWait = true;
                }
                if (log.isDebugEnabled()) {
                  log.debug("Waiting as long as " + poolTimeToWait + " milliseconds for connection.");
                }
                long wt = System.currentTimeMillis();
                //超时等待 阻塞等待
                state.wait(poolTimeToWait);
                //统计等待的时长
                state.accumulatedWaitTime += System.currentTimeMillis() - wt;
              } catch (InterruptedException e) {
                //异常退出
                break;
              }
            }
          }
        }
        // 已经获取到连接,如果连接不为空
        if (conn != null) {
          // ping to server and check the connection is valid or not
          //连接状态可用
          if (conn.isValid()) {
            // 如果连连接有效,事务未提交则回滚
            if (!conn.getRealConnection().getAutoCommit()) {
              //进行回滚 恢复到最初状态
              conn.getRealConnection().rollback();
            }
            //设置连接类型
            conn.setConnectionTypeCode(assembleConnectionTypeCode(dataSource.getUrl(), username, password));
            //设置检查超时的时间
            conn.setCheckoutTimestamp(System.currentTimeMillis());
            //设置最后使用时间
            conn.setLastUsedTimestamp(System.currentTimeMillis());
            // 把连接加入到活跃集合中去
            state.activeConnections.add(conn);
            //统计请求计数
            state.requestCount++;
            //统计请求时长
            state.accumulatedRequestTime += System.currentTimeMillis() - t;
          } else {
            //如果验证连接失败
            if (log.isDebugEnabled()) {
              log.debug("A bad connection (" + conn.getRealHashCode() + ") was returned from the pool, getting another connection.");
            }
            //统计错误连接个数
            state.badConnectionCount++;
            localBadConnectionCount++;
            //连接置为null 进行GC
            conn = null;
            //无效连接个数 > 最大空闲连接数 + 失败尝试次数
            //大部分无效连接,可以停止服务了
            if (localBadConnectionCount > (poolMaximumIdleConnections + poolMaximumLocalBadConnectionTolerance)) {
              if (log.isDebugEnabled()) {
                log.debug("PooledDataSource: Could not get a good connection to the database.");
              }
              //抛异常 因为获取的都是不可用的连接
              throw new SQLException("PooledDataSource: Could not get a good connection to the database.");
            }
          }
        }
      }

    }
    //如果连接还未空
    if (conn == null) {
      if (log.isDebugEnabled()) {
        log.debug("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
      }
      //抛出异常
      throw new SQLException("PooledDataSource: Unknown severe error condition.  The connection pool returned a null connection.");
    }

    //返回连接
    return conn;
  }

以上就是从连接池获取连接的主要逻辑。

关闭连接

把连接放入的连接池中以供下次重新使用,把连接放入到连接池中的方法为 pushConnection 方法,它也是 PooledDataSource 类的一个主要方法。

img

代码逻辑如下

/**
 *  把不用的连接放入到连接池中
 *
 * @param conn
 * @throws SQLException
 */
protected void pushConnection(PooledConnection conn) throws SQLException {
  synchronized (state) {
    // 首先从活跃的集合中移除掉该连接
    state.activeConnections.remove(conn);
    //验证连接是否有效
    if (conn.isValid()) {
      // 如果空闲连接数没有达到最大值,且 PooledConnection 为该连接池的连接
      if (state.idleConnections.size() < poolMaximumIdleConnections && conn.getConnectionTypeCode() == expectedConnectionTypeCode) {
        //验证超时时间汇总
        state.accumulatedCheckoutTime += conn.getCheckoutTime();
        //如果没有自动提交
        if (!conn.getRealConnection().getAutoCommit()) {
          //进行回滚
          conn.getRealConnection().rollback();
        }
        //根据真实连接创建一个新的池化的连接 真是链接还用当前的连接
        PooledConnection newConn = new PooledConnection(conn.getRealConnection(), this);
        //添加到空闲连接
        state.idleConnections.add(newConn);
        //设置创建时间
        newConn.setCreatedTimestamp(conn.getCreatedTimestamp());
        //设置最后使用时间
        newConn.setLastUsedTimestamp(conn.getLastUsedTimestamp());
        //设置原来的连接为无效
        conn.invalidate();
        if (log.isDebugEnabled()) {
          log.debug("Returned connection " + newConn.getRealHashCode() + " to pool.");
        }
        //唤醒阻塞等待的线程
        state.notifyAll();
      } else {
        // 如果空闲连接数已经达到最大值
        //计算超时时间汇总
        state.accumulatedCheckoutTime += conn.getCheckoutTime();
        //未设置自动提交
        if (!conn.getRealConnection().getAutoCommit()) {
          //回滚
          conn.getRealConnection().rollback();
        }
        //关闭真正的数据库连接
        conn.getRealConnection().close();
        if (log.isDebugEnabled()) {
          log.debug("Closed connection " + conn.getRealHashCode() + ".");
        }
        // 设置该连接为无效状态
        conn.invalidate();
      }
    } else {
      //如果连接状态失败
      //打印日志
      if (log.isDebugEnabled()) {
        log.debug("A bad connection (" + conn.getRealHashCode() + ") attempted to return to the pool, discarding connection.");
      }
      //无效连接个数加1
      state.badConnectionCount++;
    }
  }
}

以上代码就是把不用的连接放入到连接池中以供下次使用,

在上面两个方法中,都调用了 isValid 方法来检测连接是否可用。

检查连接

该方法除了检测 valid 字段外,还会调用 pingConnection 方法来尝试让数据库执行测试 SQL 语句,从而检测真正的数据库连接对象是否依然正常可用

/**
 * Method to check to see if a connection is still usable
 *   向数据库发送测试 SQL 来检测真正的数据库连接是否可用
 * @param conn - the connection to check
 * @return True if the connection is still usable
 */
protected boolean pingConnection(PooledConnection conn) {
  // 结果
  boolean result = true;

  try {
    // 检测真正的数据库连接是否已经关闭
    result = !conn.getRealConnection().isClosed();
  } catch (SQLException e) {
    if (log.isDebugEnabled()) {
      log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
    }
    result = false;
  }
  // 如果真正的数据库连接还没关闭
  if (result) {
    // 是否执行测试 SQL 语句
    if (poolPingEnabled) {
      // 长时间(poolPingConnectionsNotUsedFor 指定的时长)未使用的连接,才需要ping操作来检测连接是否正常
      if (poolPingConnectionsNotUsedFor >= 0 && conn.getTimeElapsedSinceLastUse() > poolPingConnectionsNotUsedFor) {
        try {
          if (log.isDebugEnabled()) {
            log.debug("Testing connection " + conn.getRealHashCode() + " ...");
          }
          // 发送测试 SQL 语句执行
          Connection realConn = conn.getRealConnection();
          try (Statement statement = realConn.createStatement()) {
            statement.executeQuery(poolPingQuery).close();
          }
          //回滚
          if (!realConn.getAutoCommit()) {
            realConn.rollback();
          }
          result = true;
          if (log.isDebugEnabled()) {
            log.debug("Connection " + conn.getRealHashCode() + " is GOOD!");
          }
        } catch (Exception e) {
          log.warn("Execution of ping query '" + poolPingQuery + "' failed: " + e.getMessage());
          try {
            //关闭真实连接
            conn.getRealConnection().close();
          } catch (Exception e2) {
            //ignore
          }
          result = false;
          if (log.isDebugEnabled()) {
            log.debug("Connection " + conn.getRealHashCode() + " is BAD: " + e.getMessage());
          }
        }
      }
    }
  }
  return result;
}

关闭所有连接

当修改 PooledDataSource 相应的字段,如 数据库的 URL,用户名或密码等,需要将连接池中连接全部关闭,之后获取连接的时候从重新初始化

/**
 * Closes all active and idle connections in the pool.
 * 关闭所有的活动以及空闲连接
 */
public void forceCloseAll() {
  synchronized (state) {
    expectedConnectionTypeCode = assembleConnectionTypeCode(dataSource.getUrl(), dataSource.getUsername(), dataSource.getPassword());
    //遍历关闭活动的连接
    for (int i = state.activeConnections.size(); i > 0; i--) {
      try {
        PooledConnection conn = state.activeConnections.remove(i - 1);
        // 设置连接为无效状态
        conn.invalidate();
        //获取真实连接
        Connection realConn = conn.getRealConnection();
        //是否设置了自动提交
        if (!realConn.getAutoCommit()) {
          //否则进行回滚,保证数据一致性
          realConn.rollback();
        }
        //关闭真实连接
        realConn.close();
      } catch (Exception e) {
        // ignore
      }
    }
    //遍历关闭空闲的连接
    for (int i = state.idleConnections.size(); i > 0; i--) {
      try {
        PooledConnection conn = state.idleConnections.remove(i - 1);
        // 设置为无效状态
        conn.invalidate();
        //获取真实连接
        Connection realConn = conn.getRealConnection();
        //如果没有设置自动提交
        if (!realConn.getAutoCommit()) {
          //进行数据回滚
          realConn.rollback();
        }
        //关闭真实的链接
        realConn.close();
      } catch (Exception e) {
        // ignore
      }
    }
  }
  if (log.isDebugEnabled()) {
    log.debug("PooledDataSource forcefully closed/removed all connections.");
  }
}

总结

在连接池中提到了 连接池中的最大连接数和最大空闲数,在 获取连接和把连接放入连接池中都有判断,

  1. 获取连接:首先从连接池中进行获取,如果连接池中已经没有空闲的连接了,则会判断当前的活跃连接数是否已经达到允许的最大值了,如果没有,则还可以创建新的连接,之后把它放到活跃的集合中进行使用,如果当前活跃的已达到最大值,则阻塞。
  2. 返还连接到连接池,在返还连接的时候,进行判断,如果空闲连接数已达到允许的最大值,则直接关闭真正的数据库连接,否则把该连接放入到空闲集合中以供下次使用。

Mybatis 数据源中,主要的代码逻辑还是在连接池类 PooledDataSource 中,对于获取连接的方法 popConnection,返还连接的方法 pushConnection ,需要结合上图来看,才能看得清楚。