RocketMQ 消息发送方式
以下以RocketMQ为例,展示如何实现消息的同步发送、异步发送、单向发送、延迟发送和批量发送:
1. 同步发送
同步发送是指生产者发送消息后,会阻塞等待Broker的响应,直到收到发送结果。这种方式可靠性高,适用于对消息发送结果有确认需求的场景,比如重要的业务通知消息。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息内容
Message msg = new Message("SyncTopic",
"TagA",
"Hello RocketMQ Sync Message".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 同步发送消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
2. 异步发送
异步发送时,生产者发送消息后不会阻塞等待响应,而是继续执行后续代码。当Broker返回响应时,通过回调函数来处理发送结果。这种方式适用于对响应时间敏感,但仍需确保消息发送成功的场景,例如用户注册成功后的异步通知。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("AsyncProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息内容
Message msg = new Message("AsyncTopic",
"TagA",
"Hello RocketMQ Async Message".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 异步发送消息
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("异步发送成功: %s%n", sendResult);
}
@Override
public void onException(Throwable e) {
System.out.printf("异步发送失败: %s%n", e);
}
});
// 防止主线程退出
Thread.sleep(5000);
// 关闭生产者
producer.shutdown();
}
}
3. 单向发送
单向发送是指生产者发送消息后,不关心Broker的响应,直接继续执行后续代码。这种方式发送速度最快,但无法确认消息是否成功发送到Broker,适用于对消息可靠性要求不高,只追求发送速度的场景,如日志记录。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class OnewayProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("OnewayProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息内容
Message msg = new Message("OnewayTopic",
"TagA",
"Hello RocketMQ Oneway Message".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 单向发送消息
producer.sendOneway(msg);
System.out.println("单向消息已发送");
// 关闭生产者
producer.shutdown();
}
}
4. 延迟发送
RocketMQ支持延迟消息,通过设置消息的延迟级别来控制消息在Broker中延迟多久被投递。目前RocketMQ默认的延迟级别为1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class DelayProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
// 创建消息实例,指定Topic、Tag和消息内容
Message msg = new Message("DelayTopic",
"TagA",
"Hello RocketMQ Delay Message".getBytes(RemotingHelper.DEFAULT_CHARSET));
// 设置延迟级别为3级,即延迟10s
msg.setDelayTimeLevel(3);
// 发送延迟消息
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
5. 批量发送
批量发送可以将多个消息合并成一个批次发送到Broker,减少网络通信开销,提高发送效率。但需要注意的是,批量消息的总大小不能超过Broker允许的最大消息大小(默认4MB)。
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.ArrayList;
import java.util.List;
public class BatchProducer {
public static void main(String[] args) throws Exception {
// 创建生产者实例
DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
// 设置NameServer地址
producer.setNamesrvAddr("localhost:9876");
// 启动生产者
producer.start();
List<Message> messages = new ArrayList<>();
messages.add(new Message("BatchTopic",
"TagA",
"Message 1".getBytes(RemotingHelper.DEFAULT_CHARSET)));
messages.add(new Message("BatchTopic",
"TagA",
"Message 2".getBytes(RemotingHelper.DEFAULT_CHARSET)));
messages.add(new Message("BatchTopic",
"TagA",
"Message 3".getBytes(RemotingHelper.DEFAULT_CHARSET)));
// 批量发送消息
SendResult sendResult = producer.send(messages);
System.out.printf("%s%n", sendResult);
// 关闭生产者
producer.shutdown();
}
}
上述代码基于RocketMQ 4.x版本,不同版本在API上可能略有差异,实际使用时请参考对应版本的官方文档。同时,确保RocketMQ服务已正确安装和启动,并根据实际情况调整NameServer地址等配置。
#牛客创作赏金赛#