分布式事务
1、什么是分布式事务,分布式事务与本地事务的区别?
本地事务,就是只对本项目模块事务进行管理的,对于其他的数据库事务不做任何的改变,在项目编写中,经常使用@Transactional(rollBackFor=Exception.class)实现当前方法如果出现任何的异常就会使用数据库的redo.log日志实现事务的回滚。
分布式事务,它是在全局的角度看待问题,在多个模块之间发挥自己的作用,如果其中的一个模块出现了问题,那么就会将涉及到的多个模块的事务都会进行回滚,他是跨数据库的,跨项目模块的。
2、分布式事务的解决方案?
分布式事务的解决方案可以使用Seata实现。
什么是Seata?
Seata是一个开源的分布式事务的解决方案,它主要提供一个高性能和简单实用的分布式事务服务。
Seata为用户提供了四种模式:AT、TCC、SAGA、XA模式。
后面。将主要使用XA模式实现分布式事务问题。
3、实战案例一(微服务系统,不使用分布式事务)
CREATE DATABASE BANK1; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;第二、创建数据库bank2
CREATE DATABASE BANK2; CREATE TABLE `account_info` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `account_name` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '户主姓名', `account_no` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '银行卡号', `account_password` varchar(100) COLLATE utf8_bin DEFAULT NULL COMMENT '帐户密码', `account_balance` double DEFAULT NULL COMMENT '帐户余额', PRIMARY KEY (`id`) USING BTREE ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;第三、分别向数据库bank1和bank2插入数据
BANK1: INSERT INTO ACCOUNT_INFO VALUES(2,"张三",1,123456,1000); BANK2: INSERT INTO ACCOUNT_INFO VALUES(3,"李四",2,123456,0);第四、搭建项目环境(BANK1)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hui</groupId> <artifactId>bank1</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql-connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!-- nacos --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2021.0.1.0</version> </dependency> <!-- open-feign --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> <version>3.1.0</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <!-- loadbanancer --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> <version>3.1.2</version> </dependency> <!-- 逆向工程数据库 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>2.0</version> </dependency> </dependencies> </project>Controller
/**
* @Description: 用户转账操作
* @Author: huidou 惠豆
* @CreateTime: 2022/6/14 23:19
*/
@Controller
@RequestMapping("/bank1")
public class AccountInfoController {
@Autowired
private IAccountInfoService iAccountInfoService;
/**
* 向李四赚钱
* @param accountNo
* @param amount
* @return
*/
@ResponseBody
@RequestMapping("/transfer")
public String account(String accountNo, Double amount) {
Integer accountBalance = iAccountInfoService.updateAccountBalance(accountNo, amount);
if (accountBalance >= 1) {
return "转账成功,向李四转账成功";
} else {
return "转账失败,没能向李四转账成功";
}
}
}
service
public interface IAccountInfoService extends IService<AccountInfo> {
/**
* 向李四转账
* @param accountNo 银行卡号
* @param amount 金额
*/
Integer updateAccountBalance(String accountNo, Double amount);
}
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
@Autowired
private AccountInfoMapper accountInfoMapper;
// 远程服务接口
@Autowired
private Bank2Client bank2Client;
/**
* 向李四转账
* 从异常捕获来分析,bank1和bank2都设置了本地事务,这是后就会出现两种情况:
* 1、bank1在调用bank2接口之后出现了异常。
* 2、bank1正常调用bank2,但是bank2出现了异常。
*
* 结果分别是:
* 第一、bank1出现了异常会根据本地事务的特点,遇见异常就进行回滚,但是bank2是正常的,就会出现bank1没有扣钱,但是bank2已经成功的加钱了。
* 第二、bank1没有出现问题,但是bank2出现了问题,由于bank1调用bank2,benk2出现了Exception就会抛给bank1,所以最后bank1和bank2都会根据本地事务进行回滚。
* @param accountNo 银行卡号
* @param amount 金额
*/
@Override
@Transactional(rollbackFor = Exception.class)
public Integer updateAccountBalance(String accountNo, Double amount) {
// 1、首先查询自己的账户信息
AccountInfo accountInfo = accountInfoMapper.selectById(2);
if (accountInfo == null) {
return 0;
}
// 2、判断自己的余额信息
if (accountInfo.getAccountBalance() < amount) {
return 0;
}
// 3、修改自己的余额信息
accountInfo.setAccountBalance(accountInfo.getAccountBalance() - amount);
int update = accountInfoMapper.updateById(accountInfo);
// 4、测试以下远程调用的异常会不会引起当前方法的异常
try {
bank2Client.transfer(accountNo,amount);
} catch (Exception e) {
System.out.println("bank2出现错误了");
System.out.println(e.getMessage());
}
return 1;
}
}
mapper
@Repository
public interface AccountInfoMapper extends BaseMapper<AccountInfo> {
}
pojo
@Data
@TableName("account_info")
public class AccountInfo implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Long id;
private String accountName;
private String accountNo;
private String accountPassword;
private Double accountBalance;
}
openfeign
/**
* @Description: openfeign 远程调用服务接口
* @Author: huidou 惠豆
* @CreateTime: 2022/6/14 23:49
*/
@Component
@FeignClient(value = "seata-bank2")
public interface Bank2Client {
// 调用bank2的服务接口,实现向李四转账
@RequestMapping("/bank2/transfer")
String transfer(@RequestParam("accountNo") String accountNo,@RequestParam("amount") Double amount);
}
搭建项目环境(BANK2)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hui</groupId> <artifactId>bank2</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- mysql-connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!-- nacos --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <version>2021.0.1.0</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency> <!-- loadbanaer --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-loadbalancer</artifactId> <version>3.1.2</version> </dependency> <!-- 逆向工程数据库 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>2.0</version> </dependency> </dependencies> </project>controller
/**
* @Description: 用户转账操作
* @Author: huidou 惠豆
* @CreateTime: 2022/6/14 23:19
*/
@Controller
@RequestMapping("/bank2")
public class AccountInfoController {
@Autowired
private IAccountInfoService iAccountInfoService;
/**
* 接收来自张三的钱
* @param accountNo
* @param amount
* @return
*/
@ResponseBody
@RequestMapping("/transfer")
public String account(String accountNo, Double amount) {
iAccountInfoService.updateAccountBalance(accountNo, amount);
return "转账成功,接收来自张三的钱";
}
}
service
public interface IAccountInfoService extends IService<AccountInfo> {
/**
* 李四账户增加金额
* @param accountNo 银行卡号
* @param amount 金额
*/
void updateAccountBalance(String accountNo, Double amount);
}
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
@Autowired
private AccountInfoMapper accountInfoMapper;
/**
* 李四增加金额
* @param accountNo 银行卡号
* @param amount 金额
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void updateAccountBalance(String accountNo, Double amount) {
try {
System.out.println(10/0);
// 1、根据账户ID,查询对应的账号信息(目前只有一人,就给死ID)
QueryWrapper<AccountInfo> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("account_no",accountNo);
AccountInfo accountInfo = accountInfoMapper.selectOne(queryWrapper);
// 2、判断该账户是不是为Null
if (accountInfo == null) {
return;
}
// 3、给该账户添加钱(来自张三的)
accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount);
// 4、更新数据
int updateById = accountInfoMapper.updateById(accountInfo);
} catch (Exception e) {
e.printStackTrace();
throw new RuntimeException("出现了10/0的异常");
}
}
}
pojo
@Data
@TableName("account_info")
public class AccountInfo implements Serializable {
private static final long serialVersionUID = 1L;
@TableId(value = "id", type = IdType.AUTO)
private Long id;
private String accountName;
private String accountNo;
private String accountPassword;
private Double accountBalance;
}
项目的配置文件
server: port: 6001 spring: application: name: seata-bank1 cloud: nacos: discovery: server-addr: 192.168.205.150:8848 datasource: username: root password: root url: jdbc:mysql://localhost:3306/bank1?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC driver-class-name: com.mysql.cj.jdbc.Driver server: port: 6002 spring: application: name: seata-bank2 cloud: nacos: discovery: server-addr: 192.168.205.150:8848 datasource: username: root password: root url: jdbc:mysql://localhost:3306/bank2?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=UTC driver-class-name: com.mysql.cj.jdbc.DriverNacos服务列表
-
项目测试
1、两个项目中都不加@Transactional(rollbackFor = Exception.class),逻辑正常调用。
2、两个项目中都加上@Transactional(rollbackFor = Exception.class),逻辑正常调用。
3、两个项目中都加上@Transactional(rollbackFor = Exception.class),bank1在调用bank2接口之后设置异常查看是否出现数据不一致(bank2逻辑正常)。
4、两个项目中都加上@Transactional(rollbackFor = Exception.class),bank1在调用bank2接口之后(bank1逻辑正常),但是bank2逻辑设置错误。
结果:
1、对于第一种情况,逻辑是正确的,数据符合一致性。
2、对于第二种情况,逻辑是正确的,数据符合一致性。
3、对于第三种情况,由于bank1在调用完bank2之后出现了异常,bank2的执行逻辑正确,再加上都有
@Transactional(rollbackFor = Exception.class)注解,所以导致bank1数据回滚,但是bank2数据还是修改了,因为这个注解是本地注解,不能管到其他的事务模块中。
4、对于第四种情况容易出现误解,由于bank1的执行逻辑是正确的,但是bank2的逻辑是错误的,是会报出异常的,加上bank1调用了bank2,这时bank1就会接收到bank2抛出的异常,由于两个接口都有@Transactional(rollbackFor = Exception.class)注解,所以最后都会进行回滚,这个时候,不清楚异常接收的原理的时候,还以为两个本地事务注解就能实现分布式事务,其实不是的,要真是,那还要分布式事务的理由在那里,第三种情况出现了又该如何处理?
4、实战案例二(微服务系统,使用分布式事务XA模式)
第一、添加Seata依赖<!-- seata --> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-seata</artifactId> <version>2021.0.1.0</version> </dependency>第二、数据库添加Undo_log
CREATE TABLE `undo_log` ( `id` bigint(20) NOT NULL AUTO_INCREMENT, `branch_id` bigint(20) NOT NULL, `xid` varchar(100) NOT NULL, `context` varchar(128) NOT NULL, `rollback_info` longblob NOT NULL, `log_status` int(11) NOT NULL, `log_created` datetime NOT NULL, `log_modified` datetime NOT NULL, `ext` varchar(100) DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`) ) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;第三、BANK1(在业务层调用BANK2接口之后设置异常)
service
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
@Autowired
private AccountInfoMapper accountInfoMapper;
// 远程服务接口
@Autowired
private Bank2Client bank2Client;
/**
* 向李四转账
* 从异常捕获来分析,bank1和bank2都设置了本地事务,这是后就会出现两种情况:
* 1、bank1在调用bank2接口之后出现了异常。
* 2、bank1正常调用bank2,但是bank2出现了异常。
*
* 结果分别是:
* 第一、bank1出现了异常会根据本地事务的特点,遇见异常就进行回滚,但是bank2是正常的,就会出现bank1没有扣钱,但是bank2已经成功的加钱了。
* 第二、bank1没有出现问题,但是bank2出现了问题,由于bank1调用bank2,benk2出现了Exception就会抛给bank1,所以最后bank1和bank2都会根据本地事务进行回滚。
* @param accountNo 银行卡号
* @param amount 金额
*/
@GlobalTransactional // 这个注解只需要在其中的一个模块方法上添加即可,因为是全局,所以一个就够了
@Override
@Transactional(rollbackFor = Exception.class)
public Integer updateAccountBalance(String accountNo, Double amount) {
// 1、首先查询自己的账户信息
AccountInfo accountInfo = accountInfoMapper.selectById(2);
if (accountInfo == null) {
return 0;
}
// 2、判断自己的余额信息
if (accountInfo.getAccountBalance() < amount) {
return 0;
}
// 3、修改自己的余额信息
accountInfo.setAccountBalance(accountInfo.getAccountBalance() - amount);
int update = accountInfoMapper.updateById(accountInfo);
// 4、测试以下远程调用的异常会不会引起当前方法的异常
bank2Client.transfer(accountNo,amount);
/**
* 在这里设置错误,检测是否出现不一致性
* 错误情况就是:bank1数据回滚,不改变,但是bank2数据改变
* 正常情况就是:出现异常,大家都不改变。
*/
System.out.println(10/0);
return 1;
}
}
第四、BANK2(业务层逻辑正确)
@Slf4j
@Service
public class AccountInfoServiceImpl extends ServiceImpl<AccountInfoMapper, AccountInfo> implements IAccountInfoService {
@Autowired
private AccountInfoMapper accountInfoMapper;
/**
* 李四增加金额
* @param accountNo 银行卡号
* @param amount 金额
*/
@Override
@Transactional(rollbackFor = Exception.class)
public void updateAccountBalance(String accountNo, Double amount) {
// 1、根据账户ID,查询对应的账号信息(目前只有一人,就给死ID)
QueryWrapper<AccountInfo> queryWrapper = new QueryWrapper<>();
queryWrapper.eq("account_no",accountNo);
AccountInfo accountInfo = accountInfoMapper.selectOne(queryWrapper);
// 2、判断该账户是不是为Null
if (accountInfo == null) {
return;
}
// 3、给该账户添加钱(来自张三的)
accountInfo.setAccountBalance(accountInfo.getAccountBalance() + amount);
// 4、更新数据
int updateById = accountInfoMapper.updateById(accountInfo);
}
}
结果:
数据库初始数据图:
数据库执行之后的结果还是一样的,所以说明分布式事务Seata生效了。
5、使用分布式事务解决的原理
分布式事务的核心思想其实就是一个投票的过程,对于一个决定,如果没有一个人有问题,那么就会执行,反之,只要有一个人有问题,就会进行回退取消。
从上面的图可以看出,当用户服务的事务管理器TM会向全局事务控制申请开启分布式事务,这时会产生一个全局唯一的XID,然后,用户服务会向全局事务进行注册自己的本地事务,注册完之后,会进行执行自己的逻辑,之后在条用积分服务,积分服务也是一样的会进行注册本地事务,由于两个服务都有自己的本地服务,倘若都成功了,就都会向TC提交一个投票也就是Commit,这时TC就会向两个本地事务发送Commit的命令;但是若有一个出现了问题那么@Transactional(rollbackFor = Exception.class)注解就会进行回滚,由于TC管辖了该本地事务,就会知道该事务想自己提交了rollback的请求,这时TC就会向所有的本地事务发送rollback的命令。
6、基于Atomikos实现XA强一致性分布式事务实战
是哦也能够Atomikos实现XA模式的强一致性,模拟实现跨库的转账实现,在一个项目模块中,使用多数据源(两个数据库)实现不同数据库的数据改变。
原理图如下:
使用Atomikos框架技术的好处在于:项目不需要直接与数据库进行交互进行数据的改变,而是让Atomikos技术对数据库进行封装,由于这个Atomikos技术内部实现了分布式事务的XA模式,所以对于开发人员来说,我不需要像之前那样不但要添加依赖,添加注解,还要与本地事务做出关联,现在就不需要了,只需要简简单单的添加依赖与注解即可,注解也是常用的@Transactional即可,只需要关注业务逻辑实现。
开始一个项目实战案例:
项目需求:在一个项目中,配置多个数据源,用于实现转账功能,一个数据库扣减金额;一个数据库添加金额
第一、创建数据库CREATE DATABASE TX-XA-01 CREATE TABLE `user_account` ( `account_no` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '账户编号', `account_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '账户\r\n名称', `account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额', PRIMARY KEY (`account_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;
CREATE DATABASE TX-XA-02 CREATE TABLE `user_account` ( `account_no` varchar(64) COLLATE utf8_bin NOT NULL COMMENT '账户编号', `account_name` varchar(64) COLLATE utf8_bin DEFAULT NULL COMMENT '账户\r\n名称', `account_balance` decimal(10,2) DEFAULT NULL COMMENT '账户余额', PRIMARY KEY (`account_no`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin ROW_FORMAT=DYNAMIC;第二、向数据库插入数据
CREATE DATABASE TX-XA-01
INSERT INTO USER_ACCOUNT VALUES("1001","张三",10000);
CREATE DATABASE TX-XA-02
INSERT INTO USER_ACCOUNT VALUES("1002","李四",10000);
第三、创建项目,配置项目环境
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.hui</groupId> <artifactId>atomikos-xa</artifactId> <version>1.0-SNAPSHOT</version> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.3</version> </parent> <dependencies> <!-- web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- durid数据源 --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.1.22</version> </dependency> <!-- mysql-connector --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <!-- mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.3.0</version> </dependency> <!-- atomikos --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jta-atomikos</artifactId> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- 逆向工程数据库 --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-generator</artifactId> <version>3.5.2</version> </dependency> <dependency> <groupId>org.apache.velocity</groupId> <artifactId>velocity-engine-core</artifactId> <version>2.0</version> </dependency> </dependencies> </project>第四、多数据源配置
/**
* @Description: 第一个数据库信息的绑定
* @Author: huidou 惠豆
* @CreateTime: 2022/6/15 17:09
*/
@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.master")
public class DBConfig1 {
private String url;
private String username;
private String password;
private String driverClassName;
}
DBConfig2
/**
* @Description: 第二个数据库信息的绑定
* @Author: huidou 惠豆
* @CreateTime: 2022/6/15 17:09
*/
@Data
@ConfigurationProperties(prefix = "spring.datasource.dynamic.datasource.slave")
public class DBConfig2 {
private String url;
private String username;
private String password;
private String driverClassName;
}
MybatisConfig1、MybatisConfig2
/**
* @Description: 配置第一个数据源,将其与Atomikos进行整合
* @Author: huidou 惠豆
* @CreateTime: 2022/6/15 17:09
*/
@Configuration
@MapperScan(basePackages = "com.hui.mapper1",sqlSessionTemplateRef = "masterSqlSessionTemplate")
public class MybatisConfig1 {
/**
* 创建一个DataSource数据源
* @param dbConfig1
* @return
*/
@Bean(name = "masterDataSource")
public DataSource getDataSource(DBConfig1 dbConfig1) {
// 1. 将本地事务注册到 Atomikos全局事务中
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
// 2. 设置rm供应商
sourceBean.setUniqueResourceName("masterDataSource");
sourceBean.setXaDataSourceClassName(dbConfig1.getDriverClassName());
// 3. 设置testquery
sourceBean.setTestQuery("select 1");
// 4. 设置超时时间
sourceBean.setBorrowConnectionTimeout(3);
// 设置Mysql链接
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUrl(dbConfig1.getUrl());
dataSource.setUser(dbConfig1.getUsername());
dataSource.setPassword(dbConfig1.getPassword());
sourceBean.setXaDataSource(dataSource);
return sourceBean;
}
/**
* SqlSessionFactory是mybaits 重要的对象
* @return
*/
@Bean(name = "masterSqlSessionFactory")
public SqlSessionFactory getSqlSessionFactory(@Qualifier("masterDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSource);
return sessionFactoryBean.getObject();
}
/**
* 负责管理mybatis 的sqlsession sql
* SqlSessionTemplate 替换默认的mybaits 实现的defalutsqlsession不能参与spring事务不能注入 线程不安全
* @return
*/
@Bean(name = "masterSqlSessionTemplate")
public SqlSessionTemplate getSqlSessionTemplate(@Qualifier("masterSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
return new SqlSessionTemplate(sqlSessionFactory);
}
}
/**
* @Description: 配置第二个数据源,将其与Atomikos进行整合
* @Author: huidou 惠豆
* @CreateTime: 2022/6/15 17:09
*/
@Configuration
@MapperScan(basePackages = "com.hui.mapper2",sqlSessionTemplateRef = "slaveSqlSessionTemplate")
public class MybatisConfig2 {
/**
* 创建一个DataSource数据源
* @param dbConfig2
* @return
*/
@Bean(name = "slaveDataSource")
public DataSource getDataSource(DBConfig2 dbConfig2) {
// 1. 将本地事务注册到 Atomikos全局事务中
AtomikosDataSourceBean sourceBean = new AtomikosDataSourceBean();
// 2. 设置rm供应商
sourceBean.setUniqueResourceName("slaveDataSource");
sourceBean.setXaDataSourceClassName(dbConfig2.getDriverClassName());
// 3. 设置testquery
sourceBean.setTestQuery("select 1");
// 4. 设置超时时间
sourceBean.setBorrowConnectionTimeout(3);
// 设置Mysql链接
MysqlXADataSource dataSource = new MysqlXADataSource();
dataSource.setUrl(dbConfig2.getUrl());
dataSource.setUser(dbConfig2.getUsername());
dataSource.setPassword(dbConfig2.getPassword());
sourceBean.setXaDataSource(dataSource);
return sourceBean;
}
/**
* SqlSessionFactory是mybaits 重要的对象
* @return
*/
@Bean(name = "slaveSqlSessionFactory")
public SqlSessionFactory getSqlSessionFactory(@Qualifier("slaveDataSource") DataSource dataSource) throws Exception {
MybatisSqlSessionFactoryBean sessionFactoryBean = new MybatisSqlSessionFactoryBean();
sessionFactoryBean.setDataSource(dataSource);
return sessionFactoryBean.getObject();
}
/**
* 负责管理mybatis 的sqlsession sql
* SqlSessionTemplate 替换默认的mybaits 实现的defalutsqlsession不能参与spring事务不能注入 线程不安全
* @return
*/
@Bean(name = "slaveSqlSessionTemplate")
public SqlSessionTemplate getSqlSessionTemplate(@Qualifier("slaveSqlSessionFactory") SqlSessionFactory sqlSessionFactory){
return new SqlSessionTemplate(sqlSessionFactory);
}
}
逻辑层实现
/**
* 实现转账操作
*/
@RestController
@RequestMapping("/userAccount")
public class UserAccountController {
@Autowired
private IUserAccountService iUserAccountService;
/**
* 实现转账操作
* @param sourceAccountNo 源账户
* @param targetAccountNo 目标账户
* @param amount 转账金额
* @return
*/
@RequestMapping("/transfer")
public String transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) {
Integer transfer = iUserAccountService.transfer(sourceAccountNo, targetAccountNo, amount);
if (transfer == 1) {
return "转账成功!";
}
return "转账失败!";
}
}
service
@Service
public class UserAccountServiceImpl implements IUserAccountService {
// 原账户所在数据库
@Autowired
private UserAccountMapper1 userAccountMapper1;
// 目标账户所在数据库
@Autowired
private UserAccountMapper2 userAccountMapper2;
/**
* 实现转账操作
* @param sourceAccountNo 源账户
* @param targetAccountNo 目标账户
* @param amount 转账金额
* @return
*/
@Override
public Integer transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) {
// 1、查询原账户
UserAccount sourceAccount = userAccountMapper1.selectById(sourceAccountNo);
// 2、查询目标账户
UserAccount targetAccount = userAccountMapper2.selectById(targetAccountNo);
// 3、判断原账户与目标账户是否存在
if (sourceAccount != null && targetAccount != null) {
// 4、判断原账户余额是否不足
if (sourceAccount.getAccountBalance().compareTo(amount) < 0) {
throw new RuntimeException("原账户金额不足,无法实现转账操作");
}
// 5、原账户扣减余额,并更新
sourceAccount.setAccountBalance(sourceAccount.getAccountBalance().subtract(amount));
int updateSourceAccount = userAccountMapper1.updateById(sourceAccount);
// 6、目标账户增加余额,并更新
targetAccount.setAccountBalance(targetAccount.getAccountBalance().add(amount));
int updateTargetAccount = userAccountMapper2.updateById(targetAccount);
if (updateSourceAccount == 1 && updateTargetAccount == 1) {
return 1;
}
}
return 0;
}
}
mapper
@Repository
public interface UserAccountMapper1 extends BaseMapper<UserAccount> {}
@Repository
public interface UserAccountMapper2 extends BaseMapper<UserAccount> {}
application.yml
server:
port: 6003
spring:
autoconfigure:
#停用druid连接池的自动配置
exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure
datasource:
#选用druid的XADataSource数据源,因为这个数据源支持分布式事务管理
type: com.alibaba.druid.pool.xa.DruidXADataSource
#以下是自定义字段
dynamic:
primary: master
datasource:
master:
url: jdbc:mysql://localhost:3306/tx-xa-01?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
slave:
url: jdbc:mysql://localhost:3306/tx-xa-02?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&autoReconnect=true
username: root
password: root
driver-class-name: com.mysql.cj.jdbc.Driver
validation-query: SELCET 1
logging:
pattern:
console: logging.pattern.console=%d{MM/dd HH:mm:ss.SSS} %clr(%-5level) --- [%-15thread] %cyan(%-50logger{50}):%msg%n
业务测试
发送请求:
http://localhost:6003/userAccount/transfer?sourceAccountNo=1001&targetAccountNo=1002&amount=1000
正常情况下,可以实现跨数据库对数据进行修改,但是如果我恶意做一个操作,代码如下:
/**
* 实现转账操作
* @param sourceAccountNo 源账户
* @param targetAccountNo 目标账户
* @param amount 转账金额
* @return
*/
@Override
public Integer transfer(String sourceAccountNo, String targetAccountNo, BigDecimal amount) {
// 1、查询原账户
UserAccount sourceAccount = userAccountMapper1.selectById(sourceAccountNo);
// 2、查询目标账户
UserAccount targetAccount = userAccountMapper2.selectById(targetAccountNo);
// 3、判断原账户与目标账户是否存在
if (sourceAccount != null && targetAccount != null) {
// 4、判断原账户余额是否不足
if (sourceAccount.getAccountBalance().compareTo(amount) < 0) {
throw new RuntimeException("原账户金额不足,无法实现转账操作");
}
// 5、原账户扣减余额,并更新
sourceAccount.setAccountBalance(sourceAccount.getAccountBalance().subtract(amount));
int updateSourceAccount = userAccountMapper1.updateById(sourceAccount);
System.out.println(10/0);
// 6、目标账户增加余额,并更新
targetAccount.setAccountBalance(targetAccount.getAccountBalance().add(amount));
int updateTargetAccount = userAccountMapper2.updateById(targetAccount);
if (updateSourceAccount == 1 && updateTargetAccount == 1) {
return 1;
}
}
return 0;
}
从代码上可以看出来,我们在对数据库1做完操作之后进行的一个异常,这时就会造成报错,第一个数据库扣减成功,第二个数据库因异常无法执行,则不能添加,形成数据不一致。
如何解决?
使用Atomikos实现分布式事务数据XA模式的强一致性。
代码上如何修改:很简单,由于我们之前把多数据源已经都与Atomikos框架整合在一起了,所以这个时候,只需要在方法上添加一个注解@Transactional,该注解是import org.springframework.transaction.annotation.Transactional;下的,即可,该注解就包含了Atomikos对事务的处理,然后在主启动类上添加一个注解@EnableTransactionManager,即可实现。
结果:
发送请求:
http://localhost:6003/userAccount/transfer?sourceAccountNo=1001&targetAccountNo=1002&amount=1000
实现数据一致性。

查看14道真题和解析