RocketMQ特性--事务消息是个啥,咋发出去的呢?

1 概述
分布式事务时指在多个系统或者多个数据库中的多个操作要么全部成功,要么全部失败,并且需要满足ACID四个特性:
- A Atomicity,原子性。操作是一个整体,要么全部成功,要么全部失败
- C Consistency,一致性。事务操作前后,数据必须是一致的。
- I Isolation,隔离性。多个事务同时执行时,不能互相干扰。
- D Durability,持久性。一旦事务被提交,数据改变就是永久性的。
2 事务消息机制
2.1 生产者发送事务消息
事务消息的发送过程分为两个阶段,
- 发送事务消息
- 发送endTransaction消息
2.2 TransactionMQProducer类
发送过程的实现类是org.apache.rocketmq.client.producer.TransactionMQProducer,不仅能发送事务消息还能发送其他的消息。
public class TransactionMQProducer extends DefaultMQProducer { private TransactionCheckListener transactionCheckListener; private int checkThreadPoolMinSize = 1; private int checkThreadPoolMaxSize = 1; private int checkRequestHoldMax = 2000; private ExecutorService executorService; private TransactionListener transactionListener; }
2.2.3 transactionListener
transactionListener是事务监听器,主要功能是执行本地事务和执行事务回查。
public interface TransactionListener { LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); LocalTransactionState checkLocalTransaction(final MessageExt msg); }
TransactionListener包含两个方法,
- executeLocalTransaction()方法执行本地事务
- checkLocalTransaction()方法是当生产者由于各种问题未发Commit或者Rollback消息给Broker时,Broker回调生产者查询本地事务状态的处理方法。
2.2.4 executorService Broker回查请求处理的线程池。
2.2.5 start()方法
start()方法添加了一个initTransactionEnv(),作用是初始化事务消息的环境信息。
@Override public void start() throws MQClientException { this.defaultMQProducerImpl.initTransactionEnv(); super.start(); }
从代码中我们可以看出,这里主要初始化了Broker回查请求处理的线程池,初始化的时候,可以指定初始化对象,当不指定的时候,将初始化一个单线程的线程池。
public void initTransactionEnv() { TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; if (producer.getExecutorService() != null) { this.checkExecutor = producer.getExecutorService(); } else { this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); this.checkExecutor = new ThreadPoolExecutor( producer.getCheckThreadPoolMinSize(), producer.getCheckThreadPoolMaxSize(), 1000 * 60, TimeUnit.MILLISECONDS, this.checkRequestQueue); } }
2.2.6 shutdown()
关闭生产者,回收生产者资源。
@Override public void shutdown() { super.shutdown(); this.defaultMQProducerImpl.destroyTransactionEnv(); }
destroyTransactionEnv()销毁了事务回查线程池,清除了回查任务队列
3 发送事务消息的步骤
3.1 发送Half消息
事务消息的发送是通过sendMessageInTransaction方法来完成的。
@Override public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null); } msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg); }
3.1.1 数据校验
判断transactionListener的值是否为null,topic和Namespace的空判断以及一些基础校验。还校验了message:
Validators.checkMessage(msg, this.defaultMQProducer);
3.1.2 消息预处理
在方法sendMessageInTransaction()中,消息预处理主要是通过扩展字段设置消息类型。
//表示当前消息是事务half消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); //设置消息的group name和扩展字段。 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
3.1.3 发送事务消息
这里直接调用了同步发送消息的发送方法,之前写过一节专门讲这个的。 juejin.cn/post/712652…
try { sendResult = this.send(msg); } catch (Exception e) { throw new MQClientException("send message Exception", e); }
3.1.4 执行本地事务
消息发送成功之后,执行本地事务
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) { msg.putUserProperty("__transactionId__", sendResult.getTransactionId()); } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) { msg.setTransactionId(transactionId); } if (null != localTransactionExecuter) { localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg); } else if (transactionListener != null) { log.debug("Used new transaction API"); localTransactionState = transactionListener.executeLocalTransaction(msg, arg); } if (null == localTransactionState) { localTransactionState = LocalTransactionState.UNKNOW; } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) { log.info("executeLocalTransactionBranch return {}", localTransactionState); log.info(msg.toString()); } } catch (Throwable e) { log.info("executeLocalTransactionBranch exception", e); log.info(msg.toString()); localException = e; } } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE: localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break; }
3.2 发送Commit或者RollBack消息
本地事务处理完成之后,会调用endTransaction()方法,通知Broker进行Commit或者Rollback.
try { this.endTransaction(msg, sendResult, localTransactionState, localException); } catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e); }
public void endTransaction( final Message msg, //Half消息的发送结果 final SendResult sendResult, //本地事务执行结果 final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) { id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId()); } else { id = MessageDecoder.decodeMessageId(sendResult.getMsgId()); } //事务消息的事务id String transactionId = sendResult.getTransactionId(); final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue())); //存储当前Half消息的Broker的服务器地址 final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName); EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader(); requestHeader.setTransactionId(transactionId); requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break; } doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); //生产者group requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //消息位移 requestHeader.setTranStateTableOffset(sendResult.getQueueOffset()); requestHeader.setMsgId(sendResult.getMsgId()); String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; //以发送oneway消息的方式通知Broker进行Commit或者Rollback this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout()); }
至此,事务消息发送并处理完毕。
#Java##程序员#