Seata源码—5.全局事务的创建与返回处理
大纲
1.Seata开启分布式事务的流程总结
2.Seata生成全局事务ID的雪花算法源码
3.生成xid以及对全局事务会话进行持久化的源码
4.全局事务会话数据持久化的实现源码
5.Seata Server创建全局事务与返回xid的源码
6.Client获取Server的响应与处理的源码
7.Seata与Dubbo整合的过滤器源码
1.Seata开启分布式事务的流程总结
(1)Seata分布式事务执行流程
(2)开启一个全局事务的流程
(1)Seata分布式事务执行流程
Seata Client在执行添加了全局事务注解@GlobalTransactional的方法时,实际执行的是根据全局事务拦截器创建该方法所在Bean的动态代理方法,于是会执行GlobalTransactionalInterceptor的invoke()方法。此时,添加了全局事务注解@GlobalTransactional的方法就会被全局事务拦截器拦截了。
GlobalTransactionalInterceptor全局事务拦截器拦截目标方法的调用后,会由事务执行模版TransactionalTemplate的excute()方法来执行目标方法。
在事务执行模版TransactionalTemplate的excute()方法中,首先会判断Propagation全局事务传播级别,然后开启一个全局事务(也就是打开一个全局事务),接着才执行具体的业务目标方法。
执行具体的业务目标方法时,会通过Dubbo的RPC调用来传递全局事务的xid给其他的Seata Client。其他的Seata Client通过Dubbo过滤器获取到RPC调用中的xid后,会将xid放入线程本地变量副本中。之后执行SQL时就会获取数据库连接代理来对SQL进行拦截,数据库连接代理就可以从线程本地变量副本中获取xid,然后开启分支事务。
各个分支事务都执行完毕后,开启全局事务的Seata Client就会提交事务、处理全局锁、资源清理。
(2)开启一个全局事务的流程
Seata Server收到Seata Client发送过来的RpcMessage对象消息后,RpcMessage对象消息首先会由ServerOnRequestProcessor的process()方法处理,然后会由DefaultCoordinator的onRequest()方法进行处理,接着会由GlobalBeginRequest的handle()方法进行处理,然后会由DefaultCoordinator的doGlobalBegin()方法来处理,最后给到DefaultCore的begin()方法来进行处理。
在DefaultCore的begin()方法中,首先就会创建一个全局事务会话,然后将全局事务会话的xid通过MDC放入线程本地变量副本中,接着对该全局事务会话添加一个全局事务会话的生命周期监听器,最后打开该全局事务会话、发布会话开启事件并返回全局事务会话的xid。
在创建一个全局事务会话GlobalSession时,首先会由uuid生成组件UUIDGenerator来生成全局事务id(transactionId),然后根据生成的全局事务id(transactionId)来继续生成xid。
2.Seata生成全局事务ID的雪花算法源码
(1)通过UUIDGenerator生成全局事务ID
(2)IdWorker实现的雪花算法生成的ID的组成
(3)IdWorker实现的雪花算法对时钟回拨的处理
(1)通过UUIDGenerator生成全局事务ID
Seata在创建全局事务会话时会通过UUIDGenerator来生成全局事务ID,UUIDGenerator在生成ID时是通过Seata自己实现的雪花算法来生成的。
public class GlobalSession implements SessionLifecycle, SessionStorable { ... //创建全局事务会话 public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) { GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false); return session; } public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) { //全局事务id是通过UUIDGenerator来生成的 this.transactionId = UUIDGenerator.generateUUID(); this.status = GlobalStatus.Begin; this.lazyLoadBranch = lazyLoadBranch; if (!lazyLoadBranch) { this.branchSessions = new ArrayList<>(); } this.applicationId = applicationId; this.transactionServiceGroup = transactionServiceGroup; this.transactionName = transactionName; this.timeout = timeout; //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid this.xid = XID.generateXID(transactionId); } ... } public class UUIDGenerator { private static volatile IdWorker idWorker; //generate UUID using snowflake algorithm public static long generateUUID() { //Double Check + volatile,实现并发场景下只创建一次idWorker对象 if (idWorker == null) { synchronized (UUIDGenerator.class) { if (idWorker == null) { init(null); } } } //正常情况下,每次都会通过idWorker生成一个id return idWorker.nextId(); } //init IdWorker public static void init(Long serverNode) { idWorker = new IdWorker(serverNode); } }
(2)IdWorker实现的雪花算法生成的ID的组成
IdWorker就是Seata自己实现的基于雪花算法的ID生成器。IdWorker的nextId()方法通过雪花算法生成的transactionId一共是64位,用64个bit拼接出一个唯一的ID。
一.最高位始终是0,占1个bit
二.接着的10个bit是workerId
一台机器就是一个worker,每个worker都会有一个自己的workerId。生成workerId时,是基于本机网络地址里的Mac地址来生成的。
三.接着的41个bit是时间戳
表示可以为某台机器的每一毫秒,分配一个自增长的ID。毫秒时间戳有13位数,转换为2进制需要2的41次方。
四.最后的12个bit是序列号
如果一台机器在一毫秒内需要为很多线程生成ID,就可以通过自增长的12个bit的Sequence为每个线程分配ID。
(3)IdWorker实现的雪花算法对时钟回拨的处理
在执行IdWorker的nextId()方法时,会对包含序列号和时间戳的timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence序列号进行累加。
如果出现大量的线程并发获取ID,此时可能会导致timestampAndSequence中某一个毫秒内的Sequence序列号快速累加,并且将代表Sequence序列号的12个bit全部累加完毕,最后便会导致包含序列号和时间戳的timestampAndSequence中的毫秒时间戳也进行累加。
但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,出现时钟回拨问题,于是就需要调用waitIfNecessary()方法进行处理。
所以,在IdWorker的waitIfNecessary()方法中,如果获取ID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽,那么就需要阻塞当前线程5毫秒。
//IdWorker就是Seata自己实现的基于雪花算法的ID生成器 public class IdWorker { private final long twepoch = 1588435200000L;//Start time cut (2020-05-03) private final int workerIdBits = 10;//The number of bits occupied by workerId private final int timestampBits = 41;//The number of bits occupied by timestamp private final int sequenceBits = 12;//The number of bits occupied by sequence private final int maxWorkerId = ~(-1 << workerIdBits);//Maximum supported machine id, the result is 1023 //business meaning: machine ID (0 ~ 1023) //actual layout in memory: //highest 1 bit: 0 //middle 10 bit: workerId //lowest 53 bit: all 0 private long workerId; //timestampAndSequence是64位的、支持CAS操作的Long型的、包含了Sequence序列号的时间戳 //它的最高位是11个bit,没有使用 //中间有41个bit,是时间戳 //最低位有12个bit,是序列号 //timestampAndSequence可以认为是把时间戳和序列号混合在了一个long型数字里 //timestamp and sequence mix in one Long //highest 11 bit: not used //middle 41 bit: timestamp //lowest 12 bit: sequence private AtomicLong timestampAndSequence; //mask that help to extract timestamp and sequence from a long //可以帮忙从一个long数字里提取出一个包含Sequence序列号的时间戳 private final long timestampAndSequenceMask = ~(-1L << (timestampBits + sequenceBits)); //instantiate an IdWorker using given workerId public IdWorker(Long workerId) { //初始化timestampAndSequence initTimestampAndSequence(); //初始化workerId initWorkerId(workerId); } //init first timestamp and sequence immediately private void initTimestampAndSequence() { //获取相对于twepoch的最新时间戳 long timestamp = getNewestTimestamp(); //将最新时间戳和sequenceBits进行位运算(左移),从而得到一个混合了sequence的时间戳 long timestampWithSequence = timestamp << sequenceBits; //把混合了sequence的时间戳,赋值给timestampAndSequence this.timestampAndSequence = new AtomicLong(timestampWithSequence); } //init workerId private void initWorkerId(Long workerId) { if (workerId == null) { workerId = generateWorkerId(); } if (workerId > maxWorkerId || workerId < 0) { String message = String.format("worker Id can't be greater than %d or less than 0", maxWorkerId); throw new IllegalArgumentException(message); } //将workerId与timestampBits+sequenceBits的和进行位运算(左移),获取一个workerId this.workerId = workerId << (timestampBits + sequenceBits); } //通过snowflake雪花算法来生成transactionId //一共是64位,用64个bit拼接出一个唯一的ID,最高位始终是0,占1个bit //接着的10个bit是workerId,一台机器就是一个worker,每个worker都会有一个自己的workerId //接着的41个bit是时间戳,表示可以为某台机器的每一毫秒,分配一个自增长的id,毫秒时间戳有13位数,转换为2进制就需要2的41次方,2的20次方是一个7位数的数字 //最后的12个bit是序列号,如果一台机器在一毫秒内需要为很多线程生成id,就可以通过自增长的12个bit的Sequence为每个线程分配id //get next UUID(base on snowflake algorithm), which look like: //highest 1 bit: always 0 //next 10 bit: workerId //next 41 bit: timestamp //lowest 12 bit: sequence public long nextId() { waitIfNecessary(); //对包含Sequence序列号的时间戳timestampAndSequence进行累加,也就是对timestampAndSequence的某一个毫秒内的Sequence进行累加 //如果出现大量的线程并发获取id,此时可能会导致timestampAndSequence的某一个毫秒内的Sequence快速累加,并且将12个bit全部累加完毕 //最终导致timestampAndSequence的毫秒时间戳也进行累加了 //但当前的实际时间其实还是这一毫秒,而timestampAndSequence里的毫秒时间戳已经累加到下一个毫秒去了,于是就需要waitIfNecessary()进行处理 long next = timestampAndSequence.incrementAndGet(); //把最新的包含Sequence序列号的时间戳next与timestampAndSequenceMask进行位运算,获取真正的包含Sequence序列号的时间戳timestampWithSequence long timestampWithSequence = next & timestampAndSequenceMask; //对包含Sequence序列号的时间戳与workerId通过位运算拼接在一起 return workerId | timestampWithSequence; } //block current thread if the QPS of acquiring UUID is too high that current sequence space is exhausted //如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒 private void waitIfNecessary() { //先获取包含Sequence序列号的当前时间戳 long currentWithSequence = timestampAndSequence.get(); //将currentWithSequence与sequenceBits进行位运算(右移),获取到当前时间戳 long current = currentWithSequence >>> sequenceBits; //获取相对于twepoch的最新时间戳 long newest = getNewestTimestamp(); //如果当前的时间戳大于最新的时间戳,说明获取UUID的QPS过高,导致timestampAndSequence增长太快了(出现时钟回拨问题) if (current >= newest) { try { //如果获取UUID的QPS过高,导致当前时间戳对应的Sequence序列号被耗尽了,那么就需要阻塞当前线程5毫秒 Thread.sleep(5); } catch (InterruptedException ignore) { //don't care } } } //get newest timestamp relative to twepoch private long getNewestTimestamp() { //通过当前毫秒单位的时间戳 减去 一个固定的时间twepoch,得到的就是相对于twepoch的最新时间戳 return System.currentTimeMillis() - twepoch; } //auto generate workerId, try using mac first, if failed, then randomly generate one private long generateWorkerId() { try { //生成一个workerId,默认是基于网络的Mac地址来生成的 return generateWorkerIdBaseOnMac(); } catch (Exception e) { return generateRandomWorkerId(); } } //use lowest 10 bit of available MAC as workerId private long generateWorkerIdBaseOnMac() throws Exception { //获取所有的网络接口 Enumeration<NetworkInterface> all = NetworkInterface.getNetworkInterfaces(); //遍历每一个网络接口 while (all.hasMoreElements()) { NetworkInterface networkInterface = all.nextElement(); boolean isLoopback = networkInterface.isLoopback(); boolean isVirtual = networkInterface.isVirtual(); //如果是虚拟的、回环的地址,那么这个地址就跳过,不能使用 if (isLoopback || isVirtual) { continue; } //获取本机网络地址里的Mac地址,基于Mac地址来生成一个workerid byte[] mac = networkInterface.getHardwareAddress(); return ((mac[4] & 0B11) << 8) | (mac[5] & 0xFF); } throw new RuntimeException("no available mac found"); } //randomly generate one as workerId private long generateRandomWorkerId() { return new Random().nextInt(maxWorkerId + 1); } }
3.生成xid以及对全局事务会话进行持久化的源码
(1)根据全局事务ID生成xid
(2)全局事务会话的持久化
(1)根据全局事务ID生成xid
xid是通过ip:port:transactionId拼接出来的。
public class XID { private static int port; private static String ipAddress; ... //Generate xid string. public static String generateXID(long tranId) { //首先获取当前机器的IP地址 //然后拼接上一个冒号、接着拼接一个端口号、再拼接一个冒号 //最后再拼接事务id,以此来生成xid //所以xid是通过ip:port:transactionId拼接出来的 return new StringBuilder().append(ipAddress).append(IP_PORT_SPLIT_CHAR).append(port).append(IP_PORT_SPLIT_CHAR).append(tranId).toString(); } ... }
(2)全局事务会话的持久化
public class DefaultCore implements Core { ... @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { //创建一个全局事务会话 GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); //通过slf4j的MDC把xid放入线程本地变量副本里去 MDC.put(RootContext.MDC_KEY_XID, session.getXid()); //添加一个全局事务会话的生命周期监听器 session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); //打开Session,其中会对全局事务会话进行持久化 session.begin(); //transaction start event,发布会话开启事件 MetricsPublisher.postSessionDoingEvent(session, false); //返回全局事务会话的xid return session.getXid(); } ... } public class GlobalSession implements SessionLifecycle, SessionStorable { ... @Override public void begin() throws TransactionException { this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); } } ... } public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener { ... @Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); } @Override public void addGlobalSession(GlobalSession session) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD); } writeSession(LogOperation.GLOBAL_ADD, session); } private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException { //transactionStoreManager.writeSession()会对全局事务会话进行持久化 if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) { if (LogOperation.GLOBAL_ADD.equals(logOperation)) { throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store global session"); } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update global session"); } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove global session"); } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to store branch session"); } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to update branch session"); } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Fail to remove branch session"); } else { throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession, "Unknown LogOperation:" + logOperation.name()); } } } ... }
4.全局事务会话数据持久化的实现源码
(1)全局事务会话数据的持久化流程
(2)将全局事务会话持久化到MySQL数据库的实现
(3)将全局事务会话持久化到File文件的实现
(4)将全局事务会话持久化到Redis存储的实现
(1)全局事务会话数据的持久化流程
创建全局事务会话时,会通过雪花算法生成全局事务ID即transactionId,然后通过transactionId按照"ip:port:transactionId"格式生成xid。
创建完全局事务会话之后,就会添加一个全局事务会话的生命周期监听器,然后就会调用GlobalSession的begin()方法开启会话。
在GlobalSession的begin()方法中,会调用全局事务会话生命周期监听器的onBegin()方法,也就是调用SessionLifecycleListener的onBegin()方法。
接着就会由AbstractSessionManager对全局事务会话进行管理,将GlobalSession添加到SessionManager会话管理器中,也就是调用transactionStoreManager的writeSession()方法,对全局事务会话进行持久化。
默认情况下,会通过数据库进行持久化,也就是调用DataBaseTransactionStoreManager数据库事务存储管理器的writeSession()方法,将全局事务会话存储到数据库中。
当然Seata提供了三种方式来对全局事务会话进行持久化,分别是数据库存储、文件存储和Redis存储。
(2)将全局事务会话持久化到MySQL数据库的实现
//The type Database transaction store manager. public class DataBaseTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager { private static volatile DataBaseTransactionStoreManager instance; protected LogStore logStore; ... //Get the instance. public static DataBaseTransactionStoreManager getInstance() { if (instance == null) { synchronized (DataBaseTransactionStoreManager.class) { if (instance == null) { instance = new DataBaseTransactionStoreManager(); } } } return instance; } //Instantiates a new Database transaction store manager. private DataBaseTransactionStoreManager() { logQueryLimit = CONFIG.getInt(ConfigurationKeys.STORE_DB_LOG_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT); String datasourceType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_DATASOURCE_TYPE); //init dataSource,通过SPI机制加载DataSourceProvider DataSource logStoreDataSource = EnhancedServiceLoader.load(DataSourceProvider.class, datasourceType).provide(); logStore = new LogStoreDataBaseDAO(logStoreDataSource); } @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { if (LogOperation.GLOBAL_ADD.equals(logOperation)) { return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) { return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) { return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session)); } else if (LogOperation.BRANCH_ADD.equals(logOperation)) { return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) { return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) { return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException("Unknown LogOperation:" + logOperation.name()); } } ... } public class LogStoreDataBaseDAO implements LogStore { protected DataSource logStoreDataSource = null; protected String globalTable; protected String branchTable; private String dbType; ... public LogStoreDataBaseDAO(DataSource logStoreDataSource) { this.logStoreDataSource = logStoreDataSource; globalTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_GLOBAL_TABLE, DEFAULT_STORE_DB_GLOBAL_TABLE); branchTable = CONFIG.getConfig(ConfigurationKeys.STORE_DB_BRANCH_TABLE, DEFAULT_STORE_DB_BRANCH_TABLE); dbType = CONFIG.getConfig(ConfigurationKeys.STORE_DB_TYPE); if (StringUtils.isBlank(dbType)) { throw new StoreException("there must be db type."); } if (logStoreDataSource == null) { throw new StoreException("there must be logStoreDataSource."); } //init transaction_name size initTransactionNameSize(); } @Override public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) { String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable); Connection conn = null; PreparedStatement ps = null; try { int index = 1; conn = logStoreDataSource.getConnection(); conn.setAutoCommit(true); ps = conn.prepareStatement(sql); ps.setString(index++, globalTransactionDO.getXid()); ps.setLong(index++, globalTransactionDO.getTransactionId()); ps.setInt(index++, globalTransactionDO.getStatus()); ps.setString(index++, globalTransactionDO.getApplicationId()); ps.setString(index++, globalTransactionDO.getTransactionServiceGroup()); String transactionName = globalTransactionDO.getTransactionName(); transactionName = transactionName.length() > transactionNameColumnSize ? transactionName.substring(0, transactionNameColumnSize) : transactionName; ps.setString(index++, transactionName); ps.setInt(index++, globalTransactionDO.getTimeout()); ps.setLong(index++, globalTransactionDO.getBeginTime()); ps.setString(index++, globalTransactionDO.getApplicationData()); return ps.executeUpdate() > 0; } catch (SQLException e) { throw new StoreException(e); } finally { IOUtil.close(ps, conn); } } ... }
(3)将全局事务会话持久化到File文件的实现
public class FileTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager, ReloadableStore { private ReentrantLock writeSessionLock = new ReentrantLock(); ... @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { long curFileTrxNum; writeSessionLock.lock(); try { if (!writeDataFile(new TransactionWriteStore(session, logOperation).encode())) { return false; } lastModifiedTime = System.currentTimeMillis(); curFileTrxNum = FILE_TRX_NUM.incrementAndGet(); if (curFileTrxNum % PER_FILE_BLOCK_SIZE == 0 && (System.currentTimeMillis() - trxStartTimeMills) > MAX_TRX_TIMEOUT_MILLS) { return saveHistory(); } } catch (Exception exx) { LOGGER.error("writeSession error, {}", exx.getMessage(), exx); return false; } finally { writeSessionLock.unlock(); } flushDisk(curFileTrxNum, currFileChannel); return true; } private boolean writeDataFile(byte[] bs) { if (bs == null || bs.length >= Integer.MAX_VALUE - 3) { return false; } if (!writeDataFrame(bs)) { return false; } return flushWriteBuffer(writeBuffer); } private boolean writeDataFrame(byte[] data) { if (data == null || data.length <= 0) { return true; } int dataLength = data.length; int bufferRemainingSize = writeBuffer.remaining(); if (bufferRemainingSize <= INT_BYTE_SIZE) { if (!flushWriteBuffer(writeBuffer)) { return false; } } bufferRemainingSize = writeBuffer.remaining(); if (bufferRemainingSize <= INT_BYTE_SIZE) { throw new IllegalStateException(String.format("Write buffer remaining size %d was too small", bufferRemainingSize)); } writeBuffer.putInt(dataLength); bufferRemainingSize = writeBuffer.remaining(); int dataPos = 0; while (dataPos < dataLength) { int dataLengthToWrite = dataLength - dataPos; dataLengthToWrite = Math.min(dataLengthToWrite, bufferRemainingSize); writeBuffer.put(data, dataPos, dataLengthToWrite); bufferRemainingSize = writeBuffer.remaining(); if (bufferRemainingSize == 0) { if (!flushWriteBuffer(writeBuffer)) { return false; } bufferRemainingSize = writeBuffer.remaining(); } dataPos += dataLengthToWrite; } return true; } private boolean flushWriteBuffer(ByteBuffer writeBuffer) { writeBuffer.flip(); if (!writeDataFileByBuffer(writeBuffer)) { return false; } writeBuffer.clear(); return true; } private void flushDisk(long curFileNum, FileChannel currFileChannel) { if (FLUSH_DISK_MODE == FlushDiskMode.SYNC_MODEL) { SyncFlushRequest syncFlushRequest = new SyncFlushRequest(curFileNum, currFileChannel); writeDataFileRunnable.putRequest(syncFlushRequest); syncFlushRequest.waitForFlush(MAX_WAIT_FOR_FLUSH_TIME_MILLS); } else { writeDataFileRunnable.putRequest(new AsyncFlushRequest(curFileNum, currFileChannel)); } } ... } public class TransactionWriteStore implements SessionStorable { private SessionStorable sessionRequest; private LogOperation operate; public TransactionWriteStore(SessionStorable sessionRequest, LogOperation operate) { this.sessionRequest = sessionRequest; this.operate = operate; } @Override public byte[] encode() { byte[] bySessionRequest = this.sessionRequest.encode(); byte byOpCode = this.getOperate().getCode(); int len = bySessionRequest.length + 1; byte[] byResult = new byte[len]; ByteBuffer byteBuffer = ByteBuffer.wrap(byResult); byteBuffer.put(bySessionRequest); byteBuffer.put(byOpCode); return byResult; } ... }
(4)将全局事务会话持久化到Redis存储的实现
这里的实现比较优雅,十分值得借鉴。
public class RedisTransactionStoreManager extends AbstractTransactionStoreManager implements TransactionStoreManager { private static volatile RedisTransactionStoreManager instance; //Map for LogOperation Global Operation public static volatile ImmutableMap<LogOperation, Function<GlobalTransactionDO, Boolean>> globalMap; //Map for LogOperation Branch Operation public static volatile ImmutableMap<LogOperation, Function<BranchTransactionDO, Boolean>> branchMap; ... public static RedisTransactionStoreManager getInstance() { if (instance == null) { synchronized (RedisTransactionStoreManager.class) { if (instance == null) { instance = new RedisTransactionStoreManager(); } } } return instance; } public RedisTransactionStoreManager() { super(); initGlobalMap(); initBranchMap(); logQueryLimit = CONFIG.getInt(STORE_REDIS_QUERY_LIMIT, DEFAULT_LOG_QUERY_LIMIT); if (logQueryLimit > DEFAULT_LOG_QUERY_LIMIT) { logQueryLimit = DEFAULT_LOG_QUERY_LIMIT; } } public void initGlobalMap() { if (CollectionUtils.isEmpty(branchMap)) { globalMap = ImmutableMap.<LogOperation, Function<GlobalTransactionDO, Boolean>>builder() .put(LogOperation.GLOBAL_ADD, this::insertGlobalTransactionDO) .put(LogOperation.GLOBAL_UPDATE, this::updateGlobalTransactionDO) .put(LogOperation.GLOBAL_REMOVE, this::deleteGlobalTransactionDO) .build(); } } public void initBranchMap() { if (CollectionUtils.isEmpty(branchMap)) { branchMap = ImmutableMap.<LogOperation, Function<BranchTransactionDO, Boolean>>builder() .put(LogOperation.BRANCH_ADD, this::insertBranchTransactionDO) .put(LogOperation.BRANCH_UPDATE, this::updateBranchTransactionDO) .put(LogOperation.BRANCH_REMOVE, this::deleteBranchTransactionDO) .build(); } } //Insert the global transaction. private boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) { String globalKey = buildGlobalKeyByTransactionId(globalTransactionDO.getTransactionId()); try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { Date now = new Date(); globalTransactionDO.setGmtCreate(now); globalTransactionDO.setGmtModified(now); pipelined.hmset(globalKey, BeanUtils.objectToMap(globalTransactionDO)); pipelined.rpush(buildGlobalStatus(globalTransactionDO.getStatus()), globalTransactionDO.getXid()); pipelined.sync(); return true; } catch (Exception ex) { throw new RedisException(ex); } } //Insert branch transaction private boolean insertBranchTransactionDO(BranchTransactionDO branchTransactionDO) { String branchKey = buildBranchKey(branchTransactionDO.getBranchId()); String branchListKey = buildBranchListKeyByXid(branchTransactionDO.getXid()); try (Jedis jedis = JedisPooledFactory.getJedisInstance(); Pipeline pipelined = jedis.pipelined()) { Date now = new Date(); branchTransactionDO.setGmtCreate(now); branchTransactionDO.setGmtModified(now); pipelined.hmset(branchKey, BeanUtils.objectToMap(branchTransactionDO)); pipelined.rpush(branchListKey, branchKey); pipelined.sync(); return true; } catch (Exception ex) { throw new RedisException(ex); } } @Override public boolean writeSession(LogOperation logOperation, SessionStorable session) { if (globalMap.containsKey(logOperation) || branchMap.containsKey(logOperation)) { return globalMap.containsKey(logOperation) ? globalMap.get(logOperation).apply(SessionConverter.convertGlobalTransactionDO(session)) : branchMap.get(logOperation).apply(SessionConverter.convertBranchTransactionDO(session)); } else { throw new StoreException("Unknown LogOperation:" + logOperation.name()); } } ... } public class SessionConverter { ... public static GlobalTransactionDO convertGlobalTransactionDO(SessionStorable session) { if (session == null || !(session instanceof GlobalSession)) { throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session)); } GlobalSession globalSession = (GlobalSession)session; GlobalTransactionDO globalTransactionDO = new GlobalTransactionDO(); globalTransactionDO.setXid(globalSession.getXid()); globalTransactionDO.setStatus(globalSession.getStatus().getCode()); globalTransactionDO.setApplicationId(globalSession.getApplicationId()); globalTransactionDO.setBeginTime(globalSession.getBeginTime()); globalTransactionDO.setTimeout(globalSession.getTimeout()); globalTransactionDO.setTransactionId(globalSession.getTransactionId()); globalTransactionDO.setTransactionName(globalSession.getTransactionName()); globalTransactionDO.setTransactionServiceGroup(globalSession.getTransactionServiceGroup()); globalTransactionDO.setApplicationData(globalSession.getApplicationData()); return globalTransactionDO; } public static BranchTransactionDO convertBranchTransactionDO(SessionStorable session) { if (session == null || !(session instanceof BranchSession)) { throw new IllegalArgumentException("The parameter of SessionStorable is not available, SessionStorable:" + StringUtils.toString(session)); } BranchSession branchSession = (BranchSession)session; BranchTransactionDO branchTransactionDO = new BranchTransactionDO(); branchTransactionDO.setXid(branchSession.getXid()); branchTransactionDO.setBranchId(branchSession.getBranchId()); branchTransactionDO.setBranchType(branchSession.getBranchType().name()); branchTransactionDO.setClientId(branchSession.getClientId()); branchTransactionDO.setResourceGroupId(branchSession.getResourceGroupId()); branchTransactionDO.setTransactionId(branchSession.getTransactionId()); branchTransactionDO.setApplicationData(branchSession.getApplicationData()); branchTransactionDO.setResourceId(branchSession.getResourceId()); branchTransactionDO.setStatus(branchSession.getStatus().getCode()); return branchTransactionDO; } ... }
5.Seata Server创建全局事务与返回xid的源码
-> ServerHandler.channelRead()接收Seata Client发送过来的请求; -> AbstractNettyRemoting.processMessage()处理RpcMessage消息; -> ServerOnRequestProcessor.process()处理RpcMessage消息; -> TransactionMessageHandler.onRequest()处理RpcMessage消息; -> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { ... @ChannelHandler.Sharable class ServerHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } //接下来调用processMessage()方法对解码完毕的RpcMessage对象进行处理 processMessage(ctx, (RpcMessage) msg); } } } public abstract class AbstractNettyRemoting implements Disposable { ... protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody())); } Object body = rpcMessage.getBody(); if (body instanceof MessageTypeAware) { MessageTypeAware messageTypeAware = (MessageTypeAware) body; //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的 //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的 //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理 final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); if (pair != null) { if (pair.getSecond() != null) { try { pair.getSecond().execute(() -> { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } finally { MDC.clear(); } }); } catch (RejectedExecutionException e) { ... } } else { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } } } else { LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); } } else { LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); } } ... } public class ServerOnRequestProcessor implements RemotingProcessor, Disposable { private final RemotingServer remotingServer; ... @Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (ChannelManager.isRegistered(ctx.channel())) { onRequestMessage(ctx, rpcMessage); } else { try { if (LOGGER.isInfoEnabled()) { LOGGER.info("closeChannelHandlerContext channel:" + ctx.channel()); } ctx.disconnect(); ctx.close(); } catch (Exception exx) { LOGGER.error(exx.getMessage()); } if (LOGGER.isInfoEnabled()) { LOGGER.info(String.format("close a unhandled connection! [%s]", ctx.channel().toString())); } } } private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) { Object message = rpcMessage.getBody(); //RpcContext线程本地变量副本 RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("server received:{},clientIp:{},vgroup:{}", message, NetUtil.toIpAddress(ctx.channel().remoteAddress()), rpcContext.getTransactionServiceGroup()); } else { try { BatchLogHandler.INSTANCE.getLogQueue().put(message + ",clientIp:" + NetUtil.toIpAddress(ctx.channel().remoteAddress()) + ",vgroup:" + rpcContext.getTransactionServiceGroup()); } catch (InterruptedException e) { LOGGER.error("put message to logQueue error: {}", e.getMessage(), e); } } if (!(message instanceof AbstractMessage)) { return; } // the batch send request message if (message instanceof MergedWarpMessage) { ... } else { // the single send request message final AbstractMessage msg = (AbstractMessage) message; //最终调用到DefaultCoordinator的onRequest()方法来处理RpcMessage AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext); //返回响应给客户端 remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result); } } ... }
-> TransactionMessageHandler.onRequest()处理RpcMessage消息; -> DefaultCoordinator.onRequest()处理RpcMessage消息; -> GlobalBeginRequest.handle()处理开启全局事务请求; -> AbstractTCInboundHandler.handle()开启全局事务返回全局事务; -> DefaultCoordinator.doGlobalBegin()开启全局事务; -> DefaultCore.begin()创建全局事务会话并开启; -> GlobalSession.createGlobalSession()创建全局事务会话; -> GlobalSession.begin()开启全局事务会话; -> AbstractSessionManager.onBegin() -> AbstractSessionManager.addGlobalSession() -> AbstractSessionManager.writeSession() -> TransactionStoreManager.writeSession()持久化全局事务会话;
public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { ... @Override public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) { if (!(request instanceof AbstractTransactionRequestToTC)) { throw new IllegalArgumentException(); } AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request; transactionRequest.setTCInboundHandler(this); return transactionRequest.handle(context); } ... } public class GlobalBeginRequest extends AbstractTransactionRequestToTC { ... @Override public AbstractTransactionResponse handle(RpcContext rpcContext) { return handler.handle(this, rpcContext); } ... } public abstract class AbstractTCInboundHandler extends AbstractExceptionHandler implements TCInboundHandler { private static final Logger LOGGER = LoggerFactory.getLogger(AbstractTCInboundHandler.class); @Override public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) { GlobalBeginResponse response = new GlobalBeginResponse(); exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() { @Override public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException { try { //开启全局事务 doGlobalBegin(request, response, rpcContext); } catch (StoreException e) { throw new TransactionException(TransactionExceptionCode.FailedStore, String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()), e); } } }, request, response); return response; } ... } public class DefaultCoordinator extends AbstractTCInboundHandler implements TransactionMessageHandler, Disposable { private final DefaultCore core; ... @Override protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext) throws TransactionException { //接下来才真正处理开启全局事务的业务逻辑 //其中会调用DefaultCore来真正开启一个全局事务,即拿到xid并设置到响应里去 response.setXid(core.begin( rpcContext.getApplicationId(),//应用程序id rpcContext.getTransactionServiceGroup(),//事务服务分组 request.getTransactionName(),//事务名称 request.getTimeout())//超时时间 ); if (LOGGER.isInfoEnabled()) { LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}", rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid()); } } ... } public class DefaultCore implements Core { ... @Override public String begin(String applicationId, String transactionServiceGroup, String name, int timeout) throws TransactionException { //创建一个全局事务会话 GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name, timeout); //通过slf4j的MDC把xid放入线程本地变量副本里去 MDC.put(RootContext.MDC_KEY_XID, session.getXid()); //添加一个全局事务会话的生命周期监听器 session.addSessionLifecycleListener(SessionHolder.getRootSessionManager()); //打开Session,其中会对全局事务会话进行持久化 session.begin(); //transaction start event,发布会话开启事件 MetricsPublisher.postSessionDoingEvent(session, false); //返回全局事务会话的xid return session.getXid(); } ... } public class GlobalSession implements SessionLifecycle, SessionStorable { ... public static GlobalSession createGlobalSession(String applicationId, String txServiceGroup, String txName, int timeout) { GlobalSession session = new GlobalSession(applicationId, txServiceGroup, txName, timeout, false); return session; } public GlobalSession(String applicationId, String transactionServiceGroup, String transactionName, int timeout, boolean lazyLoadBranch) { //全局事务id是通过UUIDGenerator来生成的 this.transactionId = UUIDGenerator.generateUUID(); this.status = GlobalStatus.Begin; this.lazyLoadBranch = lazyLoadBranch; if (!lazyLoadBranch) { this.branchSessions = new ArrayList<>(); } this.applicationId = applicationId; this.transactionServiceGroup = transactionServiceGroup; this.transactionName = transactionName; this.timeout = timeout; //根据UUIDGenerator生成的transactionId + XID工具生成最终的xid this.xid = XID.generateXID(transactionId); } @Override public void begin() throws TransactionException { this.status = GlobalStatus.Begin; this.beginTime = System.currentTimeMillis(); this.active = true; for (SessionLifecycleListener lifecycleListener : lifecycleListeners) { lifecycleListener.onBegin(this); } } ... } public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener { ... @Override public void onBegin(GlobalSession globalSession) throws TransactionException { addGlobalSession(globalSession); } @Override public void addGlobalSession(GlobalSession session) throws TransactionException { if (LOGGER.isDebugEnabled()) { LOGGER.debug("MANAGER[{}] SESSION[{}] {}", name, session, LogOperation.GLOBAL_ADD); } writeSession(LogOperation.GLOBAL_ADD, session); } private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException { //transactionStoreManager.writeSession()会对全局事务会话进行持久化 if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) { ... } } ... }
-> RemotingServer.sendAsyncResponse()返回包含xid的响应给客户端; -> AbstractNettyRemotingServer.sendAsyncResponse()异步发送响应; -> AbstractNettyRemoting.buildResponseMessage()构造包含xid响应; -> AbstractNettyRemoting.sendAsync()异步发送响应; -> Netty的Channel.writeAndFlush()发送响应给客户端;
public abstract class AbstractNettyRemotingServer extends AbstractNettyRemoting implements RemotingServer { ... @Override public void sendAsyncResponse(RpcMessage rpcMessage, Channel channel, Object msg) { Channel clientChannel = channel; if (!(msg instanceof HeartbeatMessage)) { clientChannel = ChannelManager.getSameClientChannel(channel); } if (clientChannel != null) { RpcMessage rpcMsg = buildResponseMessage(rpcMessage, msg, msg instanceof HeartbeatMessage ? ProtocolConstants.MSGTYPE_HEARTBEAT_RESPONSE : ProtocolConstants.MSGTYPE_RESPONSE); super.sendAsync(clientChannel, rpcMsg); } else { throw new RuntimeException("channel is error."); } } ... } public abstract class AbstractNettyRemoting implements Disposable { ... protected RpcMessage buildResponseMessage(RpcMessage rpcMessage, Object msg, byte messageType) { RpcMessage rpcMsg = new RpcMessage(); rpcMsg.setMessageType(messageType); rpcMsg.setCodec(rpcMessage.getCodec()); // same with request rpcMsg.setCompressor(rpcMessage.getCompressor()); rpcMsg.setBody(msg); rpcMsg.setId(rpcMessage.getId()); return rpcMsg; } //rpc async request. protected void sendAsync(Channel channel, RpcMessage rpcMessage) { channelWritableCheck(channel, rpcMessage.getBody()); if (LOGGER.isDebugEnabled()) { LOGGER.debug("write message:" + rpcMessage.getBody() + ", channel:" + channel + ",active?" + channel.isActive() + ",writable?" + channel.isWritable() + ",isopen?" + channel.isOpen()); } doBeforeRpcHooks(ChannelUtil.getAddressFromChannel(channel), rpcMessage); channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { destroyChannel(future.channel()); } }); } ... }
6.Client获取Server的响应与处理的源码
-> ClientHandler.channelRead()接收Seata Server返回的响应; -> AbstractNettyRemoting.processMessage()处理RpcMessage消息; -> ClientOnResponseProcessor.process()会设置MessageFuture结果; -> MessageFuture.setResultMessage()设置MessageFuture结果; -> CompletableFuture.complete()唤醒阻塞的线程;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { ... @Sharable class ClientHandler extends ChannelDuplexHandler { @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { if (!(msg instanceof RpcMessage)) { return; } processMessage(ctx, (RpcMessage) msg); } ... } ... } public abstract class AbstractNettyRemoting implements Disposable { ... protected void processMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (LOGGER.isDebugEnabled()) { LOGGER.debug(String.format("%s msgId:%s, body:%s", this, rpcMessage.getId(), rpcMessage.getBody())); } Object body = rpcMessage.getBody(); if (body instanceof MessageTypeAware) { MessageTypeAware messageTypeAware = (MessageTypeAware) body; //根据消息类型获取到一个Pair对象,该Pair对象是由请求处理组件和请求处理线程池组成的 //processorTable里的内容,是NettyRemotingServer在初始化时,通过调用registerProcessor()方法put进去的 //所以下面的代码实际上会由ServerOnRequestProcessor的process()方法进行处理 final Pair<RemotingProcessor, ExecutorService> pair = this.processorTable.get((int) messageTypeAware.getTypeCode()); if (pair != null) { if (pair.getSecond() != null) { try { pair.getSecond().execute(() -> { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } finally { MDC.clear(); } }); } catch (RejectedExecutionException e) { ... } } else { try { pair.getFirst().process(ctx, rpcMessage); } catch (Throwable th) { LOGGER.error(FrameworkErrorCode.NetDispatch.getErrCode(), th.getMessage(), th); } } } else { LOGGER.error("This message type [{}] has no processor.", messageTypeAware.getTypeCode()); } } else { LOGGER.error("This rpcMessage body[{}] is not MessageTypeAware type.", body); } } ... } public class ClientOnResponseProcessor implements RemotingProcessor { ... @Override public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception { if (rpcMessage.getBody() instanceof MergeResultMessage) { ... } else if (rpcMessage.getBody() instanceof BatchResultMessage) { ... } else { //这里是对普通消息的处理 MessageFuture messageFuture = futures.remove(rpcMessage.getId()); if (messageFuture != null) { messageFuture.setResultMessage(rpcMessage.getBody()); } else { if (rpcMessage.getBody() instanceof AbstractResultMessage) { if (transactionMessageHandler != null) { transactionMessageHandler.onResponse((AbstractResultMessage) rpcMessage.getBody(), null); } } } } } ... } public class MessageFuture { private transient CompletableFuture<Object> origin = new CompletableFuture<>(); ... //Sets result message. public void setResultMessage(Object obj) { origin.complete(obj); } ... }
由于Seata Client发送开启全局事务的请求给Seata Server时,会通过MessageFuture的get()方法同步等待Seata Server返回响应。所以当Seata Client获取Seata Server的响应并通过complete()方法设置MessageFuture已经完成后,原来同步等待Seata Server响应的线程便会继续往下处理。
即某线程执行CompletableFuture.complete()方法后,执行CompletableFuture.get()方法的线程就不会被阻塞而会被唤醒。
-> GlobalTransactionalInterceptor.invoke() -> GlobalTransactionalInterceptor.handleGlobalTransaction() -> TransactionalTemplate.execute() -> TransactionalTemplate.beginTransaction() -> DefaultGlobalTransaction.begin() -> DefaultTransactionManager.begin() -> DefaultTransactionManager.syncCall() -> TmNettyRemotingClient.sendSyncRequest() -> AbstractNettyRemotingClient.sendSyncRequest()发送请求; -> AbstractNettyRemoting.sendSync()发送同步请求; -> MessageFuture.get()会同步等待Seata Server的响应结果; -> CompletableFuture.get()阻塞当前线程进行等待唤醒;
public abstract class AbstractNettyRemotingClient extends AbstractNettyRemoting implements RemotingClient { ... @Override public Object sendSyncRequest(Object msg) throws TimeoutException { //因为Seata Server是可以多节点部署实现高可用架构的,所以这里调用loadBalance()方法进行负载均衡 String serverAddress = loadBalance(getTransactionServiceGroup(), msg); //获取RPC调用的超时时间 long timeoutMillis = this.getRpcRequestTimeout(); //构建一个RPC消息 RpcMessage rpcMessage = buildRequestMessage(msg, ProtocolConstants.MSGTYPE_RESQUEST_SYNC); //send batch message //put message into basketMap, @see MergedSendRunnable //默认是不开启批量消息发送 if (this.isEnableClientBatchSendRequest()) { ... } else { //通过网络连接管理器clientChannelManager,获取与指定Seata Server建立的网络连接Channel //然后通过网络连接Channel把RpcMessage发送出去 Channel channel = clientChannelManager.acquireChannel(serverAddress); return super.sendSync(channel, rpcMessage, timeoutMillis); } } ... } public abstract class AbstractNettyRemoting implements Disposable { ... protected Object sendSync(Channel channel, RpcMessage rpcMessage, long timeoutMillis) throws TimeoutException { if (timeoutMillis <= 0) { throw new FrameworkException("timeout should more than 0ms"); } if (channel == null) { LOGGER.warn("sendSync nothing, caused by null channel."); return null; } //把发送出去的请求封装到MessageFuture中,然后存放到futures这个Map里 MessageFuture messageFuture = new MessageFuture(); messageFuture.setRequestMessage(rpcMessage); messageFuture.setTimeout(timeoutMillis); futures.put(rpcMessage.getId(), messageFuture); channelWritableCheck(channel, rpcMessage.getBody()); //获取远程地址 String remoteAddr = ChannelUtil.getAddressFromChannel(channel); doBeforeRpcHooks(remoteAddr, rpcMessage); //异步化发送数据,同时对发送结果添加监听器 //如果发送失败,则会对网络连接Channel进行销毁处理 channel.writeAndFlush(rpcMessage).addListener((ChannelFutureListener) future -> { if (!future.isSuccess()) { MessageFuture messageFuture1 = futures.remove(rpcMessage.getId()); if (messageFuture1 != null) { messageFuture1.setResultMessage(future.cause()); } destroyChannel(future.channel()); } }); try { //然后通过请求响应组件MessageFuture同步等待Seata Server返回该请求的响应 Object result = messageFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); doAfterRpcHooks(remoteAddr, rpcMessage, result); return result; } catch (Exception exx) { LOGGER.error("wait response error:{},ip:{},request:{}", exx.getMessage(), channel.remoteAddress(), rpcMessage.getBody()); if (exx instanceof TimeoutException) { throw (TimeoutException) exx; } else { throw new RuntimeException(exx); } } } ... } public class MessageFuture { private transient CompletableFuture<Object> origin = new CompletableFuture<>(); ... public Object get(long timeout, TimeUnit unit) throws TimeoutException, InterruptedException { Object result = null; try { result = origin.get(timeout, unit); if (result instanceof TimeoutException) { throw (TimeoutException)result; } } catch (ExecutionException e) { throw new ShouldNeverHappenException("Should not get results in a multi-threaded environment", e); } catch (TimeoutException e) { throw new TimeoutException(String.format("%s ,cost: %d ms", e.getMessage(), System.currentTimeMillis() - start)); } if (result instanceof RuntimeException) { throw (RuntimeException)result; } else if (result instanceof Throwable) { throw new RuntimeException((Throwable)result); } return result; } ... }
7.Seata与Dubbo整合的过滤器源码
(1)调用Dubbo过滤器的入口
(2)Seata与Dubbo整合的过滤器
(1)调用Dubbo过滤器的入口
-> GlobalTransactionalInterceptor.invoke()拦截添加了@GlobalTransactional注解的方法; -> GlobalTransactionalInterceptor.handleGlobalTransaction()进行全局事务的处理; -> TransactionalTemplate.execute()执行全局事务 -> TransactionalTemplate.beginTransaction()开启一个全局事务 -> handleGlobalTransaction().methodInvocation.proceed()真正执行目标方法 -> ApacheDubboTransactionPropagationFilter.invoke()经过Dubbo过滤器处理
public class GlobalTransactionalInterceptor implements ConfigurationChangeListener, MethodInterceptor, SeataInterceptor { ... //如果调用添加了@GlobalTransactional注解的方法,就会执行如下invoke()方法 @Override public Object invoke(final MethodInvocation methodInvocation) throws Throwable { //methodInvocation是一次方法调用 //通过methodInvocation的getThis()方法可以获取到被调用方法的对象 //通过AopUtils.getTargetClass()方法可以获取到对象对应的Class Class<?> targetClass = methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null; //通过反射,获取到目标class中被调用的method方法 Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass); //如果调用的目标method不为null if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) { //尝试寻找桥接方法bridgeMethod final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod); //通过反射,获取被调用的目标方法的@GlobalTransactional注解 final GlobalTransactional globalTransactionalAnnotation = getAnnotation(method, targetClass, GlobalTransactional.class); //通过反射,获取被调用目标方法的@GlobalLock注解 final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class); //如果禁用了全局事务,或者开启了降级检查,同时降级次数大于了降级检查允许次数,那么localDisable就为true //localDisable为true则表示全局事务被禁用了,此时就不可以开启全局事务了 boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes); //如果全局事务没有禁用 if (!localDisable) { //全局事务注解不为空,或者是AOP切面全局事务核心配置不为空 if (globalTransactionalAnnotation != null || this.aspectTransactional != null) { AspectTransactional transactional; if (globalTransactionalAnnotation != null) { //创建全局事务AOP切面的核心配置AspectTransactional,配置数据会从全局事务注解里提取出来 transactional = new AspectTransactional( globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(), globalTransactionalAnnotation.propagation(), globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes() ); } else { transactional = this.aspectTransactional; } //真正处理全局事务的入口 return handleGlobalTransaction(methodInvocation, transactional); } else if (globalLockAnnotation != null) { return handleGlobalLock(methodInvocation, globalLockAnnotation); } } } //直接运行目标方法 return methodInvocation.proceed(); } //真正进行全局事务的处理 Object handleGlobalTransaction(final MethodInvocation methodInvocation, final AspectTransactional aspectTransactional) throws Throwable { boolean succeed = true; try { //基于全局事务执行模版TransactionalTemplate,来执行全局事务 return transactionalTemplate.execute(new TransactionalExecutor() { @Override public Object execute() throws Throwable { //真正执行目标方法 return methodInvocation.proceed(); } ... }); } catch (TransactionalExecutor.ExecutionException e) { ... } finally { if (degradeCheck) { EVENT_BUS.post(new DegradeCheckEvent(succeed)); } } } ... } public class TransactionalTemplate { ... public Object execute(TransactionalExecutor business) throws Throwable { //1.Get transactionInfo TransactionInfo txInfo = business.getTransactionInfo(); if (txInfo == null) { throw new ShouldNeverHappenException("transactionInfo does not exist"); } //1.1 Get current transaction, if not null, the tx role is 'GlobalTransactionRole.Participant'. //根据线程本地变量副本,获取当前线程本地变量副本里是否存在xid,如果存在则创建一个全局事务 //刚开始在开启一个全局事务的时候,是没有全局事务的 GlobalTransaction tx = GlobalTransactionContext.getCurrent(); //1.2 Handle the transaction propagation. //从全局事务配置里,可以获取到全局事务的传播级别,默认是REQUIRED //也就是如果存在一个全局事务,就直接执行业务;如果不存在一个全局事务,就开启一个新的全局事务; Propagation propagation = txInfo.getPropagation(); //不同的全局事务传播级别,会采取不同的处理方式 //比如挂起当前事务 + 开启新的事务,或者是直接不使用事务执行业务,挂起其实就是解绑当前线程的xid //可以通过@GlobalTransactional注解,定制业务方法的全局事务,比如指定业务方法全局事务的传播级别 SuspendedResourcesHolder suspendedResourcesHolder = null; try { switch (propagation) { ... } //1.3 If null, create new transaction with role 'GlobalTransactionRole.Launcher'. if (tx == null) { tx = GlobalTransactionContext.createNew(); } //set current tx config to holder GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo); try { //2. If the tx role is 'GlobalTransactionRole.Launcher', send the request of beginTransaction to TC, else do nothing. Of course, the hooks will still be triggered. //开启一个全局事务 beginTransaction(txInfo, tx); Object rs; try { //Do Your Business //执行业务方法,把全局事务xid通过Dubbo RPC传递下去,开启并提交一个一个分支事务 rs = business.execute(); } catch (Throwable ex) { //3. The needed business exception to rollback. //发生异常时需要完成的事务 completeTransactionAfterThrowing(txInfo, tx, ex); throw ex; } //4. everything is fine, commit. //如果一切执行正常就会在这里提交全局事务 commitTransaction(tx); return rs; } finally { //5. clear //执行一些全局事务完成后的回调,比如清理等工作 resumeGlobalLockConfig(previousConfig); triggerAfterCompletion(); cleanUp(); } } finally { //If the transaction is suspended, resume it. if (suspendedResourcesHolder != null) { //如果之前挂起了一个全局事务,此时可以恢复这个全局事务 tx.resume(suspendedResourcesHolder); } } } //开启事务 private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException { try { //开启全局事务之前有一个回调的一个钩子名为triggerBeforeBegin() triggerBeforeBegin(); //真正去开启一个全局事务 tx.begin(txInfo.getTimeOut(), txInfo.getName()); //开启全局事务之后还有一个回调钩子名为triggerAfterBegin() triggerAfterBegin(); } catch (TransactionException txe) { throw new TransactionalExecutor.ExecutionException(tx, txe, TransactionalExecutor.Code.BeginFailure); } } ... }
(2)Seata与Dubbo整合的过滤器
如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形。如果线程本地变量副本里的xid为null,则对应于接收RPC调用的情形。
当RootContext的xid不为null时,需要设置RpcContext的xid。当RootContext的xid为null + RpcContext的xid不为null时,需要设置RootContext的xid。
@Activate(group = {DubboConstants.PROVIDER, DubboConstants.CONSUMER}, order = 100) public class ApacheDubboTransactionPropagationFilter implements Filter { private static final Logger LOGGER = LoggerFactory.getLogger(ApacheDubboTransactionPropagationFilter.class); @Override public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { //发起Dubbo的RPC调用时,会先从线程本地变量副本里获取xid String xid = RootContext.getXID(); //然后从线程本地变量副本里获取当前的分支事务类型,默认分支类型就是AT BranchType branchType = RootContext.getBranchType(); //从RpcContext里获取attachments里的xid和分支类型 String rpcXid = getRpcXid(); String rpcBranchType = RpcContext.getContext().getAttachment(RootContext.KEY_BRANCH_TYPE); if (LOGGER.isDebugEnabled()) { LOGGER.debug("xid in RootContext[{}] xid in RpcContext[{}]", xid, rpcXid); } boolean bind = false; if (xid != null) { //如果线程本地变量副本里的xid不为null,对应于发起RPC调用的情形 //则把线程本地变量副本里的xid和分支类型,设置到RpcContext上下文里 //RpcContext上下文里的attachment内容会随着RPC请求发送到其他系统中 RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid); RpcContext.getContext().setAttachment(RootContext.KEY_BRANCH_TYPE, branchType.name()); } else { //如果线程本地变量副本里的xid为null且RpcContext里的xid不为null,对应于接收RPC调用的情形 if (rpcXid != null) { //把RpcContext里的xid绑定到当前服务的线程本地变量副本里 RootContext.bind(rpcXid); if (StringUtils.equals(BranchType.TCC.name(), rpcBranchType)) { RootContext.bindBranchType(BranchType.TCC); } bind = true; if (LOGGER.isDebugEnabled()) { LOGGER.debug("bind xid [{}] branchType [{}] to RootContext", rpcXid, rpcBranchType); } } } try { return invoker.invoke(invocation); } finally { if (bind) { BranchType previousBranchType = RootContext.getBranchType(); //对线程本地变量副本里的xid做解绑 String unbindXid = RootContext.unbind(); if (BranchType.TCC == previousBranchType) { RootContext.unbindBranchType(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("unbind xid [{}] branchType [{}] from RootContext", unbindXid, previousBranchType); } if (!rpcXid.equalsIgnoreCase(unbindXid)) { LOGGER.warn("xid in change during RPC from {} to {},branchType from {} to {}", rpcXid, unbindXid, rpcBranchType != null ? rpcBranchType : "AT", previousBranchType); if (unbindXid != null) { RootContext.bind(unbindXid); LOGGER.warn("bind xid [{}] back to RootContext", unbindXid); if (BranchType.TCC == previousBranchType) { RootContext.bindBranchType(BranchType.TCC); LOGGER.warn("bind branchType [{}] back to RootContext", previousBranchType); } } } } //对RpcContext上下文里的东西进行解绑 RpcContext.getContext().removeAttachment(RootContext.KEY_XID); RpcContext.getContext().removeAttachment(RootContext.KEY_BRANCH_TYPE); RpcContext.getServerContext().removeAttachment(RootContext.KEY_XID); RpcContext.getServerContext().removeAttachment(RootContext.KEY_BRANCH_TYPE); } } private String getRpcXid() { String rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID); if (rpcXid == null) { rpcXid = RpcContext.getContext().getAttachment(RootContext.KEY_XID.toLowerCase()); } return rpcXid; } }
详细介绍后端技术栈的基础内容,包括但不限于:MySQL原理和优化、Redis原理和应用、JVM和G1原理和优化、RocketMQ原理应用及源码、Kafka原理应用及源码、ElasticSearch原理应用及源码、JUC源码、Netty源码、zk源码、Dubbo源码、Spring源码、Spring Boot源码、SCA源码、分布式锁源码、分布式事务、分库分表和TiDB、大型商品系统、大型订单系统等