MyBatis-执行器模块
执行器模块
Executor是MyBaits核心接口之一,定义了数据库操作最基本的方法,SqlSession的功能都是基于它来实现的
什么是执行器
什么是执行器?Mybatis中所有的Mapper语句的执行都是通过Executor进行的,Executor是Mybatis的一个核心接口,其定义如下。从其定义的接口方法我们可以看出,对应的增删改语句是通过Executor接口的update方法进行的,查询是通过query方法进行的。虽然Executor接口的实现类有BaseExecutor和CachingExecutor,而BaseExecutor的子类又有SimpleExecutor、ReuseExecutor和BatchExecutor,但BaseExecutor是一个抽象类,其只实现了一些公共的封装,而把真正的核心实现都通过方法抽象出来给子类实现,如doUpdate()、doQuery();CachingExecutor只是在Executor的基础上加入了缓存的功能,底层还是通过Executor调用的,所以真正有作用的Executor只有SimpleExecutor、ReuseExecutor和BatchExecutor。它们都是自己实现的Executor核心功能,没有借助任何其它的Executor实现,它们是实现不同也就注定了它们的功能也是不一样的。Executor是跟SqlSession绑定在一起的,每一个SqlSession都拥有一个新的Executor对象,由Configuration创建。
大致流程
Executor中的大部分方法的调用链其实是差不多的,下面都是深入源码分析执行过程,如果你没有时间或者暂时不想深入研究的话,给你下面的执行流程图作为参考
Executor接口
Executor接口是SQL执行器的核心,所有的sql操作都是通过Executor接口来完成的,底层有各种的实现
/**
* Executor 接口
* @author Clinton Begin
*/
public interface Executor {
/**
* 无结果处理
*/
ResultHandler NO_RESULT_HANDLER = null;
/**
* 更新处理
* @param ms
* @param parameter
* @return
* @throws SQLException
*/
int update(MappedStatement ms, Object parameter) throws SQLException;
/**
* 查询
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param cacheKey
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey cacheKey, BoundSql boundSql) throws SQLException;
/**
* 查询
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param <E>
* @return
* @throws SQLException
*/
<E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException;
/**
* 游标查询
* @param ms
* @param parameter
* @param rowBounds
* @param <E>
* @return
* @throws SQLException
*/
<E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException;
/**
* 刷新数据库
* @return
* @throws SQLException
*/
List<BatchResult> flushStatements() throws SQLException;
/**
* 提交事务
* @param required
* @throws SQLException
*/
void commit(boolean required) throws SQLException;
/**
* 回滚实物
* @param required
* @throws SQLException
*/
void rollback(boolean required) throws SQLException;
/**
* 生成缓存Key
* @param ms
* @param parameterObject
* @param rowBounds
* @param boundSql
* @return
*/
CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql);
/**
* 是否已经缓存
* @param ms
* @param key
* @return
*/
boolean isCached(MappedStatement ms, CacheKey key);
/**
* 清除缓存
*/
void clearLocalCache();
/**
* 延迟加载
* @param ms
* @param resultObject
* @param property
* @param key
* @param targetType
*/
void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType);
/**
* 获取事务
* @return
*/
Transaction getTransaction();
/**
* 关闭
* @param forceRollback
*/
void close(boolean forceRollback);
/**
* 是否关闭
* @return
*/
boolean isClosed();
/**
* 设置执行器增强器
* @param executor
*/
void setExecutorWrapper(Executor executor);
}
CachingExecutor类
缓存执行器相对于其他执行器的差别在于,首先是在query()方法中判断是否使用二级缓存(也就是mapper级别的缓存)。虽然mybatis默认启用了CachingExecutor,但是如果在mapper层面没有明确设置二级缓存的话,就退化为SimpleExecutor了。二级缓存的维护由TransactionalCache(事务化缓存)负责,当在TransactionalCacheManager(事务化缓存管理器)中调用putObject和removeObject方法的时候并不是马上就把对象存放到缓存或者从缓存中删除,而是先把这个对象放到entriesToAddOnCommit和entriesToRemoveOnCommit这两个HashMap之中的一个里,然后当执行commit/rollback方法时再真正地把对象存放到缓存或者从缓存中删除,具体可以参见TransactionalCache.commit/rollback方法。
这个Executor执行类是使用内存的,将数据保存到缓存中,这样可以有效的解决增删改查性能。缓存的作用域为mapper(namespace),所以同一个namespace的操作会影响到相同作用域下的缓存,这样不同的域对同一个表进行操作很有可能导致缓存中的数据不一样,出现问题,还是不推荐使用。
/**
* 缓存执行器
*
* @author Clinton Begin
* @author Eduardo Macarron
*/
public class CachingExecutor implements Executor {
/**
*
*/
private final Executor delegate;
//事务缓存管理器
private final TransactionalCacheManager tcm = new TransactionalCacheManager();
/**
* 构造方法
*
* @param delegate
*/
public CachingExecutor(Executor delegate) {
this.delegate = delegate;
delegate.setExecutorWrapper(this);
}
/**
* 获取事务
*
* @return
*/
@Override
public Transaction getTransaction() {
return delegate.getTransaction();
}
/**
* 关闭
*
* @param forceRollback
*/
@Override
public void close(boolean forceRollback) {
try {
//issues #499, #524 and #573
if (forceRollback) {
tcm.rollback();
} else {
tcm.commit();
}
} finally {
delegate.close(forceRollback);
}
}
/**
* 检查是否关闭
*
* @return
*/
@Override
public boolean isClosed() {
return delegate.isClosed();
}
/**
* 更新操作
*
* @param ms
* @param parameterObject
* @return
* @throws SQLException
*/
@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
//刷新缓存
flushCacheIfRequired(ms);
return delegate.update(ms, parameterObject);
}
/**
* 查询
*
* @param ms
* @param parameterObject
* @param rowBounds
* @param resultHandler
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
//获取BoundSql
BoundSql boundSql = ms.getBoundSql(parameterObject);
//创建缓存key
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
/**
* 游标查询
*
* @param ms
* @param parameter
* @param rowBounds
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> Cursor<E> queryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds) throws SQLException {
//刷新环迅
flushCacheIfRequired(ms);
return delegate.queryCursor(ms, parameter, rowBounds);
}
/**
* 查询
*
* @param ms
* @param parameterObject
* @param rowBounds
* @param resultHandler
* @param key
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
//获取二级缓存
Cache cache = ms.getCache();
if (cache != null) {
//是否刷新二级缓存
flushCacheIfRequired(ms);
//开启了缓存
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
//获取缓存数据
/**
* PerpetualCache是默认二级缓存实现类
* Map<Object, Object> cache = new HashMap<Object, Object>(); map的key就是CacheKey key
* CacheKey中有个hashcode = multiplier * hashcode + 每个update(Object object)object的hashCode()
* update()方法会向updateList添加元素
* CacheKey重写的equals()方法中先判断hashcode是否相等
* 然后用updateList每个对象的equals()判断
* 这两个条件都满足就说明缓存命中,cache.get(key)也就有值
*/
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
//如果没有缓存进行数据库查询
if (list == null) {
//进行数据库查询 底层调用queryFromDatabase
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
//将数据放入缓存
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
//未开启缓存或缓存不生效直接查询
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
/**
* 刷新Statements
*
* @return
* @throws SQLException
*/
@Override
public List<BatchResult> flushStatements() throws SQLException {
return delegate.flushStatements();
}
/**
* 提交事务
*
* @param required
* @throws SQLException
*/
@Override
public void commit(boolean required) throws SQLException {
delegate.commit(required);
tcm.commit();
}
/**
* 回滚事务
*
* @param required
* @throws SQLException
*/
@Override
public void rollback(boolean required) throws SQLException {
try {
delegate.rollback(required);
} finally {
if (required) {
tcm.rollback();
}
}
}
private void ensureNoOutParams(MappedStatement ms, BoundSql boundSql) {
if (ms.getStatementType() == StatementType.CALLABLE) {
for (ParameterMapping parameterMapping : boundSql.getParameterMappings()) {
if (parameterMapping.getMode() != ParameterMode.IN) {
throw new ExecutorException("Caching stored procedures with OUT params is not supported. Please configure useCache=false in " + ms.getId() + " statement.");
}
}
}
}
/**
* 创建缓存key
*
* @param ms
* @param parameterObject
* @param rowBounds
* @param boundSql
* @return
*/
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
return delegate.createCacheKey(ms, parameterObject, rowBounds, boundSql);
}
/**
* 是否已经缓存
*
* @param ms
* @param key
* @return
*/
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return delegate.isCached(ms, key);
}
/**
* 延迟加载
*
* @param ms
* @param resultObject
* @param property
* @param key
* @param targetType
*/
@Override
public void deferLoad(MappedStatement ms, MetaObject resultObject, String property, CacheKey key, Class<?> targetType) {
delegate.deferLoad(ms, resultObject, property, key, targetType);
}
/**
* 清除本地缓存
*/
@Override
public void clearLocalCache() {
delegate.clearLocalCache();
}
/**
* 是否刷新二级缓存
*
* @param ms
*/
private void flushCacheIfRequired(MappedStatement ms) {
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
tcm.clear(cache);
}
}
@Override
public void setExecutorWrapper(Executor executor) {
throw new UnsupportedOperationException("This method should not be called");
}
}
BaseExecutor类
它是一个实现了Executor接口的抽象类,实现了接口中的大部分方法,其中就是使用了模板模式,它主要提供了缓存和事物管理的基本功能,不同的实现类,只要实现4个基本方法来完成数据库的相关操作,这4个抽象方法:
- doUpdate()
- doQuery()
- doFlushStatement()
- doQueryCursor。
<settings>
<setting name="defaultExecutorType" value="SIMPLE"/> <!--SIMPLE、REUSE、BATCH-->
</settings>
配置之后在Configuration类中的newExecutor()函数会选择具体使用的子类,实现如下:
/**
* 创建一个新的执行器
*
* @param transaction 事务
* @param executorType 执行器类型
* @return
*/
public Executor newExecutor(Transaction transaction, ExecutorType executorType) {
//根据executorType来选择实现子类
executorType = executorType == null ? defaultExecutorType : executorType;
//默认执行器为SIMPLE类型
executorType = executorType == null ? ExecutorType.SIMPLE : executorType;
Executor executor;
//批处理执行器
if (ExecutorType.BATCH == executorType) {
executor = new BatchExecutor(this, transaction);
//可复用执行器
} else if (ExecutorType.REUSE == executorType) {
executor = new ReuseExecutor(this, transaction);
} else {
executor = new SimpleExecutor(this, transaction);
}
//是否使用全局缓存
if (cacheEnabled) {
executor = new CachingExecutor(executor);
}
executor = (Executor) interceptorChain.pluginAll(executor);
return executor;
}
结构信息
/**
* @author Clinton Begin
*/
public abstract class BaseExecutor implements Executor {
private static final Log log = LogFactory.getLog(BaseExecutor.class);
//事务
protected Transaction transaction;
//Executor 增强
protected Executor wrapper;
//线程安全的延迟加载队列
protected ConcurrentLinkedQueue<DeferredLoad> deferredLoads;
//本地缓存
protected PerpetualCache localCache;
//本地参数缓存
protected PerpetualCache localOutputParameterCache;
//mybatis的配置信息
protected Configuration configuration;
//查询堆栈
protected int queryStack;
//关闭状态
private boolean closed;
/**
* 构造方法
*
* @param configuration
* @param transaction
*/
protected BaseExecutor(Configuration configuration, Transaction transaction) {
this.transaction = transaction;
this.deferredLoads = new ConcurrentLinkedQueue<>();
this.localCache = new PerpetualCache("LocalCache");
this.localOutputParameterCache = new PerpetualCache("LocalOutputParameterCache");
this.closed = false;
this.configuration = configuration;
this.wrapper = this;
}
}
查询相关
先从缓存中查询,不存在然后再从物理数据库中查询
/**
* 查询
*
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
//获取绑定sql
BoundSql boundSql = ms.getBoundSql(parameter);
//创建缓存key
CacheKey key = createCacheKey(ms, parameter, rowBounds, boundSql);
return query(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
/**
* 查询
*
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param key
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@SuppressWarnings("unchecked")
@Override
public <E> List<E> query(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing a query").object(ms.getId());
//如果已经关闭,报错
if (closed) {
throw new ExecutorException("Executor was closed.");
}
//先清局部缓存,再查询,但仅仅查询堆栈为0才清,为了处理递归调用
if (queryStack == 0 && ms.isFlushCacheRequired()) {
clearLocalCache();
}
List<E> list;
try {
//加一,这样递归调用到上面的时候就不会再清局部缓存了
queryStack++;
//根据cachekey从localCache去查
list = resultHandler == null ? (List<E>) localCache.getObject(key) : null;
if (list != null) {
//如果查到localCache缓存,处理localOutputParameterCache
handleLocallyCachedOutputParameters(ms, key, parameter, boundSql);
} else {
//从数据库查
list = queryFromDatabase(ms, parameter, rowBounds, resultHandler, key, boundSql);
}
} finally {
//清空堆栈
queryStack--;
}
if (queryStack == 0) {
//延迟加载队列中所有元素
for (DeferredLoad deferredLoad : deferredLoads) {
deferredLoad.load();
}
//清空延迟加载队列
deferredLoads.clear();
//如果是statement,清本地缓存
if (configuration.getLocalCacheScope() == LocalCacheScope.STATEMENT) {
// issue #482
clearLocalCache();
}
}
return list;
}
数据库中查询数据
从数据库中查询数据最终调用的是抽象方法doQuery
/**
* 从数据库中查询数据
*
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param key
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
private <E> List<E> queryFromDatabase(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql) throws SQLException {
List<E> list;
//向缓存中放入占位符
localCache.putObject(key, EXECUTION_PLACEHOLDER);
try {
//执行查询
list = doQuery(ms, parameter, rowBounds, resultHandler, boundSql);
} finally {
//清除占位符
localCache.removeObject(key);
}
//讲查询结构放入缓存
localCache.putObject(key, list);
//如果是存储过程,OUT参数也加入缓存
if (ms.getStatementType() == StatementType.CALLABLE) {
localOutputParameterCache.putObject(key, parameter);
}
return list;
}
更新方法
查询,删除,更新最终都是调用的update方法,然后调用doUpdate抽象方法
/**
* 更新
*
* @param ms
* @param parameter
* @return
* @throws SQLException
*/
@Override
public int update(MappedStatement ms, Object parameter) throws SQLException {
ErrorContext.instance().resource(ms.getResource()).activity("executing an update").object(ms.getId());
if (closed) {
throw new ExecutorException("Executor was closed.");
}
//清除缓存
clearLocalCache();
//执行更新
return doUpdate(ms, parameter);
}
缓存相关
MyBatis 对于其 Key 的生成采取规则为:[mappedStementId + offset + limit + SQL + queryParams + environment]生成一个哈希码
/**
* 创建缓存key
*
* @param ms
* @param parameterObject
* @param rowBounds
* @param boundSql
* @return
*/
@Override
public CacheKey createCacheKey(MappedStatement ms, Object parameterObject, RowBounds rowBounds, BoundSql boundSql) {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
//MyBatis 对于其 Key 的生成采取规则为:[mappedStementId + offset + limit + SQL + queryParams + environment]生成一个哈希码
CacheKey cacheKey = new CacheKey();
cacheKey.update(ms.getId());
cacheKey.update(rowBounds.getOffset());
cacheKey.update(rowBounds.getLimit());
cacheKey.update(boundSql.getSql());
List<ParameterMapping> parameterMappings = boundSql.getParameterMappings();
TypeHandlerRegistry typeHandlerRegistry = ms.getConfiguration().getTypeHandlerRegistry();
// mimic DefaultParameterHandler logic
for (ParameterMapping parameterMapping : parameterMappings) {
if (parameterMapping.getMode() != ParameterMode.OUT) {
Object value;
String propertyName = parameterMapping.getProperty();
if (boundSql.hasAdditionalParameter(propertyName)) {
value = boundSql.getAdditionalParameter(propertyName);
} else if (parameterObject == null) {
value = null;
} else if (typeHandlerRegistry.hasTypeHandler(parameterObject.getClass())) {
value = parameterObject;
} else {
MetaObject metaObject = configuration.newMetaObject(parameterObject);
value = metaObject.getValue(propertyName);
}
cacheKey.update(value);
}
}
if (configuration.getEnvironment() != null) {
// issue #176
cacheKey.update(configuration.getEnvironment().getId());
}
return cacheKey;
}
/**
* 是否已经缓存
*
* @param ms
* @param key
* @return
*/
@Override
public boolean isCached(MappedStatement ms, CacheKey key) {
return localCache.getObject(key) != null;
}
/**
* 清空本地缓存,一个map结构
*/
@Override
public void clearLocalCache() {
if (!closed) {
//清空本地缓存
localCache.clear();
//清空本地参数缓存
localOutputParameterCache.clear();
}
}
事务相关
/**
* 获取事务
*
* @return
*/
@Override
public Transaction getTransaction() {
if (closed) {
throw new ExecutorException("Executor was closed.");
}
return transaction;
}
/**
* 提交事务
*
* @param required
* @throws SQLException
*/
@Override
public void commit(boolean required) throws SQLException {
if (closed) {
throw new ExecutorException("Cannot commit, transaction is already closed");
}
//清空本地缓存
clearLocalCache();
//刷新statements对象
flushStatements();
//required为true则自动提交
if (required) {
transaction.commit();
}
}
/**
* 回滚事务
*
* @param required
* @throws SQLException
*/
@Override
public void rollback(boolean required) throws SQLException {
if (!closed) {
try {
//清空本地缓存
clearLocalCache();
//刷新statements对象
flushStatements(true);
} finally {
//required为true则回滚事务
if (required) {
transaction.rollback();
}
}
}
}
SimpleExecutor类
SimpleExecutor通过类名可以看出,它是一个简单的执行类,并不会做一些处理就执行sql
/**
* @author Clinton Begin
*/
public class SimpleExecutor extends BaseExecutor {
/**
* 构造方法
*
* @param configuration
* @param transaction
*/
public SimpleExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
/**
* 更新操作
*
* @param ms
* @param parameter
* @return
* @throws SQLException
*/
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
Statement stmt = null;
try {
//获取Configuration对象
Configuration configuration = ms.getConfiguration();
//创建StatementHandler对象
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
stmt = prepareStatement(handler, ms.getStatementLog());
//最终是一个statement进行处理
return handler.update(stmt);
} finally {
//关闭Statement
closeStatement(stmt);
}
}
/**
* 查询操作
*
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
Statement stmt = null;
try {
//获取Configuration对象
Configuration configuration = ms.getConfiguration();
//创建StatementHandler对象
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
stmt = prepareStatement(handler, ms.getStatementLog());
//最终是一个statement进行处理
return handler.query(stmt, resultHandler);
} finally {
//关闭Statement
closeStatement(stmt);
}
}
/**
* 游标查询
*
* @param ms
* @param parameter
* @param rowBounds
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Statement stmt = prepareStatement(handler, ms.getStatementLog());
stmt.closeOnCompletion();
return handler.queryCursor(stmt);
}
/**
* 刷新Statements
*
* @param isRollback
* @return
*/
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) {
return Collections.emptyList();
}
/**
* 获取prepareStatement
*
* @param handler
* @param statementLog
* @return
* @throws SQLException
*/
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
Connection connection = getConnection(statementLog);
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
return stmt;
}
}
BatchExecutor类
通过批量操作来提高性能
/**
* 批量执行器
*
* @author Jeff Butler
*/
public class BatchExecutor extends BaseExecutor {
public static final int BATCH_UPDATE_RETURN_VALUE = Integer.MIN_VALUE + 1002;
//Statement列表
private final List<Statement> statementList = new ArrayList<>();
//结果集列表
private final List<BatchResult> batchResultList = new ArrayList<>();
//当前需要执行的sql
private String currentSql;
//当前操作的MappedStatement
private MappedStatement currentStatement;
/**
* 构造方法
*
* @param configuration
* @param transaction
*/
public BatchExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
@Override
public int doUpdate(MappedStatement ms, Object parameterObject) throws SQLException {
//获得配置信息
final Configuration configuration = ms.getConfiguration();
//获得StatementHandler
final StatementHandler handler = configuration.newStatementHandler(this, ms, parameterObject, RowBounds.DEFAULT, null, null);
//获取BoundSql
final BoundSql boundSql = handler.getBoundSql();
//获取需要执行的sql
final String sql = boundSql.getSql();
final Statement stmt;
//如果sql语句等于当前sql MappedStatement 等于当前Map碰到Statement
if (sql.equals(currentSql) && ms.equals(currentStatement)) {
int last = statementList.size() - 1;
//获取最后一个Statement
stmt = statementList.get(last);
applyTransactionTimeout(stmt);
handler.parameterize(stmt);//fix Issues 322
//有相同的MappedStatement和参数
BatchResult batchResult = batchResultList.get(last);
batchResult.addParameterObject(parameterObject);
} else {
//如果不存在就创建一个批处理操作
//获取连接
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt); //fix Issues 322
currentSql = sql;
currentStatement = ms;
//添加批量处理操作的Statement
statementList.add(stmt);
//添加结果集
batchResultList.add(new BatchResult(ms, sql, parameterObject));
}
//最终是调用jdbc的批处理操作
handler.batch(stmt);
return BATCH_UPDATE_RETURN_VALUE;
}
/**
* 批量查询
*
* @param ms
* @param parameterObject
* @param rowBounds
* @param resultHandler
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql)
throws SQLException {
Statement stmt = null;
try {
//刷新Statements
flushStatements();
//获取配置信息
Configuration configuration = ms.getConfiguration();
//获得StatementHandler
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameterObject, rowBounds, resultHandler, boundSql);
//获取连接
Connection connection = getConnection(ms.getStatementLog());
stmt = handler.prepare(connection, transaction.getTimeout());
handler.parameterize(stmt);
//执行查询
return handler.query(stmt, resultHandler);
} finally {
//关闭Statement
closeStatement(stmt);
}
}
/**
* 批量操作游标
*
* @param ms
* @param parameter
* @param rowBounds
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
flushStatements();
Configuration configuration = ms.getConfiguration();
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
Connection connection = getConnection(ms.getStatementLog());
Statement stmt = handler.prepare(connection, transaction.getTimeout());
stmt.closeOnCompletion();
handler.parameterize(stmt);
return handler.queryCursor(stmt);
}
/**
* 批量刷新Statements 记录执行次数
*
* @param isRollback
* @return
* @throws SQLException
*/
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) throws SQLException {
try {
List<BatchResult> results = new ArrayList<>();
if (isRollback) {
return Collections.emptyList();
}
//如果进行了批量处理
for (int i = 0, n = statementList.size(); i < n; i++) {
Statement stmt = statementList.get(i);
applyTransactionTimeout(stmt);
BatchResult batchResult = batchResultList.get(i);
try {
//记录批量处理执行操作的条数
batchResult.setUpdateCounts(stmt.executeBatch());
MappedStatement ms = batchResult.getMappedStatement();
//参数对象集合
List<Object> parameterObjects = batchResult.getParameterObjects();
//生成key
KeyGenerator keyGenerator = ms.getKeyGenerator();
if (Jdbc3KeyGenerator.class.equals(keyGenerator.getClass())) {
Jdbc3KeyGenerator jdbc3KeyGenerator = (Jdbc3KeyGenerator) keyGenerator;
jdbc3KeyGenerator.processBatch(ms, stmt, parameterObjects);
} else if (!NoKeyGenerator.class.equals(keyGenerator.getClass())) { //issue #141
for (Object parameter : parameterObjects) {
keyGenerator.processAfter(this, ms, stmt, parameter);
}
}
// Close statement to close cursor #1109
closeStatement(stmt);
} catch (BatchUpdateException e) {
StringBuilder message = new StringBuilder();
message.append(batchResult.getMappedStatement().getId())
.append(" (batch index #")
.append(i + 1)
.append(")")
.append(" failed.");
if (i > 0) {
message.append(" ")
.append(i)
.append(" prior sub executor(s) completed successfully, but will be rolled back.");
}
throw new BatchExecutorException(message.toString(), e, results, batchResult);
}
//记录操作
results.add(batchResult);
}
return results;
} finally {
for (Statement stmt : statementList) {
closeStatement(stmt);
}
currentSql = null;
statementList.clear();
batchResultList.clear();
}
}
}
ReuseExecutor类
ReuseExecutor,顾名思义,是可以重用的Executor。它重用的是Statement对象,它会在内部利用一个Map把创建的Statement都缓存起来,每次在执行一条SQL语句时,它都会去判断之前是否存在基于该SQL缓存的Statement对象,存在而且之前缓存的Statement对象对应的Connection还没有关闭的时候就继续用之前的Statement对象,否则将创建一个新的Statement对象,并将其缓存起来。因为每一个新的SqlSession都有一个新的Executor对象,所以我们缓存在ReuseExecutor上的Statement的作用域是同一个SqlSession
/**
* 重复执行器
*
* @author Clinton Begin
*/
public class ReuseExecutor extends BaseExecutor {
/**
* Statement对象缓存
* key :sql
* value: Statement对象
*/
private final Map<String, Statement> statementMap = new HashMap<>();
/**
* 构造方法
*
* @param configuration
* @param transaction
*/
public ReuseExecutor(Configuration configuration, Transaction transaction) {
super(configuration, transaction);
}
/**
* 执行更新
*
* @param ms
* @param parameter
* @return
* @throws SQLException
*/
@Override
public int doUpdate(MappedStatement ms, Object parameter) throws SQLException {
//获取配置信息
Configuration configuration = ms.getConfiguration();
//获取StatementHandler
StatementHandler handler = configuration.newStatementHandler(this, ms, parameter, RowBounds.DEFAULT, null, null);
//获取Statement
Statement stmt = prepareStatement(handler, ms.getStatementLog());
//执行更新
return handler.update(stmt);
}
/**
* 执行查询
*
* @param ms
* @param parameter
* @param rowBounds
* @param resultHandler
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
public <E> List<E> doQuery(MappedStatement ms, Object parameter, RowBounds rowBounds, ResultHandler resultHandler, BoundSql boundSql) throws SQLException {
//获取配置信息
Configuration configuration = ms.getConfiguration();
//获取StatementHandler
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, resultHandler, boundSql);
//获取Statement
Statement stmt = prepareStatement(handler, ms.getStatementLog());
//执行查询
return handler.query(stmt, resultHandler);
}
/**
* 游标查询
*
* @param ms
* @param parameter
* @param rowBounds
* @param boundSql
* @param <E>
* @return
* @throws SQLException
*/
@Override
protected <E> Cursor<E> doQueryCursor(MappedStatement ms, Object parameter, RowBounds rowBounds, BoundSql boundSql) throws SQLException {
//获取配置信息
Configuration configuration = ms.getConfiguration();
//获取StatementHandler
StatementHandler handler = configuration.newStatementHandler(wrapper, ms, parameter, rowBounds, null, boundSql);
//获取Statement
Statement stmt = prepareStatement(handler, ms.getStatementLog());
//执行查询
return handler.queryCursor(stmt);
}
/**
* 刷新Statements
*
* @param isRollback
* @return
*/
@Override
public List<BatchResult> doFlushStatements(boolean isRollback) {
for (Statement stmt : statementMap.values()) {
closeStatement(stmt);
}
statementMap.clear();
return Collections.emptyList();
}
/**
* 处理Statement
*
* @param handler
* @param statementLog
* @return
* @throws SQLException
*/
private Statement prepareStatement(StatementHandler handler, Log statementLog) throws SQLException {
Statement stmt;
//获取BoundSql
BoundSql boundSql = handler.getBoundSql();
//获取执行sql
String sql = boundSql.getSql();
//检查sql对应的statement是否存在
if (hasStatementFor(sql)) {
//存在则获取Statement对象
stmt = getStatement(sql);
//设置事务超时时间
applyTransactionTimeout(stmt);
} else {
//不存在则创建Statement
//获取连接
Connection connection = getConnection(statementLog);
//创建Statement对象
stmt = handler.prepare(connection, transaction.getTimeout());
//将Statement放入map
putStatement(sql, stmt);
}
handler.parameterize(stmt);
return stmt;
}
/**
* 检查sql对应的statement是否存在
*
* @param sql
* @return
*/
private boolean hasStatementFor(String sql) {
try {
return statementMap.keySet().contains(sql) && !statementMap.get(sql).getConnection().isClosed();
} catch (SQLException e) {
return false;
}
}
/**
* 根据sql获取Statement对象
*
* @param s
* @return
*/
private Statement getStatement(String s) {
return statementMap.get(s);
}
/**
* 讲Statement对象放入map
*
* @param sql
* @param stmt
*/
private void putStatement(String sql, Statement stmt) {
statementMap.put(sql, stmt);
}
}