Spring事务源码详解
事务其实也是基于AOP切面实现的。我们可以直接使用注解,开启事务管理功能:
@Configuration @ComponentScan("com.wml") @Import({JdbcConfig.class,TransactionConfig.class}) @PropertySource(value = "JdbcConfig.properties") @EnableTransactionManagement public class SpringConfiguration { }
主要是@EnableTransactionManagement
注解开启,除此,还需要定义数据源和事务管理器:
如上@Import注解,就分别引入了两个的配置类:
public class JdbcConfig { @Value("${jdbc.driver}") private String driver; @Value("${jdbc.url}") private String url; @Value("${jdbc.username}") private String username; @Value("${jdbc.password}") private String password; @Bean(name = "template") public JdbcTemplate createTemplate(DataSource dataSource){ return new JdbcTemplate(dataSource); } //这里配置datasource @Bean(name = "dataSource") public DataSource createDataSource(){ DriverManagerDataSource dataSource=new DriverManagerDataSource(); dataSource.setDriverClassName(driver); dataSource.setUrl(url); dataSource.setUsername(username); dataSource.setPassword(password); return dataSource; } }
这里配置事务管理器
public class TransactionConfig { /** * 创建事务管理器对象 * @param dataSource * @return */ @Bean(name = "transactionManager") public PlatformTransactionManager createTransaction(DataSource dataSource){ return new DataSourceTransactionManager(dataSource); } }
Spring在启动的时候会扫描到EnableTransactionManagement
注解,跟进入,里面引入了TransactionManagementConfigurationSelector
配置类:
@Import(TransactionManagementConfigurationSelector.class)
该类中有个如下的方法:
这个方法在扫描注解时,会调用到该方法,收集beanName,然后封装成beanDefinition对象
protected String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()}; case ASPECTJ: return new String[] {determineTransactionAspectClass()}; default: return null; } }
该方法中主要返回了两个类的beanName,我们主要看ProxyTransactionManagementConfiguration
类,该类用于创建事务的切面,同时,数据源和事务管理器也是在这里加载。
创建事务切面
//ProxyTransactionManagementConfiguration# @Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() { //这个就很熟悉了,Adivsor,一定是个切面 BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor(); //1.将Transactional注解解析出来保存 advisor.setTransactionAttributeSource(transactionAttributeSource()); //2.获取Advice advisor.setAdvice(transactionInterceptor()); if (this.enableTx != null) { advisor.setOrder(this.enableTx.<Integer>getNumber("order")); } return advisor; } public TransactionAttributeSource transactionAttributeSource() { return new AnnotationTransactionAttributeSource(); }
解析事务注解,就是对注解中的参数进行封装,这里创建了一个AnnotationTransactionAttributeSource
:
public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) { this.publicMethodsOnly = publicMethodsOnly; if (jta12Present || ejb3Present) { this.annotationParsers = new LinkedHashSet<>(4); //主要是用这个注解解析器,解析方法上面的@Transactional注解 this.annotationParsers.add(new SpringTransactionAnnotationParser()); if (jta12Present) { this.annotationParsers.add(new JtaTransactionAnnotationParser()); } if (ejb3Present) { this.annotationParsers.add(new Ejb3TransactionAnnotationParser()); } } else { this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser()); } }
该类中主要借助SpringTransactionAnnotationParser
进行解析,将所有的注解参数封装到RuleBasedTransactionAttribute
中返回。
protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) { RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute(); Propagation propagation = attributes.getEnum("propagation"); rbta.setPropagationBehavior(propagation.value()); Isolation isolation = attributes.getEnum("isolation"); rbta.setIsolationLevel(isolation.value()); rbta.setTimeout(attributes.getNumber("timeout").intValue()); rbta.setReadOnly(attributes.getBoolean("readOnly")); rbta.setQualifier(attributes.getString("value")); List<RollbackRuleAttribute> rollbackRules = new ArrayList<>(); for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("rollbackForClassName")) { rollbackRules.add(new RollbackRuleAttribute(rbRule)); } for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } for (String rbRule : attributes.getStringArray("noRollbackForClassName")) { rollbackRules.add(new NoRollbackRuleAttribute(rbRule)); } rbta.setRollbackRules(rollbackRules); return rbta; }
这都很简单。重点来看看如何创建Adivice的:
public TransactionInterceptor transactionInterceptor() { TransactionInterceptor interceptor = new TransactionInterceptor(); interceptor.setTransactionAttributeSource(transactionAttributeSource()); //事务管理器要跟数据源挂钩,所以需要自己定义 if (this.txManager != null) { interceptor.setTransactionManager(this.txManager); } return interceptor; }
这里涉及TransactionInterceptor
类,该类实现了TransactionAspectSupport
和MethodInterceptor
接口,因此该类一定是事务的Advice,结合前面讲的AOP调用,该类的invoke一定会在AOP的链式调用中被执行,如下:
public Object invoke(MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // 调用invokeWithinTransaction return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed); }
重点在这里:
一、事务调用
位于TransactionAspectSupport
中,一个事务切面的Advice,和事务控制的逻辑都在该类中处理
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) throws Throwable { //1.用于获取事务属性的类 TransactionAttributeSource tas = getTransactionAttributeSource(); //2.获取方法有@Transactional注解的属性 final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null); //3.获取事务管理器 final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //4.创建事务 TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. //5.链式传递 retVal = invocation.proceedWithInvocation(); } catch (Throwable ex) { // target invocation exception //6.事务回滚 completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } //7.事务提交 commitTransactionAfterReturning(txInfo); return retVal; } else { final ThrowableHolder throwableHolder = new ThrowableHolder(); // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. throwableHolder.throwable = ex; return null; } } finally { cleanupTransactionInfo(txInfo); } }); // Check result state: It might indicate a Throwable to rethrow. if (throwableHolder.throwable != null) { throw throwableHolder.throwable; } return result; } catch (ThrowableHolderException ex) { throw ex.getCause(); } catch (TransactionSystemException ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); ex2.initApplicationException(throwableHolder.throwable); } throw ex2; } catch (Throwable ex2) { if (throwableHolder.throwable != null) { logger.error("Application exception overridden by commit exception", throwableHolder.throwable); } throw ex2; } } }
该类主要做了如下的事:
获取方法有
@Transactional
注解的属性,这里最终就会调用到上面讲的parseTransactionAnnotation
方法,封装注解中的所有属性,得到TransactionAttribute
获取事务管理器,缓存中没有时,会执行通过
getBean
,获取PlatformTransactionManager
实例,就会获得我们配置的事务管理器实例,如下:defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
创建事务
来看看如何创建事务的:
1.1 创建事务
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm, @Nullable TransactionAttribute txAttr, final String joinpointIdentification) { ................. TransactionStatus status = null; if (txAttr != null) { if (tm != null) { //代码1:开启事务: status = tm.getTransaction(txAttr); } else { ....logger.. } } //创建事务信息对象,记录新老事务信息对象 return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
1.1.1 开启事务
上面会调用事务管理器的getTransaction
开启事务【按照标注的代码1、2......顺序跟下面标题看】
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException { //代码1创建事务对象 Object transaction = doGetTransaction(); // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //代码2:如果存在事务【第一次不会走这里】 if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } //代码3:第一次没有事务走到这 else if(){ ..看1.1.1.3部分. } else { ...... boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
1.1.1.1创建事务对象
在DataSourceTransactionManager
类中
protected Object doGetTransaction() { //DataSourceTransactionObject内部维护了ConnectionHolder,ConnectionHolder中又维护了连接对象connection DataSourceTransactionObject txObject = new DataSourceTransactionObject(); //是允许嵌套事务 txObject.setSavepointAllowed(isNestedTransactionAllowed()); //获取数据源对象 ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource()); txObject.setConnectionHolder(conHolder, false); return txObject; }
获取数据源和连接对象
obtainDataSource()方法获取到了数据源datasource,即下面的actualKey
private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.get(actualKey); // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
可以看到是从resources拿连接对象,而resources是一个ThreadLocal:ThreadLocal<Map<Object, Object>> resources
,使用了一个Map,map的key就是datasource,value就是对应的连接对象,这里就是根据当前的数据源拿到对应的连接对象,之所以这样映射,是因为我们可以配置多个数据源。第一次创建事务的时候拿不到连接对象。
1.1.1.2已经存在事务
回到1.1.1,拿到事务对象后,会进入下面的if逻辑,如果当前已经存在事务【第一次不会进入这里】,就返回一个TransactionStatus
对象,在这里会进行传播属性的判断。【建议先跳过这部分内容,看完第一个流程后再看,即下面的1.1.1.3的doBegin后再看】
private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { //1.PROPAGATION_NEVER 非事务运行,如果存在事务,则抛异常 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } //2.PROPAGATION_NOT_SUPPORTED 如果以非事务方式执行操作,如果存在事务,就把当前事务挂起 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } //挂起当前事务 Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //修改事务状态信息,把事务的一些信息存储到当前线程中,ThreadLocal中 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } //3.PROPAGATION_REQUIRES_NEW 有事务就挂起当前事务,再创建新的事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } //挂起当前事务[看后面讲解] SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //这里进行创建新的事务,后面有讲解 doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException | Error beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } } //4.PROPAGATION_NESTED嵌套事务 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } //默认true if (useSavepointForNestedTransaction()) { //注意,这里事务状态为false,而不是true DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); //创建回滚点 status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //如果上面都没匹配,就会走这里。 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null); }
挂起当前事务
挂起事务最终会执行doSuspend
方法:
可以看到,就是将连接对象ConnectionHolder
置空,然后再将第一次绑定的关系进行解绑
protected Object doSuspend(Object transaction) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; txObject.setConnectionHolder(null); //解除绑定关系, return TransactionSynchronizationManager.unbindResource(obtainDataSource()); }
解绑操作核心代码:
Map<Object, Object> map = resources.get(); if (map == null) { return null; } Object value = map.remove(actualKey);
整个流程下来,发现没有我们的默认传播属性PROPAGATION_REQUIRED,因此会来到代码的最后:
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
,这里它将newTransaction
设为了false,因为默认传播属性会使用原来的事务;而在第一次创建事务或者传播属性为REQUIRES_NEW
时,会将该状态设为true,表示是一个新事务。
1.1.1.3 第一次不存在事务
如果第一次进来,则没有事务,就会进入这个else if
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //先挂起 SuspendedResourcesHolder suspendedResources = suspend(null); ... try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //创建事务状态对象,封装了事务对象运行时的的一些信息 DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); //****在这里开启事务**** doBegin(transaction, definition); //开启事务后,改变事务状态 prepareSynchronization(status, definition); return status; }
- 创建事务状态对象,记录了事务流转过程中的状态数据,其中
newTransaction
属性为true时,就代表当前事务允许单独提交和回滚,一般是第一次创建事务或者事务传播属性为PROPAGATION_REQUIRES_NEW
的时候。如果为 false 则当前事务不能单独提交和回滚。 - 开启事务
- 改变事务状态
开启事务doBegin
doBegin主要做了如下的事:
- 从数据源拿到连接对象进行包装放到事务对象中
- 关闭事务自动提交。
con.setAutoCommit(false);
,不让数据库控制,而由我们自己控制【核心】 - 如果是第一次创建事务,则建立将当前线程与连接对象的关系,即将数据源和连接对象保存到ThreadLocal中
protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { //如果没有数据库连接 if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { //1. 从连接池里面获取连接 Connection newCon = obtainDataSource().getConnection(); if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //2.把连接包装成ConnectionHolder,然后设置到事务对象中 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); //3.获取隔离级别 Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } //关闭连接的自动提交(开启事务控制) con.setAutoCommit(false); } //设置只读事务 从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,其他事务所提交的数据,该事务将看不见 //设置只读事务后,数据库就不会进行加锁等保证安全的操作,提高效率 prepareTransactionalConnection(con, definition); //自己提交关闭了,就说明已经开启事务了,事务是活动的 txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the connection holder to the thread. if (txObject.isNewConnectionHolder()) { //如果是新创建的事务,则建立当前线程和数据库连接的关系 TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder()); } } .... }
到这里就完成了第一次进入事务切面的处理,主要就是获取连接对象开启事务,接着会走到第一部分的代码5,invocation.proceedWithInvocation();
进行AOP的链式调用,如果我们有如下的业务代码,此时调用Service对象,也是个代理对象,因此又会进入getTransaction
,此时因为第一次已经将数据源数据库连接关系保存到了当前线程的ThreadLocal中,因此可以拿到连接对象,它就会走第一个if,即1.1.1.2部分。
@Transactional public Result add(User user) { userService.add(user); return new Result("OK"); }
根据AOP的链式调用,invocation.proceedWithInvocation();
执行完Advice方法后,就会调用被代理的方法代码,如果在这期间出现了异常。就会进入第一部分的catch中进行回滚completeTransactionAfterThrowing
:
1.2事务回滚
protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) { if (txInfo != null && txInfo.getTransactionStatus() != null) { if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) { try { txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus()); } .....catch... } .... } }
最终会调用如下方法:
private void processRollback(DefaultTransactionStatus status, boolean unexpected) { try { boolean unexpectedRollback = unexpected; try { triggerBeforeCompletion(status); //如果是嵌套事务,则就会有回滚点 if (status.hasSavepoint()) { ... //这里就按照回滚点进行回滚 status.rollbackToHeldSavepoint(); } //都为PROPAGATION_REQUIRED最外层事务统一回滚 else if (status.isNewTransaction()) { ... doRollback(status); } else { // Participating in larger transaction if (status.hasTransaction()) { if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) { if (status.isDebug()) { logger.debug("Participating transaction failed - marking existing transaction as rollback-only"); } doSetRollbackOnly(status); } .. } .. // Unexpected rollback only matters here if we're asked to fail early if (!isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = false; } } } .... triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); ..... } finally { cleanupAfterCompletion(status); } }
在doRollback
中拿到连接对象调用rollback回滚:
protected void doRollback(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); .. try { con.rollback(); } .... }
回滚这部分都几点需要特别注意的:
在回滚中,需要判断事务的状态,如果都是PROPAGATION_REQUIRED
传播属性,则只有在其NewTransaction
是true的时候,才会调用doRollback
回滚,即会在最外层的调用中统一进行回滚,不会在内层调用进行回滚。而为true的状态只有在第一次创建事务或传播属性为NEW时才是,因此如果传播属性为NEW,则出现异常就一定会回滚。
如果有回滚点,则按照回滚点回滚
事务提交回滚的几个问题【重要】
按照这个逻辑,就会有以下几个问题:
有三个事务ABC,在A中先调用B再调用C:
(没有特别说明的,都是默认传播属性REQUIRED,第一个A传播属性无关紧要,传播属性主要针对后面的事务)
- 如果ABC都是默认传播属性,如果在C出现异常,则会在A中统一回滚,三个都不会提交成功
- 如果B是NEW
- 如果在B中抛出异常,则会在A中回滚,A和B都不会提交成功,C压根不会执行到
- 如果在C抛出异常,因为B是NEW,所以B会提交成功(和A和C不是同一个连接),而C的异常会在A中捕获回滚,因此C不会提交成功
- 如果C是NEW
- 如果C抛异常,则B会提交成功,C会回滚提交失败
- 如果B抛异常,则都A和B提交失败,C不会调用到
- 如果B和C都是NEW
- 如果C抛异常,则B提交成功,C会回滚
- 如果B抛异常,则B会回滚,C调用不到
- 而如果我们在抛异常的时候,手动用try-catch捕获,则不会回滚,因为spring中的catch无法捕捉到异常
假如B和C都是NEW,如果在C中抛出异常,则B会提交成功不回滚,C不会提交成功进行回滚;
嵌套事务的问题
同样是上面的例子,假如此时B和C的属性为PROPAGATION_NESTED,即使用了嵌套事务,那么就会在这两个事务前创建回滚点,如果此时在C抛出异常,此时会发生什么?B会正常提交吗?
答案是不会的,因为如果C抛出异常,该异常最终会在最外层的A被捕获,在A中进行统一回滚,这样B和C都无法提交。要解决这个问题,就是想要在安全点前回滚,保证C出异常时B正常提交,就要让最外层的事务无法捕获异常。那么我们就可以按上面的在A中使用try-catch将B和C包裹起来,手动捕获异常,这样Spring就无法捕获回滚,B就能够正常提交。
另外如果在同类中,事务A中调用另一个带事务的B,则B的事务会失效,此时应该在该类注入本类,然后通过注入的bean调用B,才可以避免B的事务失效。
因为 spring通过TransactionInterceptor
类来控制事务开启,提交,回滚等, 它会创建一个目标类的代理类. 而在本示例中methodA
方法调用methodB
方法时,并不是通过代理类去调用,而是通过this调用本身的方法methodB
方法.所以methodB
方法的事务并不会开启。
class A{ @Transactional methodA(){ methodB(); } @Transactional methodB(){ dosomething } }
上面情况B的事务会失效,应该改为下面的:
class A{ @Autowired A a; @Transactional methodA(){ a.methodB(); } @Transactional methodB(){ dosomething } }
或者将方法B放在另一个类中,然后注入另一个类调用B方法;或者通过(A) AopContext.currentProxy().methodB()
调用。
1.3 事务提交
如果没有Advice需要调用,就会调用被代理的方法,如果没有出现异常,就会走到commitTransactionAfterReturning
提交事务,最终会调用下面的方法:
public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) {//事务已经完成了,就不能再提交事务,抛出异常 throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } //处理异常回滚 processRollback(defStatus, false); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus, true); return; } ///提交 processCommit(defStatus); }
提交逻辑:
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { boolean unexpectedRollback = false; prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; //如果有回滚点,则需要先把回滚点清除,回滚点是在嵌套事务中添加的,如果能走到这里,说明没有异常,执行成功,那么这些回滚点(安全点)就没有存在的意义了,所以直接清除掉 if (status.hasSavepoint()) { if (status.isDebug()) { logger.debug("Releasing transaction savepoint"); } unexpectedRollback = status.isGlobalRollbackOnly(); status.releaseHeldSavepoint(); } //这里根据NewTransaction状态就会出现两个情况: //1. 如果隔离级别都是PROPAGATION_REQUIRED,则最外层的调用才会走进来统一提交 //2.而如果是PROPAGATION_REQUIRES_NEW,每一个事务都会进来 else if (status.isNewTransaction()) { if (status.isDebug()) { logger.debug("Initiating transaction commit"); } unexpectedRollback = status.isGlobalRollbackOnly(); //内部获取连接对象调用其commit方法提交事务 doCommit(status); } else if (isFailEarlyOnGlobalRollbackOnly()) { unexpectedRollback = status.isGlobalRollbackOnly(); } // Throw UnexpectedRollbackException if we have a global rollback-only // marker but still didn't get a corresponding exception from commit. if (unexpectedRollback) { throw new UnexpectedRollbackException( "Transaction silently rolled back because it has been marked as rollback-only"); } } catch (UnexpectedRollbackException ex) { // can only be caused by doCommit triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK); throw ex; } catch (TransactionException ex) { // can only be caused by doCommit if (isRollbackOnCommitFailure()) { doRollbackOnCommitException(status, ex); } else { triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN); } throw ex; } catch (RuntimeException | Error ex) { if (!beforeCompletionInvoked) { triggerBeforeCompletion(status); } doRollbackOnCommitException(status, ex); throw ex; } // Trigger afterCommit callbacks, with an exception thrown there // propagated to callers but the transaction still considered as committed. try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { //释放资源 cleanupAfterCompletion(status); } }
事务提交流程:
- 如果有回滚点,需要清除回滚点
- 如果传播属性为REQUIERD,则只有最外层的事务才会提交
- 如果传播属性为NEW,则每次都会提交
- 释放资源
2和3主要根据事务NewTransaction状态判断,而前面讲过,只有第一次创建事务和传播属性为NEW时,才会将该状态设为true,因为NEW每次都会将当前事务挂起,而创建新的事务。
释放资源
private void cleanupAfterCompletion(DefaultTransactionStatus status) { status.setCompleted(); if (status.isNewSynchronization()) { TransactionSynchronizationManager.clear(); } //NewTransaction为true才释放 if (status.isNewTransaction()) { doCleanupAfterCompletion(status.getTransaction()); } //如果当前事务有挂起的事务 if (status.getSuspendedResources() != null) { if (status.isDebug()) { logger.debug("Resuming suspended transaction after completion of inner transaction"); } Object transaction = (status.hasTransaction() ? status.getTransaction() : null); //恢复挂起连接的绑定 resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources()); } }
当仅当NewTransaction为true才释放,因为为true时表示所有操作都执行完了,然后将连接对象归还到连接池,再解除在ThreadLocal中绑定的线程和连接对象的关系。
如果还有事务被挂起,则恢复被挂起的事务的连接关系【前面挂起的事务解除了该关系】
二、注解事务和编程式事务
方法上面加了@Transactional
的事务就是注解事务
这种事务控制粒度不够细,如果方法流程很长的话会产生连接占用问题,连接占用就会导致整个系统吞吐量下降。
注解事务犹豫隔离级别的不同,可能导致可重复读的问题,比如抢火车票这种,需要用
到乐观锁,但是在重复抢票的时候由于是同一个连接对象,所以每次查询的数据是一样的。
用编程式事务解决了这个问题,这个查询语句是在 execute 方法内,当这个方法执行完
时就提交了事务,下次再进来时又是一个单独的新事务了。
编程式事务控制粒度更细,可以只关心需要事务控制的操作,如果不需要事务控制的代
码可以不放在 execute 方法内。
@Bean public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { TransactionTemplate transactionTemplate = new TransactionTemplate(); transactionTemplate.setTransactionManager(platformTransactionManager); return transactionTemplate; } @Autowired private TransactionTemplate transactionTemplate; public int getTicketModeOne() { Integer execute = transactionTemplate.execute(status -> { //1、获取锁 List<ZgTicket> zgTickets = commonMapper.queryTicketById("12306"); Map lockmap = new HashMap(); lockmap.put("ticketId", "12306"); lockmap.put("version", zgTickets.get(0).getVersion()); int i = commonMapper.updateLock(lockmap); if (i > 0) { //抢票 ZgTicket zgTicket = zgTickets.get(0); zgTicket.setTicketCount(2); int i1 = commonMapper.updateTicket(zgTicket); } return i; }); if (execute == 0) { //继续抢 getTicketModeOne(); } return 0; }
如果想更细,可以自己手动控制事务:
@Autowired PlatformTransactionManager platformTransactionManager; public void xxx() { DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition(); defaultTransactionDefinition.setPropagationBehavior(0); TransactionStatus transaction = platformTransactionManager.getTransaction(defaultTransactionDefinition); try { System.out.println("业务代码"); }catch (Exception e) { platformTransactionManager.rollback(transaction); } platformTransactionManager.commit(transaction); }
附1:事务的传播属性
所谓传播属性就是告诉spring如何去控制事务。
PROPAGATION_REQUIRED【默认】
如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。即使用同一个事务
PROPAGATION_SUPPORTS
支持当前事务,如果当前没有事务,就以非事务方式执行。
PROPAGATION_MANDATORY
使用当前的事务,如果当前没有事务,就抛出异常PROPAGATION_REQUIRES_NEW√
新建事务,如果当前存在事务,把当前事务挂起。PROPAGATION_NOT_SUPPORTED
以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
PROPAGATION_NEVER
以非事务方式执行,如果当前存在事务,则抛出异常。
PROPAGATION_NESTED
如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。
附2:隔离级别
ISOLATION_DEFAULT:
默认的隔离级别,使用数据库默认的事务隔离级别.(另外四个与JDBC的隔离级别相对应)
ISOLATION_READ_UNCOMMITTED:【读未提交】
这是事务最低的隔离级别,它允许另外一个事务可以看到这个事务未提交的数据。
产生脏读,不可重复读和幻读。
ISOLATION_READ_COMMITTED:【读提交】
保证一个事务修改的数据提交后才能被另外一个事务读取,另外一个事务不能读取该事务未提交的数据。
解决脏读,但产生不可重复读、幻读
ISOLATION_REPEATABLE_READ: 【可重复读】
这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。
解决脏读、不可重复读,但无法解决幻读。
ISOLATION_SERIALIZABLE
这是花费最高代价但是最可靠的事务隔离级别。
事务被处理为顺序执行。除了防止脏读,不可重复读外,还避免了幻像读。
脏读:
读到了尚未提交事务的数据。
例子:
1. 事务A进行了插入操作,插入了一条数据,尚未提交事务 2. 此时事务B读取到了该数据,对该数据进行了一系列操作 3. 但如果事务A没有提交事务,或是进行了回滚,则该数据不存在,因此事务B使用的数据就是脏数据,是不准确的。
不可重复读:
指在一个事务内,多次读同一数据读的结果出现不同。
例子:
1. 在事务A先读了一次数据为X,此时事务A还没有结束 2. 在事务B修改的数据X,变成Y,事务B提交 3. 在事务A进行一些业务操作后,刚好在事务B提交后又进行了读取,发现读的数据变成了Y,和第一次读的不一样,这就是不可重复度。
幻觉读:
当某个事务在读取某个范围内的记录时,另外一个事务又在该范围内插入了新的记录,当之前的事务再次读取该范围的记录时,发现两次不一样,产生幻读。
如:
- 事务A查询数据,发现有4条,尚未提交
- 事务B插入了一条新的数据X后提交。
- 事务A发现没有X,也进行插入一条X,这时可能就会报主键错误,因为X已经插入过了,但是事务A好像出现了幻觉一样,没有发现已经插入过了又进行插入一次。