Spring事务源码详解

事务其实也是基于AOP切面实现的。我们可以直接使用注解,开启事务管理功能:

@Configuration
@ComponentScan("com.wml")
@Import({JdbcConfig.class,TransactionConfig.class})
@PropertySource(value = "JdbcConfig.properties")
@EnableTransactionManagement
public class SpringConfiguration {
}

主要是@EnableTransactionManagement注解开启,除此,还需要定义数据源和事务管理器:

如上@Import注解,就分别引入了两个的配置类:

public class JdbcConfig {
    @Value("${jdbc.driver}")
    private  String driver;
    @Value("${jdbc.url}")
    private String url;
    @Value("${jdbc.username}")
    private String username;
    @Value("${jdbc.password}")
    private String password;

    @Bean(name = "template")
    public JdbcTemplate createTemplate(DataSource dataSource){
        return  new JdbcTemplate(dataSource);
    }
    //这里配置datasource
    @Bean(name = "dataSource")
    public DataSource createDataSource(){
       DriverManagerDataSource dataSource=new DriverManagerDataSource();
        dataSource.setDriverClassName(driver);
        dataSource.setUrl(url);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        return dataSource;
    }
}

这里配置事务管理器

public class TransactionConfig {
    /**
     * 创建事务管理器对象
     * @param dataSource
     * @return
     */
    @Bean(name = "transactionManager")
    public PlatformTransactionManager createTransaction(DataSource dataSource){
        return new DataSourceTransactionManager(dataSource);
    }
}

Spring在启动的时候会扫描到EnableTransactionManagement注解,跟进入,里面引入了TransactionManagementConfigurationSelector配置类:

@Import(TransactionManagementConfigurationSelector.class)

该类中有个如下的方法:

这个方法在扫描注解时,会调用到该方法,收集beanName,然后封装成beanDefinition对象

protected String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {AutoProxyRegistrar.class.getName(),
                        ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {determineTransactionAspectClass()};
            default:
                return null;
        }
    }

该方法中主要返回了两个类的beanName,我们主要看ProxyTransactionManagementConfiguration类,该类用于创建事务的切面,同时,数据源和事务管理器也是在这里加载。

创建事务切面

//ProxyTransactionManagementConfiguration#
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
        //这个就很熟悉了,Adivsor,一定是个切面
        BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
        //1.将Transactional注解解析出来保存
        advisor.setTransactionAttributeSource(transactionAttributeSource());
        //2.获取Advice
        advisor.setAdvice(transactionInterceptor());
        if (this.enableTx != null) {
            advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
        }
        return advisor;
    }
public TransactionAttributeSource transactionAttributeSource() {
        return new AnnotationTransactionAttributeSource();
    }

解析事务注解,就是对注解中的参数进行封装,这里创建了一个AnnotationTransactionAttributeSource

public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
        this.publicMethodsOnly = publicMethodsOnly;
        if (jta12Present || ejb3Present) {
            this.annotationParsers = new LinkedHashSet<>(4);
            //主要是用这个注解解析器,解析方法上面的@Transactional注解
            this.annotationParsers.add(new SpringTransactionAnnotationParser());
            if (jta12Present) {
                this.annotationParsers.add(new JtaTransactionAnnotationParser());
            }
            if (ejb3Present) {
                this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
            }
        }
        else {
            this.annotationParsers = Collections.singleton(new SpringTransactionAnnotationParser());
        }
    }

该类中主要借助SpringTransactionAnnotationParser进行解析,将所有的注解参数封装到RuleBasedTransactionAttribute中返回。

protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
        RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();

        Propagation propagation = attributes.getEnum("propagation");
        rbta.setPropagationBehavior(propagation.value());
        Isolation isolation = attributes.getEnum("isolation");
        rbta.setIsolationLevel(isolation.value());
        rbta.setTimeout(attributes.getNumber("timeout").intValue());
        rbta.setReadOnly(attributes.getBoolean("readOnly"));
        rbta.setQualifier(attributes.getString("value"));

        List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
        for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
            rollbackRules.add(new RollbackRuleAttribute(rbRule));
        }
        for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
            rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
        }
        rbta.setRollbackRules(rollbackRules);

        return rbta;
    }

这都很简单。重点来看看如何创建Adivice的:

public TransactionInterceptor transactionInterceptor() {
        TransactionInterceptor interceptor = new TransactionInterceptor();
        interceptor.setTransactionAttributeSource(transactionAttributeSource());
        //事务管理器要跟数据源挂钩,所以需要自己定义
        if (this.txManager != null) {
            interceptor.setTransactionManager(this.txManager);
        }
        return interceptor;
    }

这里涉及TransactionInterceptor类,该类实现了TransactionAspectSupportMethodInterceptor接口,因此该类一定是事务的Advice,结合前面讲的AOP调用,该类的invoke一定会在AOP的链式调用中被执行,如下:

public Object invoke(MethodInvocation invocation) throws Throwable {

        Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

        // 调用invokeWithinTransaction
        return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
    }

重点在这里:

一、事务调用

位于TransactionAspectSupport中,一个事务切面的Advice,和事务控制的逻辑都在该类中处理

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        //1.用于获取事务属性的类
        TransactionAttributeSource tas = getTransactionAttributeSource();

        //2.获取方法有@Transactional注解的属性
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);

        //3.获取事务管理器
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            //4.创建事务
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                //5.链式传递
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                //6.事务回滚
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            //7.事务提交
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
            final ThrowableHolder throwableHolder = new ThrowableHolder();

            // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in.
            try {
                Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, status -> {
                    TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
                    try {
                        return invocation.proceedWithInvocation();
                    }
                    catch (Throwable ex) {
                        if (txAttr.rollbackOn(ex)) {
                            // A RuntimeException: will lead to a rollback.
                            if (ex instanceof RuntimeException) {
                                throw (RuntimeException) ex;
                            }
                            else {
                                throw new ThrowableHolderException(ex);
                            }
                        }
                        else {
                            // A normal return value: will lead to a commit.
                            throwableHolder.throwable = ex;
                            return null;
                        }
                    }
                    finally {
                        cleanupTransactionInfo(txInfo);
                    }
                });

                // Check result state: It might indicate a Throwable to rethrow.
                if (throwableHolder.throwable != null) {
                    throw throwableHolder.throwable;
                }
                return result;
            }
            catch (ThrowableHolderException ex) {
                throw ex.getCause();
            }
            catch (TransactionSystemException ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                    ex2.initApplicationException(throwableHolder.throwable);
                }
                throw ex2;
            }
            catch (Throwable ex2) {
                if (throwableHolder.throwable != null) {
                    logger.error("Application exception overridden by commit exception", throwableHolder.throwable);
                }
                throw ex2;
            }
        }
    }

该类主要做了如下的事:

  1. 获取方法有@Transactional注解的属性,这里最终就会调用到上面讲的parseTransactionAnnotation方法,封装注解中的所有属性,得到TransactionAttribute

  2. 获取事务管理器,缓存中没有时,会执行通过getBean,获取PlatformTransactionManager实例,就会获得我们配置的事务管理器实例,如下:

    defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class);
  3. 创建事务

来看看如何创建事务的:

1.1 创建事务

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
.................

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
                //代码1:开启事务:
                status = tm.getTransaction(txAttr);
            }
            else {
                ....logger..
            }
        }
        //创建事务信息对象,记录新老事务信息对象
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }

1.1.1 开启事务

上面会调用事务管理器的getTransaction开启事务【按照标注的代码1、2......顺序跟下面标题看】

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        //代码1创建事务对象
        Object transaction = doGetTransaction();

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }

        //代码2:如果存在事务【第一次不会走这里】
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        //代码3:第一次没有事务走到这
        else if(){
        ..看1.1.1.3部分.
        }
        else {
            ......
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }

1.1.1.1创建事务对象

DataSourceTransactionManager类中

protected Object doGetTransaction() {
        //DataSourceTransactionObject内部维护了ConnectionHolder,ConnectionHolder中又维护了连接对象connection
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();

        //是允许嵌套事务
        txObject.setSavepointAllowed(isNestedTransactionAllowed());

        //获取数据源对象
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }
获取数据源和连接对象

obtainDataSource()方法获取到了数据源datasource,即下面的actualKey

private static Object doGetResource(Object actualKey) {
        Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.get(actualKey);
        // Transparently remove ResourceHolder that was marked as void...
        if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
            map.remove(actualKey);
            // Remove entire ThreadLocal if empty...
            if (map.isEmpty()) {
                resources.remove();
            }
            value = null;
        }
        return value;
    }

可以看到是从resources拿连接对象,而resources是一个ThreadLocal:ThreadLocal<Map<Object, Object>> resources,使用了一个Map,map的key就是datasource,value就是对应的连接对象,这里就是根据当前的数据源拿到对应的连接对象,之所以这样映射,是因为我们可以配置多个数据源。第一次创建事务的时候拿不到连接对象。

1.1.1.2已经存在事务

回到1.1.1,拿到事务对象后,会进入下面的if逻辑,如果当前已经存在事务【第一次不会进入这里】,就返回一个TransactionStatus对象,在这里会进行传播属性的判断。【建议先跳过这部分内容,看完第一个流程后再看,即下面的1.1.1.3的doBegin后再看】

private TransactionStatus handleExistingTransaction(
            TransactionDefinition definition, Object transaction, boolean debugEnabled)
            throws TransactionException {

        //1.PROPAGATION_NEVER 非事务运行,如果存在事务,则抛异常
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
            throw new IllegalTransactionStateException(
                    "Existing transaction found for transaction marked with propagation 'never'");
        }

        //2.PROPAGATION_NOT_SUPPORTED 如果以非事务方式执行操作,如果存在事务,就把当前事务挂起
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction");
            }
            //挂起当前事务
            Object suspendedResources = suspend(transaction);
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            //修改事务状态信息,把事务的一些信息存储到当前线程中,ThreadLocal中
            return prepareTransactionStatus(
                    definition, null, false, newSynchronization, debugEnabled, suspendedResources);
        }
        //3.PROPAGATION_REQUIRES_NEW 有事务就挂起当前事务,再创建新的事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
            if (debugEnabled) {
                logger.debug("Suspending current transaction, creating new transaction with name [" +
                        definition.getName() + "]");
            }
            //挂起当前事务[看后面讲解]
            SuspendedResourcesHolder suspendedResources = suspend(transaction);
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                //这里进行创建新的事务,后面有讲解
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error beginEx) {
                resumeAfterBeginException(transaction, suspendedResources, beginEx);
                throw beginEx;
            }
        }
        //4.PROPAGATION_NESTED嵌套事务
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            if (!isNestedTransactionAllowed()) {
                throw new NestedTransactionNotSupportedException(
                        "Transaction manager does not allow nested transactions by default - " +
                        "specify 'nestedTransactionAllowed' property with value 'true'");
            }

            //默认true
            if (useSavepointForNestedTransaction()) {
                //注意,这里事务状态为false,而不是true
                DefaultTransactionStatus status =
                        prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                //创建回滚点
                status.createAndHoldSavepoint();
                return status;
            }
            else {
                // Nested transaction through nested begin and commit/rollback calls.
                // Usually only for JTA: Spring synchronization might get activated here
                // in case of a pre-existing JTA transaction.
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, null);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
        }

        if (isValidateExistingTransaction()) {
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                    Constants isoConstants = DefaultTransactionDefinition.constants;
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] specifies isolation level which is incompatible with existing transaction: " +
                            (currentIsolationLevel != null ?
                                    isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                    "(unknown)"));
                }
            }
            if (!definition.isReadOnly()) {
                if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                    throw new IllegalTransactionStateException("Participating transaction with definition [" +
                            definition + "] is not marked as read-only but existing transaction is");
                }
            }
        }
        boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
        //如果上面都没匹配,就会走这里。
        return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
    }
挂起当前事务

挂起事务最终会执行doSuspend方法:

可以看到,就是将连接对象ConnectionHolder置空,然后再将第一次绑定的关系进行解绑

protected Object doSuspend(Object transaction) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        txObject.setConnectionHolder(null);
        //解除绑定关系,
        return TransactionSynchronizationManager.unbindResource(obtainDataSource());
    }

解绑操作核心代码:

    Map<Object, Object> map = resources.get();
        if (map == null) {
            return null;
        }
        Object value = map.remove(actualKey);

整个流程下来,发现没有我们的默认传播属性PROPAGATION_REQUIRED,因此会来到代码的最后:

return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);,这里它将newTransaction设为了false,因为默认传播属性会使用原来的事务;而在第一次创建事务或者传播属性为REQUIRES_NEW时,会将该状态设为true,表示是一个新事务。

1.1.1.3 第一次不存在事务

如果第一次进来,则没有事务,就会进入这个else if

else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            //先挂起
            SuspendedResourcesHolder suspendedResources = suspend(null);
                ...
                try {
                    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                    //创建事务状态对象,封装了事务对象运行时的的一些信息
                    DefaultTransactionStatus status = newTransactionStatus(
                            definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

                    //****在这里开启事务****
                    doBegin(transaction, definition);

                    //开启事务后,改变事务状态
                    prepareSynchronization(status, definition);
                    return status;
            }
  1. 创建事务状态对象,记录了事务流转过程中的状态数据,其中newTransaction属性为true时,就代表当前事务允许单独提交和回滚,一般是第一次创建事务或者事务传播属性为
    PROPAGATION_REQUIRES_NEW 的时候。如果为 false 则当前事务不能单独提交和回滚。
  2. 开启事务
  3. 改变事务状态
开启事务doBegin

doBegin主要做了如下的事:

  1. 从数据源拿到连接对象进行包装放到事务对象中
  2. 关闭事务自动提交。con.setAutoCommit(false);,不让数据库控制,而由我们自己控制【核心】
  3. 如果是第一次创建事务,则建立将当前线程与连接对象的关系,即将数据源和连接对象保存到ThreadLocal中
protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
        Connection con = null;

        try {
            //如果没有数据库连接
            if (!txObject.hasConnectionHolder() ||
                    txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                //1. 从连接池里面获取连接
                Connection newCon = obtainDataSource().getConnection();
                if (logger.isDebugEnabled()) {
                    logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }
                //2.把连接包装成ConnectionHolder,然后设置到事务对象中
                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }

            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();

            //3.获取隔离级别
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (logger.isDebugEnabled()) {
                    logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }
                //关闭连接的自动提交(开启事务控制)
                con.setAutoCommit(false);
            }

            //设置只读事务 从这一点设置的时间点开始(时间点a)到这个事务结束的过程中,其他事务所提交的数据,该事务将看不见
            //设置只读事务后,数据库就不会进行加锁等保证安全的操作,提高效率
            prepareTransactionalConnection(con, definition);

            //自己提交关闭了,就说明已经开启事务了,事务是活动的
            txObject.getConnectionHolder().setTransactionActive(true);

            int timeout = determineTimeout(definition);
            if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }

            // Bind the connection holder to the thread.
            if (txObject.isNewConnectionHolder()) {
                //如果是新创建的事务,则建立当前线程和数据库连接的关系
                TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
            }
        }

        ....
    }

到这里就完成了第一次进入事务切面的处理,主要就是获取连接对象开启事务,接着会走到第一部分的代码5,invocation.proceedWithInvocation();进行AOP的链式调用,如果我们有如下的业务代码,此时调用Service对象,也是个代理对象,因此又会进入getTransaction,此时因为第一次已经将数据源数据库连接关系保存到了当前线程的ThreadLocal中,因此可以拿到连接对象,它就会走第一个if,即1.1.1.2部分。

    @Transactional
    public Result add(User user) {
        userService.add(user);
        return new Result("OK");
    }

根据AOP的链式调用,invocation.proceedWithInvocation();执行完Advice方法后,就会调用被代理的方法代码,如果在这期间出现了异常。就会进入第一部分的catch中进行回滚completeTransactionAfterThrowing

1.2事务回滚

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
        if (txInfo != null && txInfo.getTransactionStatus() != null) {

            if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
                try {
                    txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
                }
                .....catch...

            }
            ....
        }
    }

最终会调用如下方法:

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;

            try {
                triggerBeforeCompletion(status);

                //如果是嵌套事务,则就会有回滚点
                if (status.hasSavepoint()) {
                    ...
                            //这里就按照回滚点进行回滚
                    status.rollbackToHeldSavepoint();
                }
                //都为PROPAGATION_REQUIRED最外层事务统一回滚
                else if (status.isNewTransaction()) {
                    ...
                    doRollback(status);
                }
                else {
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        }
                        ..
                    }
                    ..
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            ....

            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
.....
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

doRollback中拿到连接对象调用rollback回滚:

protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        ..
        try {
            con.rollback();
        }
        ....
    }

回滚这部分都几点需要特别注意的:

在回滚中,需要判断事务的状态,如果都是PROPAGATION_REQUIRED传播属性,则只有在其NewTransaction是true的时候,才会调用doRollback回滚,即会在最外层的调用中统一进行回滚,不会在内层调用进行回滚。而为true的状态只有在第一次创建事务或传播属性为NEW时才是,因此如果传播属性为NEW,则出现异常就一定会回滚。

如果有回滚点,则按照回滚点回滚

事务提交回滚的几个问题【重要】

按照这个逻辑,就会有以下几个问题:

有三个事务ABC,在A中先调用B再调用C:

(没有特别说明的,都是默认传播属性REQUIRED,第一个A传播属性无关紧要,传播属性主要针对后面的事务)

  1. 如果ABC都是默认传播属性,如果在C出现异常,则会在A中统一回滚,三个都不会提交成功
  2. 如果B是NEW
    • 如果在B中抛出异常,则会在A中回滚,A和B都不会提交成功,C压根不会执行到
    • 如果在C抛出异常,因为B是NEW,所以B会提交成功(和A和C不是同一个连接),而C的异常会在A中捕获回滚,因此C不会提交成功
  3. 如果C是NEW
    • 如果C抛异常,则B会提交成功,C会回滚提交失败
    • 如果B抛异常,则都A和B提交失败,C不会调用到
  4. 如果B和C都是NEW
    • 如果C抛异常,则B提交成功,C会回滚
    • 如果B抛异常,则B会回滚,C调用不到
  5. 而如果我们在抛异常的时候,手动用try-catch捕获,则不会回滚,因为spring中的catch无法捕捉到异常

假如B和C都是NEW,如果在C中抛出异常,则B会提交成功不回滚,C不会提交成功进行回滚;

嵌套事务的问题

同样是上面的例子,假如此时B和C的属性为PROPAGATION_NESTED,即使用了嵌套事务,那么就会在这两个事务前创建回滚点,如果此时在C抛出异常,此时会发生什么?B会正常提交吗?

答案是不会的,因为如果C抛出异常,该异常最终会在最外层的A被捕获,在A中进行统一回滚,这样B和C都无法提交。要解决这个问题,就是想要在安全点前回滚,保证C出异常时B正常提交,就要让最外层的事务无法捕获异常。那么我们就可以按上面的在A中使用try-catch将B和C包裹起来,手动捕获异常,这样Spring就无法捕获回滚,B就能够正常提交。

另外如果在同类中,事务A中调用另一个带事务的B,则B的事务会失效,此时应该在该类注入本类,然后通过注入的bean调用B,才可以避免B的事务失效。

因为 spring通过TransactionInterceptor类来控制事务开启,提交,回滚等, 它会创建一个目标类的代理类. 而在本示例中methodA方法调用methodB方法时,并不是通过代理类去调用,而是通过this调用本身的方法methodB方法.所以methodB方法的事务并不会开启。

class A{
    @Transactional
    methodA(){
        methodB();
    }

    @Transactional
    methodB(){
       dosomething
    }
}

上面情况B的事务会失效,应该改为下面的:

class A{
    @Autowired
    A a;

    @Transactional
    methodA(){
        a.methodB();
    }

    @Transactional
    methodB(){
       dosomething
    }
}

或者将方法B放在另一个类中,然后注入另一个类调用B方法;或者通过(A) AopContext.currentProxy().methodB()调用。

1.3 事务提交

如果没有Advice需要调用,就会调用被代理的方法,如果没有出现异常,就会走到commitTransactionAfterReturning提交事务,最终会调用下面的方法:

public final void commit(TransactionStatus status) throws TransactionException {
        if (status.isCompleted()) {//事务已经完成了,就不能再提交事务,抛出异常
            throw new IllegalTransactionStateException(
                    "Transaction is already completed - do not call commit or rollback more than once per transaction");
        }

        DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
        if (defStatus.isLocalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Transactional code has requested rollback");
            }
            //处理异常回滚
            processRollback(defStatus, false);
            return;
        }

        if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
            if (defStatus.isDebug()) {
                logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
            }
            processRollback(defStatus, true);
            return;
        }
        ///提交
        processCommit(defStatus);
    }

提交逻辑:

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;
                //如果有回滚点,则需要先把回滚点清除,回滚点是在嵌套事务中添加的,如果能走到这里,说明没有异常,执行成功,那么这些回滚点(安全点)就没有存在的意义了,所以直接清除掉
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                }
                //这里根据NewTransaction状态就会出现两个情况:
                //1. 如果隔离级别都是PROPAGATION_REQUIRED,则最外层的调用才会走进来统一提交
                //2.而如果是PROPAGATION_REQUIRES_NEW,每一个事务都会进来
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    //内部获取连接对象调用其commit方法提交事务
                    doCommit(status);
                }
                else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }

                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException | Error ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            //释放资源
            cleanupAfterCompletion(status);
        }
    }

事务提交流程:

  1. 如果有回滚点,需要清除回滚点
  2. 如果传播属性为REQUIERD,则只有最外层的事务才会提交
  3. 如果传播属性为NEW,则每次都会提交
  4. 释放资源

2和3主要根据事务NewTransaction状态判断,而前面讲过,只有第一次创建事务和传播属性为NEW时,才会将该状态设为true,因为NEW每次都会将当前事务挂起,而创建新的事务。

释放资源

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
        status.setCompleted();
        if (status.isNewSynchronization()) {
            TransactionSynchronizationManager.clear();
        }
        //NewTransaction为true才释放
        if (status.isNewTransaction()) {
            doCleanupAfterCompletion(status.getTransaction());
        }
        //如果当前事务有挂起的事务
        if (status.getSuspendedResources() != null) {
            if (status.isDebug()) {
                logger.debug("Resuming suspended transaction after completion of inner transaction");
            }
            Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
            //恢复挂起连接的绑定
            resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
        }
    }
  1. 当仅当NewTransaction为true才释放,因为为true时表示所有操作都执行完了,然后将连接对象归还到连接池,再解除在ThreadLocal中绑定的线程和连接对象的关系。

  2. 如果还有事务被挂起,则恢复被挂起的事务的连接关系【前面挂起的事务解除了该关系】

二、注解事务和编程式事务

方法上面加了@Transactional 的事务就是注解事务

这种事务控制粒度不够细,如果方法流程很长的话会产生连接占用问题,连接占用就会导致整个系统吞吐量下降。
注解事务犹豫隔离级别的不同,可能导致可重复读的问题,比如抢火车票这种,需要用
到乐观锁,但是在重复抢票的时候由于是同一个连接对象,所以每次查询的数据是一样的。

编程式事务解决了这个问题,这个查询语句是在 execute 方法内,当这个方法执行完
时就提交了事务,下次再进来时又是一个单独的新事务了。
编程式事务控制粒度更细,可以只关心需要事务控制的操作,如果不需要事务控制的代
码可以不放在 execute 方法内。

 @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
        TransactionTemplate transactionTemplate = new TransactionTemplate();
        transactionTemplate.setTransactionManager(platformTransactionManager);
        return transactionTemplate;
    }

@Autowired
private TransactionTemplate transactionTemplate;

public int getTicketModeOne() {

        Integer execute = transactionTemplate.execute(status -> {
            //1、获取锁
            List<ZgTicket> zgTickets = commonMapper.queryTicketById("12306");
            Map lockmap = new HashMap();
            lockmap.put("ticketId", "12306");
            lockmap.put("version", zgTickets.get(0).getVersion());
            int i = commonMapper.updateLock(lockmap);

            if (i > 0) {
                //抢票
                ZgTicket zgTicket = zgTickets.get(0);
                zgTicket.setTicketCount(2);
                int i1 = commonMapper.updateTicket(zgTicket);
            }
            return i;
        });

        if (execute == 0) {
            //继续抢
            getTicketModeOne();
        }
        return 0;
    }

如果想更细,可以自己手动控制事务:

@Autowired
    PlatformTransactionManager platformTransactionManager;

    public void xxx() {
        DefaultTransactionDefinition defaultTransactionDefinition = new DefaultTransactionDefinition();
        defaultTransactionDefinition.setPropagationBehavior(0);
        TransactionStatus transaction = platformTransactionManager.getTransaction(defaultTransactionDefinition);

        try {
            System.out.println("业务代码");
        }catch (Exception e) {
            platformTransactionManager.rollback(transaction);
        }

        platformTransactionManager.commit(transaction);
    }

附1:事务的传播属性

所谓传播属性就是告诉spring如何去控制事务。

  • PROPAGATION_REQUIRED【默认】

    如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。即使用同一个事务

  • PROPAGATION_SUPPORTS

    支持当前事务,如果当前没有事务,就以非事务方式执行。

  • PROPAGATION_MANDATORY
    使用当前的事务,如果当前没有事务,就抛出异常

  • PROPAGATION_REQUIRES_NEW
    新建事务,如果当前存在事务,把当前事务挂起。

  • PROPAGATION_NOT_SUPPORTED

    以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。

  • PROPAGATION_NEVER

    以非事务方式执行,如果当前存在事务,则抛出异常。

  • PROPAGATION_NESTED

    如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与PROPAGATION_REQUIRED类似的操作。

附2:隔离级别

  1. ISOLATION_DEFAULT

    默认的隔离级别,使用数据库默认的事务隔离级别.(另外四个与JDBC的隔离级别相对应)

  2. ISOLATION_READ_UNCOMMITTED:【读未提交】

    这是事务最低的隔离级别,它允许另外一个事务可以看到这个事务未提交的数据。

    产生脏读,不可重复读和幻读。

  3. ISOLATION_READ_COMMITTED:【读提交】

    保证一个事务修改的数据提交后才能被另外一个事务读取,另外一个事务不能读取该事务未提交的数据。

    解决脏读,但产生不可重复读、幻读

  4. ISOLATION_REPEATABLE_READ: 【可重复读】

    这种事务隔离级别可以防止脏读,不可重复读。但是可能出现幻像读。它除了保证一个事务不能读取另一个事务未提交的数据外,还保证了避免下面的情况产生(不可重复读)。

    解决脏读、不可重复读,但无法解决幻读。

  5. ISOLATION_SERIALIZABLE

    这是花费最高代价但是最可靠的事务隔离级别。

    事务被处理为顺序执行。除了防止脏读,不可重复读外,还避免了幻像读。

  • 脏读:

    读到了尚未提交事务的数据。

    例子:

     1. 事务A进行了插入操作,插入了一条数据,尚未提交事务
      2. 此时事务B读取到了该数据,对该数据进行了一系列操作
      3. 但如果事务A没有提交事务,或是进行了回滚,则该数据不存在,因此事务B使用的数据就是脏数据,是不准确的。
  • 不可重复读:

    指在一个事务内,多次读同一数据读的结果出现不同。

    例子:

     1. 在事务A先读了一次数据为X,此时事务A还没有结束
      2. 在事务B修改的数据X,变成Y,事务B提交
      3. 在事务A进行一些业务操作后,刚好在事务B提交后又进行了读取,发现读的数据变成了Y,和第一次读的不一样,这就是不可重复度。
  • 幻觉读:

    当某个事务在读取某个范围内的记录时,另外一个事务又在该范围内插入了新的记录,当之前的事务再次读取该范围的记录时,发现两次不一样,产生幻读。

    如:

    1. 事务A查询数据,发现有4条,尚未提交
    2. 事务B插入了一条新的数据X后提交。
    3. 事务A发现没有X,也进行插入一条X,这时可能就会报主键错误,因为X已经插入过了,但是事务A好像出现了幻觉一样,没有发现已经插入过了又进行插入一次。
全部评论

相关推荐

求面试求offer啊啊啊啊:这个在牛客不是老熟人了吗
点赞 评论 收藏
分享
数开小菜鸡_暂退沉淀版:大二第三段,还是字节,这下真得点点举办了
点赞 评论 收藏
分享
评论
点赞
1
分享

创作者周榜

更多
牛客网
牛客企业服务