Mybatis源码学习:mapper关于事务的补充

  最近在整理某个系统需要优化的点的时候,发现在查询时速度很慢,经过排查代码,发现同事在某个service出现了循环调用另一个service的方法,而另一份servcei是查询数据库的,而这些service都没使用@@Transactional事务,同时,根据debug打印的sql查询语句,发现单条sql语句查询都很快,初步怀疑是频繁创建会话,重复打开关闭连接消耗了大量时间,于是重写了这个service的方法,直接使用相应的mapper,然后使用@@Transactional声明为一个事务,这样所有的sql都在一个会话中了,只打开关闭了一次连接,响应的速度极大的提高了。

  那么,在之前的mapper注册注入的文章中,学习到通过注入mapper的方式时,每次调用方法都是创建一个sqlsession。而使用@@Transactional事务注解后,service方法中所有mapper的调用都是一个会话了,在service方法结束之后统一对会话进行提交,那么这又是怎么实现的呢。

  通过之前的学习,已知道注入后,本质上是SqlSessionTemplate通过内部代理类SqlSessionInterceptor对象sqlSessionProxy,再看一遍SqlSessionInterceptorinvoke()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
SqlSession sqlSession = getSqlSession(
SqlSessionTemplate.this.sqlSessionFactory,
SqlSessionTemplate.this.executorType,
SqlSessionTemplate.this.exceptionTranslator);
try {
Object result = method.invoke(sqlSession, args);
if (!isSqlSessionTransactional(sqlSession, SqlSessionTemplate.this.sqlSessionFactory)) {
sqlSession.commit(true);
}
return result;
} catch (Throwable t) {
Throwable unwrapped = unwrapThrowable(t);
if (SqlSessionTemplate.this.exceptionTranslator != null && unwrapped instanceof PersistenceException) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
sqlSession = null;
Throwable translated = SqlSessionTemplate.this.exceptionTranslator.translateExceptionIfPossible((PersistenceException) unwrapped);
if (translated != null) {
unwrapped = translated;
}
}
throw unwrapped;
} finally {
if (sqlSession != null) {
closeSqlSession(sqlSession, SqlSessionTemplate.this.sqlSessionFactory);
}
}
}

  先getSqlSession()

1
2
3
4
5
6
7
8
9
10
11
public static SqlSession getSqlSession(SqlSessionFactory sessionFactory, ExecutorType executorType, PersistenceExceptionTranslator exceptionTranslator) {
SqlSessionHolder holder = (SqlSessionHolder) TransactionSynchronizationManager.getResource(sessionFactory);
SqlSession session = sessionHolder(executorType, holder);
if (session != null) {
return session;
}
LOGGER.debug(() -> "Creating a new SqlSession");
session = sessionFactory.openSession(executorType);
registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);
return session;
}

  从getSqlSession()这个方法中可以看到,先通过TransactionSynchronizationManager.getResource(sessionFactory)获取SqlSessionHolder,从TransactionSynchronizationManager类的名字似乎是关于事务的,进入getResource这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
   private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<>("Transactional resources");
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
Object value = doGetResource(actualKey);
// 日志打印
return value;
}
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}

  可以看到,该类的resources属性是一个ThreadLocal对象,泛型是Map<Object, Object>,也就是说保存和当前线程绑定的一个map,然后根据key获取map中保存的对象

service层方法中mapper的调用都是在一个线程中进行的

  而SqlSessionUtils#getSqlSession()根据sessionFactory获取了一个SqlSessionHolder对象,反推之,说明在某个过程中,曾经放入了一个SqlSessionHolder对象,其实在这个方法后面有句registerSessionHolder(sessionFactory, executorType, exceptionTranslator, session);这就是放入的操作,这个操作是在获取不到SqlSessionHolder,也就获取不到session之后,创建一个新的sqlSession,然后构造一个SqlSessionHolder对象,注册到TransactionSynchronizationManagerresourcesmap中,具体看下registerSessionHolder()这个方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static void registerSessionHolder(SqlSessionFactory sessionFactory, ExecutorType executorType,
PersistenceExceptionTranslator exceptionTranslator, SqlSession session) {
SqlSessionHolder holder;
if (TransactionSynchronizationManager.isSynchronizationActive()) {
Environment environment = sessionFactory.getConfiguration().getEnvironment();
if (environment.getTransactionFactory() instanceof SpringManagedTransactionFactory) {
LOGGER.debug(() -> "Registering transaction synchronization for SqlSession [" + session + "]");
holder = new SqlSessionHolder(session, executorType, exceptionTranslator);
TransactionSynchronizationManager.bindResource(sessionFactory, holder);
TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));
holder.setSynchronizedWithTransaction(true);
holder.requested();
} else {
if (TransactionSynchronizationManager.getResource(environment.getDataSource()) == null) {
LOGGER.debug(() -> "SqlSession [" + session + "] was not registered for synchronization because DataSource is not transactional");
} else {
throw new TransientDataAccessResourceException(
"SqlSessionFactory must be using a SpringManagedTransactionFactory in order to use Spring transaction synchronization");
}
}
} else {
LOGGER.debug(() -> "SqlSession [" + session + "] was not registered for synchronization because synchronization is not active");
}
}

  通过TransactionSynchronizationManager.isSynchronizationActive()判断有事务之后(每当开启一个事务,就会调用TransactionSynchronizationManager#initSynchronization()对synchronizations初始化),并且判断环境中有spring管理的TransactionFactorybean之后,构造SqlSessionHolder对象与sessionFactory绑定,注册到TransactionSynchronizationManagerresourcesmap中。注意,这里有个操作TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));,将在后面说到,在service层方法执行完,统一提交事务时会涉及到。

  结合上述,回到SqlSessionInterceptor#invoke(),可以知道,会尝试从TransactionSynchronizationManager的resources获取当前线程当前sessionFactory对应的SqlSessionHolder,如果之前放入过,则直接取出SqlSessionHolder中持有的sqlSession,如果没,则创建sqlSession返回,再根据是否有声明事务,构造SqlSessionHolder持有sqlSession,再注册进TransactionSynchronizationManager的resources,返回sqlSession。

  拿到sqlSession后,通过method.invoke(sqlSession, args)直接调用SqlSession的数据操作,获取结果。然后通过isSqlSessionTransactional()判断是否具有事务(尝试从TransactionSynchronizationManager的resources中获取当前线程对应的SqlSessionHolder,如果为空,或者持有的sqlSession不等于当前的sqlSession,说明不具有事务),如果无事务,那么就sqlSession.commit(true)立即提交当前会话。在返回结果之前,会执行closeSqlSession()( 即同上再次尝试从resources获取SqlSessionHolder,如果不具有事务,则关闭会话)。

sqlSession.commit()包含了提交事务的作用,但在与spring的事务整合时发现,一些属性会影响使其执行不到其中transaction.commit(),然后靠后续通过Connection调用NaticeSession发送commit指令给mysql来提交

  从以上了解到了没有事务情况下,每次mapper调用都会创建新的会话,并且会话会在执行完后立即提交,而有事务时,共用一个会话,执行完数据操作后并不会提交会话,通过debug日志知道,在service方法执行完后,统一提交了事务,那么是如何统一提交事务呢,以及为什么未使用事务mapper执行完就提交会话了呢。

  service的方法在被执行调用时,实际上是被代理类CglibAopProxy动态代理,并进入到通用回调DynamicAdvisedInterceptor中,看一下这个回调中的intercept():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
Object oldProxy = null;
boolean setProxyContext = false;
Object target = null;
TargetSource targetSource = this.advised.getTargetSource();
try {
if (this.advised.exposeProxy) {
oldProxy = AopContext.setCurrentProxy(proxy);
setProxyContext = true;
}
target = targetSource.getTarget();
Class<?> targetClass = (target != null ? target.getClass() : null);
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) {
Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args);
retVal = methodProxy.invoke(target, argsToUse);
}
else {
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
}
retVal = processReturnType(proxy, target, method, retVal);
return retVal;
}
finally {
if (target != null && !targetSource.isStatic()) {
targetSource.releaseTarget(target);
}
if (setProxyContext) {
AopContext.setCurrentProxy(oldProxy);
}
}
}

  这里是一个关键点,对于有事务和无事务的分岔点就在这里,检查该方法(切点)的InvokerInterceptor数量,如果为空,那么就跳过创建MethodInvocation,而直接调用代理目标也就是service方法,如果不为空,则创建一个CglibMethodInvocation并执行。而如果方法有@Transactional注解修饰,该方法(切点)会拥有一个TransactionInterceptor,所以就不直接调用service方法,而是创建CglibMethodInvocation调用。

无事务的情况

  无事务时,直接调用代理目标也就是service方法,service调用mapper的方法,而调用mapper的方法,从前面的学习已经知道实质上调用SimpleExecutordoUpdate()doQuery(),而这两个方法实际上是调用StatementHandler的update和query,而再往后则是调用PreparedStatementexecute(),最终调用mysql的jdbc驱动包中原始会话NaticeSessionexecSQL(),第一次先建立连接,第二次发送SET autocommit=1,告诉mysql后续的操作都是自动提交,所以后续直接执行sql获取结果

有事务的情况

  就如上面所说,创建CglibMethodInvocation,然后反射调用TransactionInterceptorTransactionInterceptor继承自TransactionAspectSupportTransactionInterceptorinvoke()调用TransactionAspectSupportinvokeWithinTransaction()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {

// If the transaction attribute is null, the method is non-transactional.
TransactionAttributeSource tas = getTransactionAttributeSource();
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

Object retVal;
try {
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) { completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
}

else {
// CallbackPreferringPlatformTransactionManager情况下的处理
}
}

  可以观察到,这是一个环绕切面,先通过retVal = invocation.proceedWithInvocation()调用代理目标,也就是调用service方法执行切点,然后后置地调用commitTransactionAfterReturning(txInfo),这里是在切点执行之后,用于提交事务的。在调用service方法时,调用mapper的方法,在最终调用mysql驱动包时,建立连接后,给mysql发送了SET autocommit=0,即意味着后续的sql执行都先不提交,而在commitTransactionAfterReturning()

1
2
3
4
5
6
protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 日志打印省略
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

  这里获取到的TransactionManager是DataSourceTransactionManagerDataSourceTransactionManager继承AbstractPlatformTransactionManager,commit方法中再次调用processCommit()processCommit()调用doCommit(),在doCommit()中获取连接Connection,调用Connection#commit(),最终也是调用NativeSession#execSQL(),只不过给mysql发送的是commit,即提交之前发送的所有sql。

processCommit()中调用doCommit()之前,有一步调用triggerBeforeCommit(status),而在这个方法中调用TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly()),而在前文中,提到过,在创建sqlSession时,会构造SqlSessionHolder,然后注册到TransactionSynchronizationManager,同时TransactionSynchronizationManager.registerSynchronization(new SqlSessionSynchronization(holder, sessionFactory));创建了SqlSessionSynchronization对象注册。而TransactionSynchronizationUtils.triggerBeforeCommit(status.isReadOnly())中即对TransactionSynchronizationManager中所有的TransactionSynchronization对象遍历,执行它们的beforeCommit(),而SqlSessionSynchronizationbeforeCommit()中,获取持有sqlSessionHolder进而获取SqlSession,然后调用commit()

学习到这里,对mybatis的会话关于事务方面做了些补充,但学习的越多,疑惑也就越多(坑也就越多…),比如关于Spring如何管理事务的,将带着这些疑问进行后续的学习,做好学习笔记。

秋月 wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
随缘打赏,您的支持将鼓励我继续创作!