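Seata Source Code, Part 9: Transaction Handling in Seata XA Mode
Outline
1. A Seata XA distributed transaction example, and the differences between AT and XA
2. How each module of the Seata XA distributed transaction example runs
3. How Seata uses Spring Boot auto-configuration to simplify complex configuration
4. Auto-configuration of the global transaction annotation scanner
5. Core fields and initialization of the global transaction annotation scanner
6. How the global transaction annotation scanner creates AOP proxies
7. Source code of the global transaction degrade check: beginning and committing a transaction
8. Degrade handling in the global transaction interceptor when Seata Server fails
9. Source code of global transaction propagation via HTTP request headers
10. Initialization of the XA data source proxy and obtaining the XA connection proxy
11. Source code of XA branch transaction registration and starting the original XA connection
12. Source code of the two-phase commit flow of XA distributed transactions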
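1. A Seata XA distributed transaction example, and the differences between AT and XA
(1) Overview of the seata-xa module in the seata-samples project
(2) Steps to start the Seata XA distributed transaction example
(3) Seata XA mode compared with AT mode
(1) Overview of the seata-xa module in the seata-samples project
The seata-xa module in the seata-samples project implements distributed transaction commit and rollback on top of Seata XA mode.
The seata-xa module is a Spring Boot project that:
a. uses Feign for remote calls
b. uses Spring JDBC to access the MySQL database
c. uses Seata XA for distributed transactions
(2) Steps to start the Seata XA distributed transaction example
a. Run the SQL statements the example needs: sql/all_in_one.sql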
DROP TABLE IF EXISTS `stock_tbl`;
CREATE TABLE `stock_tbl` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `commodity_code` varchar(255) DEFAULT NULL,
    `count` int(11) DEFAULT 0,
    PRIMARY KEY (`id`),
    UNIQUE KEY (`commodity_code`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `order_tbl`;
CREATE TABLE `order_tbl` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `user_id` varchar(255) DEFAULT NULL,
    `commodity_code` varchar(255) DEFAULT NULL,
    `count` int(11) DEFAULT 0,
    `money` int(11) DEFAULT 0,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `account_tbl`;
CREATE TABLE `account_tbl` (
    `id` int(11) NOT NULL AUTO_INCREMENT,
    `user_id` varchar(255) DEFAULT NULL,
    `money` int(11) DEFAULT 0,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
b. Download the Seata Server archive
Visit: https://github.com/seata/seata/releases
c. Unzip the Seata Server archive and start Seata Server
$ unzip seata-server-xxx.zip
$ cd distribution
$ sh ./bin/seata-server.sh 8091 file
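d. Start the AccountXA, OrderXA, StockXA, and BusinessXA services in that order
e. Test
# For the exact call parameters, see the BusinessController code
$ curl http://127.0.0.1:8084/purchase
f. Notes
For the data initialization logic, see the BusinessService.initData() method. With the initial data and the default call logic, purchase succeeds 3 times. Each call orders 30 units at a unit price of 100, so it deducts 3000 from the account balance, which drops from the initial 10000 to 1000. The 4th call fails because the balance is insufficient, and the stock, order, and account changes made by that call are all rolled back.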
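The arithmetic behind "3 successful calls, then a failure" can be checked with a small standalone sketch. PurchaseSimulation is a hypothetical class written for this article, not part of seata-samples. It only models the balance check from AccountService.reduce(), assuming the default order count of 30 and the unit price of 100 used by the sample.

```java
public class PurchaseSimulation {
    private int balance;

    public PurchaseSimulation(int initialBalance) {
        this.balance = initialBalance;
    }

    /**
     * Models one purchase: deduct orderCount * unitPrice from the balance.
     * If the balance would become negative the deduction is not applied,
     * which stands in for the rollback of the whole global transaction.
     */
    public boolean purchase(int orderCount, int unitPrice) {
        int newBalance = balance - orderCount * unitPrice;
        if (newBalance < 0) {
            return false; // "Not Enough Money": nothing is changed
        }
        balance = newBalance;
        return true;
    }

    public int getBalance() {
        return balance;
    }

    public static void main(String[] args) {
        PurchaseSimulation account = new PurchaseSimulation(10000);
        for (int call = 1; call <= 4; call++) {
            boolean ok = account.purchase(30, 100);
            System.out.println("call " + call + ": " + (ok ? "SUCCESS" : "FAIL")
                    + ", balance=" + account.getBalance());
        }
    }
}
```

Running it shows the balance going 7000, 4000, 1000, and the fourth call failing with the balance still at 1000.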
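(3) Seata XA mode compared with AT mode
The example above can be switched between XA mode and AT mode just by changing the data source proxy type.
a. XA mode uses DataSourceProxyXA as the data source proxy type
//The configuration class name is illustrative
@Configuration
public class DataSourceConfiguration {
    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        //DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }
}
b. AT mode uses DataSourceProxy as the data source proxy type
//The configuration class name is illustrative; it must not itself be named DataSourceProxy,
//otherwise it would hide Seata's DataSourceProxy class used below
@Configuration
public class DataSourceConfiguration {
    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        //DataSourceProxy for AT mode
        return new DataSourceProxy(druidDataSource);
    }
}
AT mode requires an undo_log table in the database; XA mode does not. The undo_log table used by AT mode is created as follows: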
CREATE TABLE `undo_log` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(100) NOT NULL,
    `context` varchar(128) NOT NULL,
    `rollback_info` longblob NOT NULL,
    `log_status` int(11) NOT NULL,
    `log_created` datetime NOT NULL,
    `log_modified` datetime NOT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2. How each module of the Seata XA distributed transaction example runs
(1) How the account service implemented by the account-xa module runs
(2) How the order service implemented by the order-xa module runs
(3) How the stock service implemented by the stock-xa module runs
(4) How the global transaction entry service implemented by the business-xa module runs
(1) How the account service implemented by the account-xa module runs
The core code of the account-xa module is as follows:
@EnableTransactionManagement
@SpringBootApplication
public class AccountXAApplication {
    public static void main(String[] args) {
        //Listens on port 8083
        SpringApplication.run(AccountXAApplication.class, args);
    }
}

@Configuration
public class AccountXADataSourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        //DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }

    @Bean("jdbcTemplate")
    public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
        return new JdbcTemplate(dataSourceProxy);
    }

    @Bean
    public PlatformTransactionManager txManager(DataSource dataSourceProxy) {
        return new DataSourceTransactionManager(dataSourceProxy);
    }
}

@RestController
public class AccountController {
    @Autowired
    private AccountService accountService;

    @RequestMapping(value = "/reduce", method = RequestMethod.GET, produces = "application/json")
    public String reduce(String userId, int money) {
        try {
            accountService.reduce(userId, money);
        } catch (Exception exx) {
            exx.printStackTrace();
            return AccountService.FAIL;
        }
        return AccountService.SUCCESS;
    }
}

@Service
public class AccountService {
    public static final String SUCCESS = "SUCCESS";
    public static final String FAIL = "FAIL";
    private static final Logger LOGGER = LoggerFactory.getLogger(AccountService.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @Transactional
    public void reduce(String userId, int money) {
        String xid = RootContext.getXID();
        LOGGER.info("reduce account balance in transaction: " + xid);
        //Deduct the amount, then read the balance back and fail if it went negative
        jdbcTemplate.update("update account_tbl set money = money - ? where user_id = ?", new Object[]{money, userId});
        int balance = jdbcTemplate.queryForObject("select money from account_tbl where user_id = ?", new Object[]{userId}, Integer.class);
        LOGGER.info("balance after transaction: " + balance);
        if (balance < 0) {
            throw new RuntimeException("Not Enough Money ...");
        }
    }
}
(2) How the order service implemented by the order-xa module runs
The core code of the order-xa module is as follows:
@SpringBootApplication
@EnableFeignClients
public class OrderXAApplication {
    public static void main(String[] args) {
        //Listens on port 8082
        SpringApplication.run(OrderXAApplication.class, args);
    }
}

@Configuration
public class OrderXADataSourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        //DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }

    @Bean("jdbcTemplate")
    public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
        return new JdbcTemplate(dataSourceProxy);
    }
}

@RestController
public class OrderController {
    @Autowired
    private OrderService orderService;

    @RequestMapping(value = "/create", method = RequestMethod.GET, produces = "application/json")
    public String create(String userId, String commodityCode, int orderCount) {
        try {
            orderService.create(userId, commodityCode, orderCount);
        } catch (Exception exx) {
            exx.printStackTrace();
            return OrderService.FAIL;
        }
        return OrderService.SUCCESS;
    }
}

@Service
public class OrderService {
    public static final String SUCCESS = "SUCCESS";
    public static final String FAIL = "FAIL";
    private static final Logger LOGGER = LoggerFactory.getLogger(OrderService.class);

    @Autowired
    private AccountFeignClient accountFeignClient;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void create(String userId, String commodityCode, Integer count) {
        String xid = RootContext.getXID();
        LOGGER.info("create order in transaction: " + xid);
        //Order total = order quantity (count) * unit price (100)
        int orderMoney = count * 100;
        //Create the order
        jdbcTemplate.update("insert order_tbl(user_id,commodity_code,count,money) values(?,?,?,?)", new Object[]{userId, commodityCode, count, orderMoney});
        //Call the account service to deduct the balance
        String result = accountFeignClient.reduce(userId, orderMoney);
        if (!SUCCESS.equals(result)) {
            throw new RuntimeException("Failed to call Account Service. ");
        }
    }
}

@FeignClient(name = "account-xa", url = "127.0.0.1:8083")
public interface AccountFeignClient {
    @GetMapping("/reduce")
    String reduce(@RequestParam("userId") String userId, @RequestParam("money") int money);
}
(3) How the stock service implemented by the stock-xa module runs
The core code of the stock-xa module is as follows:
@SpringBootApplication
public class StockXAApplication {
    public static void main(String[] args) {
        //Listens on port 8081
        SpringApplication.run(StockXAApplication.class, args);
    }
}

@Configuration
public class StockXADataSourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource druidDataSource() {
        return new DruidDataSource();
    }

    @Bean("dataSourceProxy")
    public DataSource dataSource(DruidDataSource druidDataSource) {
        //DataSourceProxyXA for XA mode
        return new DataSourceProxyXA(druidDataSource);
    }

    @Bean("jdbcTemplate")
    public JdbcTemplate jdbcTemplate(DataSource dataSourceProxy) {
        return new JdbcTemplate(dataSourceProxy);
    }
}

@RestController
public class StockController {
    @Autowired
    private StockService stockService;

    @RequestMapping(value = "/deduct", method = RequestMethod.GET, produces = "application/json")
    public String deduct(String commodityCode, int count) {
        try {
            stockService.deduct(commodityCode, count);
        } catch (Exception exx) {
            exx.printStackTrace();
            return StockService.FAIL;
        }
        return StockService.SUCCESS;
    }
}

@Service
public class StockService {
    public static final String SUCCESS = "SUCCESS";
    public static final String FAIL = "FAIL";
    private static final Logger LOGGER = LoggerFactory.getLogger(StockService.class);

    @Autowired
    private JdbcTemplate jdbcTemplate;

    public void deduct(String commodityCode, int count) {
        String xid = RootContext.getXID();
        LOGGER.info("deduct stock balance in transaction: " + xid);
        jdbcTemplate.update("update stock_tbl set count = count - ? where commodity_code = ?", new Object[]{count, commodityCode});
    }
}
(4) How the global transaction entry service implemented by the business-xa module runs
The core code of the business-xa module is as follows:
@SpringBootApplication
@EnableFeignClients
public class BusinessXAApplication {
    public static void main(String[] args) {
        //Listens on port 8084
        SpringApplication.run(BusinessXAApplication.class, args);
    }
}

@Configuration
public class BusinessXADataSourceConfiguration {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DruidDataSource dataSource() {
        return new DruidDataSource();
    }

    @Bean("jdbcTemplate")
    public JdbcTemplate jdbcTemplate(DataSource dataSource) {
        return new JdbcTemplate(dataSource);
    }
}

@RestController
public class BusinessController {
    @Autowired
    private BusinessService businessService;

    @RequestMapping(value = "/purchase", method = RequestMethod.GET, produces = "application/json")
    public String purchase(Boolean rollback, Integer count) {
        //Default order quantity is 30 unless the caller passes count
        int orderCount = 30;
        if (count != null) {
            orderCount = count;
        }
        try {
            businessService.purchase(TestDatas.USER_ID, TestDatas.COMMODITY_CODE, orderCount, rollback == null ? false : rollback.booleanValue());
        } catch (Exception exx) {
            return "Purchase Failed:" + exx.getMessage();
        }
        return BusinessService.SUCCESS;
    }
}

@Service
public class BusinessService {
    public static final String SUCCESS = "SUCCESS";
    public static final String FAIL = "FAIL";
    private static final Logger LOGGER = LoggerFactory.getLogger(BusinessService.class);

    @Autowired
    private StockFeignClient stockFeignClient;

    @Autowired
    private OrderFeignClient orderFeignClient;

    @Autowired
    private JdbcTemplate jdbcTemplate;

    @GlobalTransactional
    public void purchase(String userId, String commodityCode, int orderCount, boolean rollback) {
        String xid = RootContext.getXID();
        LOGGER.info("New Transaction Begins: " + xid);
        String result = stockFeignClient.deduct(commodityCode, orderCount);
        if (!SUCCESS.equals(result)) {
            //Message: "Stock service call failed, rolling back the transaction!"
            throw new RuntimeException("库存服务调用失败,事务回滚!");
        }
        result = orderFeignClient.create(userId, commodityCode, orderCount);
        if (!SUCCESS.equals(result)) {
            //Message: "Order service call failed, rolling back the transaction!"
            throw new RuntimeException("订单服务调用失败,事务回滚!");
        }
        if (rollback) {
            throw new RuntimeException("Force rollback ... ");
        }
    }
    ...
}

@FeignClient(name = "stock-xa", url = "127.0.0.1:8081")
public interface StockFeignClient {
    @GetMapping("/deduct")
    String deduct(@RequestParam("commodityCode") String commodityCode, @RequestParam("count") int count);
}

@FeignClient(name = "order-xa", url = "127.0.0.1:8082")
public interface OrderFeignClient {
    @GetMapping("/create")
    String create(@RequestParam("userId") String userId, @RequestParam("commodityCode") String commodityCode, @RequestParam("orderCount") int orderCount);
}
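3. How Seata uses Spring Boot auto-configuration to simplify complex configuration
(1) Overview of seata-spring-boot-starter
(2) Auto-configuring the GlobalTransactionScanner bean
(1) Overview of seata-spring-boot-starter
The seata-spring-boot-starter module in the seata-v1.5.0 source tree uses Spring Boot auto-configuration to simplify the complex configuration of seata-all. Just as dubbo-spring-boot-starter is the dependency needed to integrate Dubbo with Spring Boot, seata-spring-boot-starter is the dependency needed to integrate Seata with Spring Boot.
The seata-spring-boot-starter-samples module in the seata-samples-1.5.0 project integrates Spring Boot + Dubbo + MyBatis + Nacos + Seata to implement distributed transaction management for Dubbo. It uses Nacos as the registry and configuration center for both Dubbo and Seata, and uses a MySQL database with MyBatis for data access.
Note: seata-spring-boot-starter enables automatic data source proxying by default; if the user also configures a DataSourceProxy manually, an exception will result.
(2) Auto-configuring the GlobalTransactionScanner bean
The seata-xa module in the seata-samples-1.5.0 project differs from the seata-samples-dubbo module: the latter wires the GlobalTransactionScanner bean through an XML configuration file.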
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://dubbo.apache.org/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
    ...
    <!-- Global transaction annotation scanner -->
    <bean class="io.seata.spring.annotation.GlobalTransactionScanner">
        <constructor-arg value="dubbo-demo-account-service"/>
        <constructor-arg value="my_test_tx_group"/>
    </bean>
</beans>
The former relies on the SeataAutoConfiguration class in seata-spring-boot-starter to auto-configure the GlobalTransactionScanner bean.
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        //Returns the component that scans for the @GlobalTransactional annotation
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}
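4. Auto-configuration of the global transaction annotation scanner
(1) Spring Boot auto-configuration drives Seata's Spring Boot auto-configuration
(2) SeataAutoConfiguration auto-configures GlobalTransactionScanner
(1) Spring Boot auto-configuration drives Seata's Spring Boot auto-configuration
The resources/META-INF/spring.factories file is configured as follows, so Spring Boot auto-configures the 4 classes listed in it.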
# Auto Configure
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
io.seata.spring.boot.autoconfigure.SeataPropertiesAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataDataSourceAutoConfiguration,\
io.seata.spring.boot.autoconfigure.SeataAutoConfiguration,\
io.seata.spring.boot.autoconfigure.HttpAutoConfiguration
(2) SeataAutoConfiguration auto-configures GlobalTransactionScanner
The global transaction annotation scanner, GlobalTransactionScanner, scans for the @GlobalTransactional annotation.
@ConditionalOnProperty(prefix = SEATA_PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class SeataAutoConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(SeataAutoConfiguration.class);

    @Bean(BEAN_NAME_FAILURE_HANDLER)
    @ConditionalOnMissingBean(FailureHandler.class)
    public FailureHandler failureHandler() {
        return new DefaultFailureHandlerImpl();
    }

    @Bean
    @DependsOn({BEAN_NAME_SPRING_APPLICATION_CONTEXT_PROVIDER, BEAN_NAME_FAILURE_HANDLER})
    @ConditionalOnMissingBean(GlobalTransactionScanner.class)
    public GlobalTransactionScanner globalTransactionScanner(SeataProperties seataProperties, FailureHandler failureHandler) {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Automatically configure Seata");
        }
        //Returns the component that scans for the @GlobalTransactional annotation
        return new GlobalTransactionScanner(seataProperties.getApplicationId(), seataProperties.getTxServiceGroup(), failureHandler);
    }
}
5. Core fields and initialization of the global transaction annotation scanner
//The type Global transaction scanner.
//AbstractAutoProxyCreator is Spring's auto-proxy creator. Because the scanner extends it, every bean in the Spring container is passed to wrapIfNecessary(),
//which lets GlobalTransactionScanner.wrapIfNecessary() inspect every method of every bean
//and create an AOP proxy for any bean that carries the @GlobalTransactional annotation
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ...
    //Method interceptor
    private MethodInterceptor interceptor;
    //Interceptor for the global transaction annotation
    private MethodInterceptor globalTransactionalInterceptor;
    //Application ID
    private final String applicationId;
    //Global transaction service group
    private final String txServiceGroup;
    //Transaction mode
    private final int mode;
    //Related to Alibaba Cloud: an access key and a secret key
    private String accessKey;
    private String secretKey;
    //Whether global transactions are disabled by configuration; defaults to false
    private volatile boolean disableGlobalTransaction = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
    //Whether initialization has completed
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    //Failure handler hook
    private final FailureHandler failureHandlerHook;
    //Spring application context
    private ApplicationContext applicationContext;

    //Instantiates a new Global transaction scanner.
    public GlobalTransactionScanner(String txServiceGroup) {
        this(txServiceGroup, txServiceGroup, DEFAULT_MODE);
    }

    //Instantiates a new Global transaction scanner.
    public GlobalTransactionScanner(String txServiceGroup, int mode) {
        this(txServiceGroup, txServiceGroup, mode);
    }

    //Instantiates a new Global transaction scanner.
    public GlobalTransactionScanner(String applicationId, String txServiceGroup) {
        this(applicationId, txServiceGroup, DEFAULT_MODE);
    }

    //Instantiates a new Global transaction scanner.
    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode) {
        this(applicationId, txServiceGroup, mode, null);
    }

    //Instantiates a new Global transaction scanner.
    public GlobalTransactionScanner(String applicationId, String txServiceGroup, FailureHandler failureHandlerHook) {
        this(applicationId, txServiceGroup, DEFAULT_MODE, failureHandlerHook);
    }

    //Instantiates a new Global transaction scanner.
    public GlobalTransactionScanner(String applicationId, String txServiceGroup, int mode, FailureHandler failureHandlerHook) {
        setOrder(ORDER_NUM);
        setProxyTargetClass(true);
        this.applicationId = applicationId;
        this.txServiceGroup = txServiceGroup;
        this.mode = mode;
        this.failureHandlerHook = failureHandlerHook;
    }

    //Callback invoked when Spring destroys the bean
    @Override
    public void destroy() {
        ShutdownHook.getInstance().destroyAll();
    }

    //Spring initialization callback; it initializes this GlobalTransactionScanner
    @Override
    public void afterPropertiesSet() {
        if (disableGlobalTransaction) {
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("Global transaction is disabled.");
            }
            ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)this);
            return;
        }
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    //Initializes Seata's two network clients: the TM client and the RM client
    private void initClient() {
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Initializing Global Transaction Clients ... ");
        }
        if (StringUtils.isNullOrEmpty(applicationId) || StringUtils.isNullOrEmpty(txServiceGroup)) {
            throw new IllegalArgumentException(String.format("applicationId: %s, txServiceGroup: %s", applicationId, txServiceGroup));
        }
        //init TM
        TMClient.init(applicationId, txServiceGroup, accessKey, secretKey);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Transaction Manager Client is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        //init RM
        RMClient.init(applicationId, txServiceGroup);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Resource Manager is initialized. applicationId[{}] txServiceGroup[{}]", applicationId, txServiceGroup);
        }
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("Global Transaction Clients are initialized. ");
        }
        registerSpringShutdownHook();
    }
    ...
}
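The initialize-once guard in afterPropertiesSet() can be isolated in a few lines. The following is a minimal sketch with no Seata dependency; InitOnceDemo and its initCount counter are hypothetical names, and initClient() here only counts calls instead of starting TM and RM clients.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class InitOnceDemo {
    // Flips from false to true exactly once, even under concurrent callers
    private final AtomicBoolean initialized = new AtomicBoolean(false);
    // Hypothetical counter used only to observe how often initClient() ran
    private final AtomicInteger initCount = new AtomicInteger();
    private final boolean disabled;

    public InitOnceDemo(boolean disabled) {
        this.disabled = disabled;
    }

    public void afterPropertiesSet() {
        if (disabled) {
            return; // disabled by configuration: skip client initialization
        }
        // Only the caller that wins the compare-and-set performs the initialization
        if (initialized.compareAndSet(false, true)) {
            initClient();
        }
    }

    private void initClient() {
        initCount.incrementAndGet();
    }

    public int getInitCount() {
        return initCount.get();
    }

    public static void main(String[] args) {
        InitOnceDemo scanner = new InitOnceDemo(false);
        scanner.afterPropertiesSet();
        scanner.afterPropertiesSet();
        System.out.println("initClient() calls: " + scanner.getInitCount());
    }
}
```

Calling afterPropertiesSet() twice still runs the initialization once, which is the property the scanner relies on when the callback can be triggered more than once.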
6. How the global transaction annotation scanner creates AOP proxies
//The type Global transaction scanner.
//AbstractAutoProxyCreator is Spring's auto-proxy creator. Because the scanner extends it, every bean in the Spring container is passed to wrapIfNecessary(),
//which lets GlobalTransactionScanner.wrapIfNecessary() inspect every method of every bean
//and create an AOP proxy for any bean that carries the @GlobalTransactional annotation
public class GlobalTransactionScanner extends AbstractAutoProxyCreator
    implements ConfigurationChangeListener, InitializingBean, ApplicationContextAware, DisposableBean {
    ...
    //The following will be scanned, and added corresponding interceptor:
    //Because the class extends AbstractAutoProxyCreator, every Spring bean is passed to this method to check for Seata annotations
    //If Seata annotations are present, the corresponding AOP proxy is created for them
    @Override
    protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
        try {
            synchronized (PROXYED_SET) {
                if (PROXYED_SET.contains(beanName)) {
                    return bean;
                }
                interceptor = null;
                //check TCC proxy
                if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
                    //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                    interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                    ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)interceptor);
                } else {
                    Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                    Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);
                    if (!existsAnnotation(new Class[]{serviceInterface}) && !existsAnnotation(interfacesIfJdk)) {
                        return bean;
                    }
                    if (globalTransactionalInterceptor == null) {
                        //Build a GlobalTransactionalInterceptor, the interceptor for the global transaction annotation
                        globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                        ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, (ConfigurationChangeListener)globalTransactionalInterceptor);
                    }
                    interceptor = globalTransactionalInterceptor;
                }
                LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
                if (!AopUtils.isAopProxy(bean)) {
                    //Next, Spring's AbstractAutoProxyCreator creates the dynamic proxy for the target bean
                    //From then on, calls to the target bean's methods go through GlobalTransactionalInterceptor
                    bean = super.wrapIfNecessary(bean, beanName, cacheKey);
                } else {
                    //The bean is already an AOP proxy: add the Seata advisors to the existing proxy
                    AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                    Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                    for (Advisor avr : advisor) {
                        advised.addAdvisor(0, avr);
                    }
                }
                PROXYED_SET.add(beanName);
                return bean;
            }
        } catch (Exception exx) {
            throw new RuntimeException(exx);
        }
    }
    ...
}
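The "scan for an annotation, then wrap the bean in a proxy" idea can be tried without Spring. The sketch below is not how Seata does it: it swaps Spring's AbstractAutoProxyCreator for a plain JDK dynamic proxy, and DemoTransactional, PurchaseApi, PurchaseImpl, and the EVENTS list are all hypothetical names introduced for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

public class AnnotationProxyDemo {
    // Hypothetical stand-in for @GlobalTransactional
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DemoTransactional {}

    public interface PurchaseApi {
        @DemoTransactional
        String purchase(String userId);

        String ping();
    }

    public static class PurchaseImpl implements PurchaseApi {
        @Override
        public String purchase(String userId) { return "purchased:" + userId; }

        @Override
        public String ping() { return "pong"; }
    }

    // Records what the interceptor did, so the effect is observable
    public static final List<String> EVENTS = new ArrayList<>();

    // Mirrors the role of existsAnnotation(): does any method carry the annotation?
    static boolean existsAnnotation(Class<?>... types) {
        for (Class<?> type : types) {
            for (Method method : type.getMethods()) {
                if (method.isAnnotationPresent(DemoTransactional.class)) {
                    return true;
                }
            }
        }
        return false;
    }

    // Returns the bean unchanged when nothing is annotated, otherwise a proxy around it
    @SuppressWarnings("unchecked")
    public static <T> T wrapIfNecessary(T bean, Class<T> iface) {
        if (!existsAnnotation(iface)) {
            return bean;
        }
        InvocationHandler handler = (proxy, method, args) -> {
            if (!method.isAnnotationPresent(DemoTransactional.class)) {
                return method.invoke(bean, args); // not annotated: pass straight through
            }
            EVENTS.add("begin");
            try {
                Object result = method.invoke(bean, args);
                EVENTS.add("commit");
                return result;
            } catch (InvocationTargetException e) {
                EVENTS.add("rollback");
                throw e.getCause();
            }
        };
        return (T) Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[]{iface}, handler);
    }

    public static void main(String[] args) {
        PurchaseApi api = wrapIfNecessary(new PurchaseImpl(), PurchaseApi.class);
        System.out.println(api.purchase("U100001") + " " + EVENTS);
        System.out.println(api.ping() + " " + EVENTS);
    }
}
```

Only the annotated purchase() call is surrounded by begin and commit; ping() passes through untouched, which is the same selectivity the real interceptor achieves by checking the annotation inside invoke().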
7. Source code of the global transaction degrade check: beginning and committing a transaction
In the constructor of GlobalTransactionalInterceptor, if the global transaction degrade check is enabled and both the check period and the allowed number of checks are greater than 0, startDegradeCheck() is called to start the scheduled degrade-check thread.
By default this thread runs every 2 seconds, and each run tries to begin a global transaction on Seata Server. If that succeeds, it obtains the xid, commits that global transaction, and posts a degrade-check-success event to the event bus. If beginning or committing fails, it posts a degrade-check-failure event to the event bus.
//The type Global transactional interceptor.
//A call to a method annotated with @GlobalTransactional is intercepted by AOP and enters this interceptor's invoke() method
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
    private static final Logger LOGGER = LoggerFactory.getLogger(GlobalTransactionalInterceptor.class);
    private static final FailureHandler DEFAULT_FAIL_HANDLER = new DefaultFailureHandlerImpl();
    //Template that executes global transactions
    private final TransactionalTemplate transactionalTemplate = new TransactionalTemplate();
    //Global lock template
    private final GlobalLockTemplate globalLockTemplate = new GlobalLockTemplate();
    //Failure handler
    private final FailureHandler failureHandler;
    //Whether global transactions are disabled
    private volatile boolean disable;
    //Degrade check period
    //Degrade mechanism: if Seata Server is down and global transactions cannot proceed, fall back to running local transactions
    private static int degradeCheckPeriod;
    //Whether the degrade check is enabled
    private static volatile boolean degradeCheck;
    //Allowed number of degrade checks
    private static int degradeCheckAllowTimes;
    //Number of failed checks (degrade count)
    private static volatile Integer degradeNum = 0;
    //Number of successful checks counted toward recovery
    private static volatile Integer reachNum = 0;
    //Event bus for degrade checks
    private static final EventBus EVENT_BUS = new GuavaEventBus("degradeCheckEventBus", true);
    //Scheduled thread pool for degrade checks
    private static ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("degradeCheckWorker", 1, true));

    //region DEFAULT_GLOBAL_TRANSACTION_TIMEOUT
    //Default global transaction timeout
    private static int defaultGlobalTransactionTimeout = 0;
    //endregion

    //Instantiates a new Global transactional interceptor.
    public GlobalTransactionalInterceptor(FailureHandler failureHandler) {
        //Failure handler
        this.failureHandler = failureHandler == null ? DEFAULT_FAIL_HANDLER : failureHandler;
        //Whether global transactions are disabled; defaults to false
        this.disable = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION, DEFAULT_DISABLE_GLOBAL_TRANSACTION);
        //Whether the global transaction degrade check is enabled; defaults to false
        degradeCheck = ConfigurationFactory.getInstance().getBoolean(ConfigurationKeys.CLIENT_DEGRADE_CHECK, DEFAULT_TM_DEGRADE_CHECK);
        //If the global transaction degrade check is enabled
        if (degradeCheck) {
            //Add a configuration listener
            ConfigurationCache.addConfigListener(ConfigurationKeys.CLIENT_DEGRADE_CHECK, this);
            //Set the degrade check period; the default is once every 2s
            degradeCheckPeriod = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_PERIOD, DEFAULT_TM_DEGRADE_CHECK_PERIOD);
            //Set the allowed number of degrade checks; the default is 10
            degradeCheckAllowTimes = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.CLIENT_DEGRADE_CHECK_ALLOW_TIMES, DEFAULT_TM_DEGRADE_CHECK_ALLOW_TIMES);
            //Register this interceptor with the degrade-check event bus as a subscriber
            EVENT_BUS.register(this);
            //If both the check period and the allowed number of checks are greater than 0, start the degrade-check thread
            if (degradeCheckPeriod > 0 && degradeCheckAllowTimes > 0) {
                startDegradeCheck();
            }
        }
        //Initialize the default global transaction timeout
        this.initDefaultGlobalTransactionTimeout();
    }

    private void initDefaultGlobalTransactionTimeout() {
        if (GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout <= 0) {
            int defaultGlobalTransactionTimeout;
            try {
                defaultGlobalTransactionTimeout = ConfigurationFactory.getInstance().getInt(ConfigurationKeys.DEFAULT_GLOBAL_TRANSACTION_TIMEOUT, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
            } catch (Exception e) {
                LOGGER.error("Illegal global transaction timeout value: " + e.getMessage());
                defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
            }
            if (defaultGlobalTransactionTimeout <= 0) {
                LOGGER.warn("Global transaction timeout value '{}' is illegal, and has been reset to the default value '{}'", defaultGlobalTransactionTimeout, DEFAULT_GLOBAL_TRANSACTION_TIMEOUT);
                defaultGlobalTransactionTimeout = DEFAULT_GLOBAL_TRANSACTION_TIMEOUT;
            }
            GlobalTransactionalInterceptor.defaultGlobalTransactionTimeout = defaultGlobalTransactionTimeout;
        }
    }
    ...

    //auto upgrade service detection
    //Starts the scheduled degrade-check thread, which by default runs once every 2s
    private static void startDegradeCheck() {
        executor.scheduleAtFixedRate(() -> {
            if (degradeCheck) {
                try {
                    //Try to begin a global transaction on Seata Server with application id null, transaction service group null,
                    //transaction name "degradeCheck", and a timeout of 60s
                    String xid = TransactionManagerHolder.get().begin(null, null, "degradeCheck", 60000);
                    //If the begin succeeded we have an xid, so commit that global transaction right away
                    TransactionManagerHolder.get().commit(xid);
                    //If the commit succeeded, post a degrade-check event with result true to the event bus
                    EVENT_BUS.post(new DegradeCheckEvent(true));
                } catch (Exception e) {
                    //If beginning the global transaction or committing the xid failed, post an event with result false
                    EVENT_BUS.post(new DegradeCheckEvent(false));
                }
            }
        }, degradeCheckPeriod, degradeCheckPeriod, TimeUnit.MILLISECONDS);
    }
    ...
}
8. Degrade handling in the global transaction interceptor when Seata Server fails
The result of each degrade check, which the scheduled thread performs every 2 seconds, is posted to the EventBus as an event.
GlobalTransactionalInterceptor registers itself with the EventBus during initialization, so its onDegradeCheck() method, which is annotated with @Subscribe, consumes those events.
In onDegradeCheck() of the AOP interceptor GlobalTransactionalInterceptor, a failed check increments degradeNum until it reaches degradeCheckAllowTimes. A successful check resets degradeNum to 0 if the interceptor has not yet degraded. Once it has degraded, degradeCheckAllowTimes successful checks in a row (counted in reachNum) are needed before degradeNum is reset to 0.
As a result, when a method annotated with @GlobalTransactional is called, GlobalTransactionalInterceptor.invoke() runs and checks whether degradeNum is greater than or equal to degradeCheckAllowTimes, that is, whether the number of failed checks has reached the allowed number, to decide whether to degrade. Degrading means running the target method directly without beginning a global transaction.
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor {
    ...
    @Subscribe
    public static void onDegradeCheck(DegradeCheckEvent event) {
        if (event.isRequestSuccess()) {//The check succeeded, so the TC is available
            if (degradeNum >= degradeCheckAllowTimes) {
                reachNum++;
                if (reachNum >= degradeCheckAllowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("the current global transaction has been restored");
                    }
                }
            } else if (degradeNum != 0) {
                degradeNum = 0;
            }
        } else {//The check failed: the TC is down, so global transactions cannot be begun or committed
            //If the degrade count is below the allowed number of degrade checks (10 by default)
            if (degradeNum < degradeCheckAllowTimes) {
                degradeNum++; //Increment the degrade count
                if (degradeNum >= degradeCheckAllowTimes) {
                    if (LOGGER.isWarnEnabled()) {
                        LOGGER.warn("the current global transaction has been automatically downgraded");
                    }
                }
            } else if (reachNum != 0) {
                reachNum = 0;
            }
        }
    }

    //Runs whenever a method annotated with @GlobalTransactional is called
    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
        Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class);
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            //localDisable is true if global transactions are disabled, or if the degrade check is enabled
            //and the degrade count has reached the allowed number of degrade checks
            //When localDisable is true, global transactions are off and none may be begun
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            //If global transactions are not disabled
            if (!localDisable) {
                if (globalTransactionalAnnotation != null) {
                    //The real entry point for handling a global transaction
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
                    return handleGlobalLock(methodInvocation, globalLockAnnotation);
                }
            }
        }
        //Run the target method directly
        return methodInvocation.proceed();
    }
    ...
}
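The interplay of degradeNum and reachNum is easier to follow when the counters are pulled out into a tiny state machine. The sketch below copies the branching of onDegradeCheck() into a hypothetical DegradeCounter class. It uses instance fields and an allowTimes of 3 purely to keep the demonstration short, whereas Seata keeps static fields and defaults to 10.

```java
public class DegradeCounter {
    private final int allowTimes;
    private int degradeNum = 0; // failed checks counted toward degrading
    private int reachNum = 0;   // successful checks counted toward recovery

    public DegradeCounter(int allowTimes) {
        this.allowTimes = allowTimes;
    }

    // Same branching as onDegradeCheck(), with the logging removed
    public synchronized void onCheck(boolean success) {
        if (success) {
            if (degradeNum >= allowTimes) {
                // Already degraded: need allowTimes successes in a row to recover
                reachNum++;
                if (reachNum >= allowTimes) {
                    reachNum = 0;
                    degradeNum = 0;
                }
            } else if (degradeNum != 0) {
                // Not degraded yet: one success clears the failure count
                degradeNum = 0;
            }
        } else {
            if (degradeNum < allowTimes) {
                degradeNum++;
            } else if (reachNum != 0) {
                // A failure while degraded restarts the recovery count
                reachNum = 0;
            }
        }
    }

    // Same condition invoke() uses to decide whether to skip the global transaction
    public synchronized boolean isDegraded() {
        return degradeNum >= allowTimes;
    }

    public static void main(String[] args) {
        DegradeCounter counter = new DegradeCounter(3);
        boolean[] checks = {false, false, false, true, true, true};
        for (boolean ok : checks) {
            counter.onCheck(ok);
            System.out.println((ok ? "success" : "failure") + " -> degraded=" + counter.isDegraded());
        }
    }
}
```

With allowTimes set to 3, three failed checks switch the counter to degraded, and it takes three successful checks in a row to switch back.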
9. Source code of global transaction propagation via HTTP request headers
@Configuration
@ConditionalOnWebApplication
public class HttpAutoConfiguration extends WebMvcConfigurerAdapter {
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new TransactionPropagationInterceptor());
    }

    @Override
    public void extendHandlerExceptionResolvers(List<HandlerExceptionResolver> exceptionResolvers) {
        exceptionResolvers.add(new HttpHandlerExceptionResolver());
    }
}

//Springmvc Intercepter.
public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String xid = RootContext.getXID();
        //After integration with Spring Cloud, Feign also runs its filter mechanism when it makes a call,
        //so the caller can put the xid into the HTTP request as a request header
        //The receiver then uses this Spring MVC interceptor to read the xid from the header and bind it to RootContext
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
        }
        if (xid == null && rpcXid != null) {
            RootContext.bind(rpcXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[{}] to RootContext", rpcXid);
            }
        }
        return true;
    }

    @Override
    public void postHandle(HttpServletRequest request, HttpServletResponse response, Object handler, ModelAndView modelAndView) {
        //After the handler has run, clean up the xid that was bound for this request
        if (RootContext.inGlobalTransaction()) {
            XidResource.cleanXid(request.getHeader(RootContext.KEY_XID));
        }
    }
}
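The propagation itself is simply "copy a thread-bound value into a header on the way out, bind it again on the way in". Here is a minimal standalone sketch of that round trip, with a plain Map standing in for HTTP headers and a ThreadLocal standing in for RootContext. XidPropagationDemo and its methods are hypothetical, and the header name "TX_XID" is an assumption about the value of RootContext.KEY_XID.

```java
import java.util.HashMap;
import java.util.Map;

public class XidPropagationDemo {
    // Assumed header name; the real interceptor reads it from RootContext.KEY_XID
    public static final String KEY_XID = "TX_XID";

    // Per-thread holder standing in for Seata's RootContext
    private static final ThreadLocal<String> CURRENT_XID = new ThreadLocal<>();

    public static void bind(String xid) { CURRENT_XID.set(xid); }

    public static String getXid() { return CURRENT_XID.get(); }

    public static void unbind() { CURRENT_XID.remove(); }

    // Caller side: copy the bound xid, if any, into the outgoing request headers
    public static Map<String, String> outgoingHeaders() {
        Map<String, String> headers = new HashMap<>();
        String xid = getXid();
        if (xid != null) {
            headers.put(KEY_XID, xid);
        }
        return headers;
    }

    // Callee side, before the handler: bind the header xid when nothing is bound yet
    public static void preHandle(Map<String, String> headers) {
        String rpcXid = headers.get(KEY_XID);
        if (getXid() == null && rpcXid != null) {
            bind(rpcXid);
        }
    }

    // Callee side, after the handler: drop the binding that preHandle created
    public static void afterHandle(Map<String, String> headers) {
        String rpcXid = headers.get(KEY_XID);
        if (rpcXid != null && rpcXid.equals(getXid())) {
            unbind();
        }
    }

    // Simulates the callee on its own thread and reports the xid its handler saw
    public static String serve(Map<String, String> headers) {
        String[] seen = new String[1];
        Thread worker = new Thread(() -> {
            preHandle(headers);
            seen[0] = getXid(); // what a service method would get from the context
            afterHandle(headers);
        });
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        bind("192.168.0.1:8091:123456");
        System.out.println("callee saw xid: " + serve(outgoingHeaders()));
        unbind();
        System.out.println("callee saw xid: " + serve(outgoingHeaders()));
    }
}
```

The callee thread never shares the caller's ThreadLocal; it only learns the xid because the header carried it, which is why a request sent outside a global transaction yields null on the receiving side.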
10. Initialization of the XA data source proxy and obtaining the XA connection proxy
//DataSource proxy for XA mode.
public class DataSourceProxyXA extends AbstractDataSourceProxyXA {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataSourceProxyXA.class);

    public DataSourceProxyXA(DataSource dataSource) {
        this(dataSource, DEFAULT_RESOURCE_GROUP_ID);
    }

    public DataSourceProxyXA(DataSource dataSource, String resourceGroupId) {
        if (dataSource instanceof SeataDataSourceProxy) {
            LOGGER.info("Unwrap the data source, because the type is: {}", dataSource.getClass().getName());
            dataSource = ((SeataDataSourceProxy) dataSource).getTargetDataSource();
        }
        this.dataSource = dataSource;
        this.branchType = BranchType.XA;
        JdbcUtils.initDataSourceResource(this, dataSource, resourceGroupId);
        //Set the default branch type to 'XA' in the RootContext.
        RootContext.setDefaultBranchType(this.getBranchType());
    }

    @Override
    public Connection getConnection() throws SQLException {
        Connection connection = dataSource.getConnection();
        return getConnectionProxy(connection);
    }

    @Override
    public Connection getConnection(String username, String password) throws SQLException {
        Connection connection = dataSource.getConnection(username, password);
        return getConnectionProxy(connection);
    }

    protected Connection getConnectionProxy(Connection connection) throws SQLException {
        //Outside a global transaction, hand back the plain connection without proxying
        if (!RootContext.inGlobalTransaction()) {
            return connection;
        }
        return getConnectionProxyXA(connection);
    }

    @Override
    protected Connection getConnectionProxyXA() throws SQLException {
        Connection connection = dataSource.getConnection();
        return getConnectionProxyXA(connection);
    }

    private Connection getConnectionProxyXA(Connection connection) throws SQLException {
        //The physical connection
        Connection physicalConn = connection.unwrap(Connection.class);
        XAConnection xaConnection = XAUtils.createXAConnection(physicalConn, this);
        ConnectionProxyXA connectionProxyXA = new ConnectionProxyXA(connection, xaConnection, this, RootContext.getXID());
        connectionProxyXA.init();
        return connectionProxyXA;
    }
}

//Abstract DataSource proxy for XA mode.
public abstract class AbstractDataSourceProxyXA extends BaseDataSourceResource<ConnectionProxyXA> {
    protected static final String DEFAULT_RESOURCE_GROUP_ID = "DEFAULT_XA";

    //Get a ConnectionProxyXA instance for finishing XA branch(XA commit/XA rollback)
    //@return ConnectionProxyXA instance
    public ConnectionProxyXA getConnectionForXAFinish(XAXid xaXid) throws SQLException {
        ConnectionProxyXA connectionProxyXA = lookup(xaXid.toString());
        if (connectionProxyXA != null) {
            return connectionProxyXA;
        }
        return (ConnectionProxyXA)getConnectionProxyXA();
    }

    protected abstract Connection getConnectionProxyXA() throws SQLException;

    //Force close the physical connection kept for XA branch of given XAXid.
    //@param xaXid the given XAXid
    public void forceClosePhysicalConnection(XAXid xaXid) throws SQLException {
        ConnectionProxyXA connectionProxyXA = lookup(xaXid.toString());
        if (connectionProxyXA != null) {
            connectionProxyXA.close();
            Connection physicalConn = connectionProxyXA.getWrappedConnection();
            if (physicalConn instanceof PooledConnection) {
                physicalConn = ((PooledConnection)physicalConn).getConnection();
            }
            //Force close the physical connection
            physicalConn.close();
        }
    }
}
11. Source code for XA branch transaction registration and starting the original XA connection
//Connection proxy for XA mode.
//Through this proxy the local transaction is opened and committed/rolled back, SQL statements are executed, and the XA two-phase commit is carried out
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionProxyXA.class);
    //Current auto-commit status, true by default
    private boolean currentAutoCommitStatus = true;
    //Xid of the XA branch transaction
    private XAXid xaBranchXid;
    //Whether the XA branch is active, false by default
    private boolean xaActive = false;
    //Whether the XA connection is being kept (held), false by default
    private boolean kept = false;

    //Constructor of Connection Proxy for XA mode.
    //@param originalConnection Normal Connection from the original DataSource.
    //@param xaConnection XA Connection based on physical connection of the normal Connection above.
    //@param resource The corresponding Resource(DataSource proxy) from which the connections was created.
    //@param xid Seata global transaction xid.
    public ConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
        super(originalConnection, xaConnection, resource, xid);
    }

    public void init() {
        try {
            this.xaResource = xaConnection.getXAResource();
            this.currentAutoCommitStatus = this.originalConnection.getAutoCommit();
            if (!currentAutoCommitStatus) {
                throw new IllegalStateException("Connection[autocommit=false] as default is NOT supported");
            }
        } catch (SQLException e) {
            throw new RuntimeException(e);
        }
    }
    ...

    //Switching auto-commit from true to false registers the branch transaction and starts the XA branch
    @Override
    public void setAutoCommit(boolean autoCommit) throws SQLException {
        if (currentAutoCommitStatus == autoCommit) {
            return;
        }
        if (autoCommit) {
            //According to JDBC spec:
            //If this method is called during a transaction and the auto-commit mode is changed, the transaction is committed.
            if (xaActive) {
                commit();
            }
        } else {
            if (xaActive) {
                throw new SQLException("should NEVER happen: setAutoCommit from true to false while xa branch is active");
            }
            //Start a XA branch
            long branchId = 0L;
            try {
                //1. register branch to TC then get the branchId
                //Register the branch with type XA, passing the resourceId and the xid
                branchId = DefaultResourceManager.get().branchRegister(BranchType.XA, resource.getResourceId(), null, xid, null, null);
            } catch (TransactionException te) {
                cleanXABranchContext();
                throw new SQLException("failed to register xa branch " + xid + " since " + te.getCode() + ":" + te.getMessage(), te);
            }
            //2. build XA-Xid with xid and branchId
            this.xaBranchXid = XAXidBuilder.build(xid, branchId);
            try {
                //3. XA Start
                xaResource.start(this.xaBranchXid, XAResource.TMNOFLAGS);
            } catch (XAException e) {
                cleanXABranchContext();
                throw new SQLException("failed to start xa branch " + xid + " since " + e.getMessage(), e);
            }
            //4. XA is active
            this.xaActive = true;
        }
        currentAutoCommitStatus = autoCommit;
    }
    ...
}

//The type Abstract connection proxy on XA mode.
public abstract class AbstractConnectionProxyXA implements Connection {
    public static final String SQLSTATE_XA_NOT_END = "SQLSTATE_XA_NOT_END";
    //Original connection
    protected Connection originalConnection;
    //XA connection wrapping the physical connection
    protected XAConnection xaConnection;
    //XA resource
    protected XAResource xaResource;
    //The underlying data source resource
    protected BaseDataSourceResource resource;
    //xid of the global (distributed) transaction
    protected String xid;

    public AbstractConnectionProxyXA(Connection originalConnection, XAConnection xaConnection, BaseDataSourceResource resource, String xid) {
        this.originalConnection = originalConnection;
        this.xaConnection = xaConnection;
        this.resource = resource;
        this.xid = xid;
    }
    ...
}
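The trigger for branch registration is easy to miss: nothing happens until the caller switches auto-commit off, which is what a transaction manager such as Spring's DataSourceTransactionManager does when it begins a local transaction. The following is a minimal sketch of that state machine, not Seata code. FakeXaConnection, its steps log, the xid "demo-xid" and the local branch-id counter are hypothetical stand-ins for ConnectionProxyXA, the call to the TC and the XAResource calls; error handling is left out.

```java
import java.util.ArrayList;
import java.util.List;

//Hypothetical model of the auto-commit switch; it logs steps instead of calling the TC or the database.
class AutoCommitBranchSketch {

    static class FakeXaConnection {
        final List<String> steps = new ArrayList<>();
        private final String xid;
        private boolean autoCommit = true;
        private boolean xaActive = false;
        private long nextBranchId = 1L; //stands in for the branchId returned by the TC

        FakeXaConnection(String xid) {
            this.xid = xid;
        }

        void setAutoCommit(boolean target) {
            if (autoCommit == target) {
                return; //no change of state, nothing to do
            }
            if (target) {
                if (xaActive) {
                    commit(); //switching back to auto-commit ends the open branch
                }
            } else {
                long branchId = nextBranchId++;
                steps.add("register branch " + branchId);
                steps.add("XA start " + xid + "-" + branchId);
                xaActive = true;
            }
            autoCommit = target;
        }

        void commit() {
            if (autoCommit) {
                return; //commit on an auto-commit session is ignored
            }
            steps.add("XA end");
            steps.add("XA prepare");
            xaActive = false;
        }
    }

    static List<String> demo() {
        FakeXaConnection conn = new FakeXaConnection("demo-xid");
        conn.setAutoCommit(false); //registers the branch and starts it
        conn.setAutoCommit(false); //same state again: ignored
        conn.setAutoCommit(true);  //commits the active branch (phase one)
        conn.commit();             //auto-commit session: ignored
        return conn.steps;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Only the first and third calls in demo() change state; the repeated setAutoCommit(false) and the final commit() are both ignored, which mirrors the early returns in the excerpt above.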
12. Source code for the two-phase commit flow of XA distributed transactions
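Seata's XA mode relies on the XA support of the underlying database, MySQL in this example: ConnectionProxyXA drives both phases through the XAResource obtained from the XAConnection.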
//Connection proxy for XA mode.
//Through this proxy the local transaction is opened and committed/rolled back, SQL statements are executed, and the XA two-phase commit is carried out
public class ConnectionProxyXA extends AbstractConnectionProxyXA implements Holdable {
    ...
    //Phase-one commit
    @Override
    public void commit() throws SQLException {
        if (currentAutoCommitStatus) {
            //Ignore the committing on an autocommit session.
            return;
        }
        if (!xaActive || this.xaBranchXid == null) {
            throw new SQLException("should NOT commit on an inactive session", SQLSTATE_XA_NOT_END);
        }
        try {
            //XA End: Success
            xaResource.end(xaBranchXid, XAResource.TMSUCCESS);
            //XA Prepare
            xaResource.prepare(xaBranchXid);
            //Keep the Connection if necessary
            keepIfNecessary();
        } catch (XAException xe) {
            try {
                //Branch Report to TC: Failed
                DefaultResourceManager.get().branchReport(BranchType.XA, xid, xaBranchXid.getBranchId(), BranchStatus.PhaseOne_Failed, null);
            } catch (TransactionException te) {
                LOGGER.warn("Failed to report XA branch commit-failure on " + xid + "-" + xaBranchXid.getBranchId() + " since " + te.getCode() + ":" + te.getMessage() + " and XAException:" + xe.getMessage());
            }
            throw new SQLException("Failed to end(TMSUCCESS)/prepare xa branch on " + xid + "-" + xaBranchXid.getBranchId() + " since " + xe.getMessage(), xe);
        } finally {
            cleanXABranchContext();
        }
    }

    //Phase-two commit
    public void xaCommit(String xid, long branchId, String applicationData) throws XAException {
        XAXid xaXid = XAXidBuilder.build(xid, branchId);
        xaResource.commit(xaXid, false);
        releaseIfNecessary();
    }
    ...
}
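The order of XAResource calls across the two phases can be checked in isolation. The following is a minimal sketch using only the standard javax.transaction.xa interfaces, with no database and no Seata classes. RecordingXAResource and SimpleXid are hypothetical helpers introduced for illustration; in particular, the byte layout in SimpleXid is an assumption and not the format produced by Seata's XAXidBuilder. The rollback path uses the standard XAResource.rollback call.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

class XaTwoPhaseSketch {

    //Drives one branch through both phases and returns the calls the resource received.
    static List<String> runBranch(boolean globalCommit) {
        RecordingXAResource recorder = new RecordingXAResource();
        XAResource xaResource = recorder;
        Xid branchXid = new SimpleXid("demo-xid", 1L);
        try {
            xaResource.start(branchXid, XAResource.TMNOFLAGS);
            //business SQL would run on the connection here
            xaResource.end(branchXid, XAResource.TMSUCCESS);
            xaResource.prepare(branchXid);           //phase one ends here
            if (globalCommit) {
                xaResource.commit(branchXid, false); //phase two: commit
            } else {
                xaResource.rollback(branchXid);      //phase two: rollback
            }
        } catch (XAException e) {
            throw new IllegalStateException(e);
        }
        return recorder.calls;
    }

    public static void main(String[] args) {
        System.out.println(runBranch(true));
        System.out.println(runBranch(false));
    }
}

//Hypothetical stand-in for a JDBC driver's XAResource: it only records the calls it receives.
class RecordingXAResource implements XAResource {
    final List<String> calls = new ArrayList<>();

    @Override public void start(Xid xid, int flags) { calls.add("start"); }
    @Override public void end(Xid xid, int flags) { calls.add("end"); }
    @Override public int prepare(Xid xid) { calls.add("prepare"); return XA_OK; }
    @Override public void commit(Xid xid, boolean onePhase) { calls.add("commit"); }
    @Override public void rollback(Xid xid) { calls.add("rollback"); }
    @Override public void forget(Xid xid) { }
    @Override public Xid[] recover(int flag) { return new Xid[0]; }
    @Override public boolean isSameRM(XAResource other) { return other == this; }
    @Override public int getTransactionTimeout() { return 0; }
    @Override public boolean setTransactionTimeout(int seconds) { return false; }
}

//Hypothetical Xid: the layout of the two byte arrays is an assumption made for this sketch.
class SimpleXid implements Xid {
    private final String xid;
    private final long branchId;

    SimpleXid(String xid, long branchId) {
        this.xid = xid;
        this.branchId = branchId;
    }

    @Override public int getFormatId() { return 1; }
    @Override public byte[] getGlobalTransactionId() { return xid.getBytes(StandardCharsets.UTF_8); }
    @Override public byte[] getBranchQualifier() { return Long.toString(branchId).getBytes(StandardCharsets.UTF_8); }
}
```

In the excerpt above, start happens in setAutoCommit(false), end and prepare happen in commit(), and the final commit happens later in xaCommit(), so the two phases run at different times even though the sketch runs them back to back.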