RocketMQ特性--事务消息是个啥,咋发出去的呢?


1 概述

分布式事务时指在多个系统或者多个数据库中的多个操作要么全部成功,要么全部失败,并且需要满足ACID四个特性:

  • A Atomicity,原子性。操作是一个整体,要么全部成功,要么全部失败
  • C Consistency,一致性。事务操作前后,数据必须是一致的。
  • I Isolation,隔离性。多个事务同时执行时,不能互相干扰。
  • D Durability,持久性。一旦事务被提交,数据改变就是永久性的。

2 事务消息机制

2.1 生产者发送事务消息

事务消息的发送过程分为两个阶段,

  1. 发送事务消息
  2. 发送endTransaction消息

2.2 TransactionMQProducer类

发送过程的实现类是org.apache.rocketmq.client.producer.TransactionMQProducer,不仅能发送事务消息还能发送其他的消息。

public class TransactionMQProducer extends DefaultMQProducer { private TransactionCheckListener transactionCheckListener; private int checkThreadPoolMinSize = 1; private int checkThreadPoolMaxSize = 1; private int checkRequestHoldMax = 2000; private ExecutorService executorService; private TransactionListener transactionListener;
}

2.2.3 transactionListener

transactionListener是事务监听器,主要功能是执行本地事务和执行事务回查

public interface TransactionListener { LocalTransactionState executeLocalTransaction(final Message msg, final Object arg); LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

TransactionListener包含两个方法,

  • executeLocalTransaction()方法执行本地事务
  • checkLocalTransaction()方法是当生产者由于各种问题未发Commit或者Rollback消息给Broker时,Broker回调生产者查询本地事务状态的处理方法。

2.2.4 executorService Broker回查请求处理的线程池。

2.2.5 start()方法

start()方法添加了一个initTransactionEnv(),作用是初始化事务消息的环境信息

@Override public void start() throws MQClientException { this.defaultMQProducerImpl.initTransactionEnv(); super.start();
}

从代码中我们可以看出,这里主要初始化了Broker回查请求处理的线程池,初始化的时候,可以指定初始化对象,当不指定的时候,将初始化一个单线程的线程池。

public void initTransactionEnv() {
    TransactionMQProducer producer = (TransactionMQProducer) this.defaultMQProducer; if (producer.getExecutorService() != null) { this.checkExecutor = producer.getExecutorService();
    } else { this.checkRequestQueue = new LinkedBlockingQueue<Runnable>(producer.getCheckRequestHoldMax()); this.checkExecutor = new ThreadPoolExecutor(
            producer.getCheckThreadPoolMinSize(),
            producer.getCheckThreadPoolMaxSize(), 1000 * 60,
            TimeUnit.MILLISECONDS, this.checkRequestQueue);
    }
}

2.2.6 shutdown()

关闭生产者,回收生产者资源。

@Override public void shutdown() { super.shutdown(); this.defaultMQProducerImpl.destroyTransactionEnv();
}

destroyTransactionEnv()销毁了事务回查线程池,清除了回查任务队列

3 发送事务消息的步骤

3.1 发送Half消息

事务消息的发送是通过sendMessageInTransaction方法来完成的。

@Override public TransactionSendResult sendMessageInTransaction(final Message msg, final Object arg) throws MQClientException { if (null == this.transactionListener) { throw new MQClientException("TransactionListener is null", null);
    }

    msg.setTopic(NamespaceUtil.wrapNamespace(this.getNamespace(), msg.getTopic())); return this.defaultMQProducerImpl.sendMessageInTransaction(msg, null, arg);
}

3.1.1 数据校验

判断transactionListener的值是否为null,topic和Namespace的空判断以及一些基础校验。还校验了message:

Validators.checkMessage(msg, this.defaultMQProducer);

3.1.2 消息预处理

在方法sendMessageInTransaction()中,消息预处理主要是通过扩展字段设置消息类型。

//表示当前消息是事务half消息 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"); //设置消息的group name和扩展字段。 MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());

3.1.3 发送事务消息

这里直接调用了同步发送消息的发送方法,之前写过一节专门讲这个的。 juejin.cn/post/712652…

try {
    sendResult = this.send(msg);
} catch (Exception e) { throw new MQClientException("send message Exception", e);
}

3.1.4 执行本地事务

消息发送成功之后,执行本地事务

LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null; switch (sendResult.getSendStatus()) { case SEND_OK: { try { if (sendResult.getTransactionId() != null) {
                msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
            } String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (null != transactionId && !"".equals(transactionId)) {
                msg.setTransactionId(transactionId);
            } if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            } if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            } if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                log.info("executeLocalTransactionBranch return {}", localTransactionState);
                log.info(msg.toString());
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    } break; case FLUSH_DISK_TIMEOUT: case FLUSH_SLAVE_TIMEOUT: case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE; break; default: break;
}

3.2 发送Commit或者RollBack消息

本地事务处理完成之后,会调用endTransaction()方法,通知Broker进行Commit或者Rollback.

try { this.endTransaction(msg, sendResult, localTransactionState, localException);
} catch (Exception e) { log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
public void endTransaction( final Message msg,
    //Half消息的发送结果 final SendResult sendResult,
    //本地事务执行结果 final LocalTransactionState localTransactionState, final Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException { final MessageId id; if (sendResult.getOffsetMsgId() != null) {
        id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
    } else {
        id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
    } //事务消息的事务id String transactionId = sendResult.getTransactionId(); final String destBrokerName = this.mQClientFactory.getBrokerNameFromMessageQueue(defaultMQProducer.queueWithNamespace(sendResult.getMessageQueue())); //存储当前Half消息的Broker的服务器地址 final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(destBrokerName);
    EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
    requestHeader.setTransactionId(transactionId);
    requestHeader.setCommitLogOffset(id.getOffset()); switch (localTransactionState) { case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); break; case UNKNOW:
            requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); break; default: break;
    }

    doExecuteEndTransactionHook(msg, sendResult.getMsgId(), brokerAddr, localTransactionState, false); //生产者group requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup()); //消息位移 requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
    requestHeader.setMsgId(sendResult.getMsgId());
    String remark = localException != null ? ("executeLocalTransactionBranch exception: " + localException.toString()) : null; //以发送oneway消息的方式通知Broker进行Commit或者Rollback this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, this.defaultMQProducer.getSendMsgTimeout());
}

至此,事务消息发送并处理完毕。

#Java##程序员#
全部评论

相关推荐

昨天 16:14
已编辑
西安邮电大学 golang
不止遇到一次了,什么都不会,让提合并请求,问什么是合并请求。让gitlab.页面把测试截图附上,不知道截图要放在哪,那么大的编辑看不到吗让配开发机,问ip是什么东西……这都咋进来的啊,我们(我2023年毕业)那会儿没AI的时候面试都是直接linux,docker,k8s,git,结构与算法,计网。怎么才过去2年,实习生跟傻子一样,有些问题问的我难受,不会git&nbsp;commit,不会git&nbsp;pull,不会切换分支,直接要覆盖master....————而且态度非常敷衍,3天前给开个仓库权限,连本地都没有拉下来。让写一个小文档,都是说一句,写一句,说把目录加上,挺嗤之以鼻,最后还是把目录加上了😂😂任何文档和注释都是方便后来人的,现在的人真的很自负啊,打开github看看任何一个开源项目的文档和注释,都写的很详细。难道现在的同学在校期间不经常拉开源项目看源码学习吗?&nbsp;哪怕是一个swap函数,开源项目里都经常注释:1&nbsp;3&nbsp;5&nbsp;7&nbsp;9&nbsp;2&nbsp;4&nbsp;6&nbsp;8&nbsp;10^&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;^l&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;rswap:{功能描述}{使用样例}————给我气笑了,没次问我有什么任务的时候,我都是说,优先你学校导师的项目,然后再做公司需求。然后给了两个需求,一个月内搞定就行,既然是agent开发,1.&nbsp;部署需要维护项目的开发环境2.阅读opencode/openclaude代码(我个人感觉龙虾的源码agent部分很常规,就一个channel+agent,还不如看claude泄露的代码和opencode)然后任务1搞了几周说因为环境问题,他申请到的远程开发机是linux,装的python2,项目是py3的,所以没搭建,我说你不行就用conda或docker把环境屏蔽了呢,没搭理我。任务2:看了很长时间代码,给我回了一句,opencode和openclaude是用go写的……我说你打开github看右下角那的语言是ts还是go……&nbsp;结果满脸懵的说ts是什么……我让看agent&nbsp;loop,哪怕全局搜索一下while(true),跳过去从头看到尾就大致清楚了,压根没看。————嘻嘻,我已经开始做社招简历了。
redf1sh:默认会git结果发现真不会,这种一看就是没做过项目的,真做过项目的至少会提交
点赞 评论 收藏
分享
zaakfung:26届不应该春招吗 为啥还实习
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务