RocketMQ 消息发送方式

以下以RocketMQ为例,展示如何实现消息的同步发送、异步发送、单向发送、延迟发送和批量发送:

1. 同步发送

同步发送是指生产者发送消息后,会阻塞等待Broker的响应,直到收到发送结果。这种方式可靠性高,适用于对消息发送结果有确认需求的场景,比如重要的业务通知消息。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class SyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("SyncProducerGroup");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建消息实例,指定Topic、Tag和消息内容
        Message msg = new Message("SyncTopic",
                "TagA",
                "Hello RocketMQ Sync Message".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 同步发送消息
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}

2. 异步发送

异步发送时,生产者发送消息后不会阻塞等待响应,而是继续执行后续代码。当Broker返回响应时,通过回调函数来处理发送结果。这种方式适用于对响应时间敏感,但仍需确保消息发送成功的场景,例如用户注册成功后的异步通知。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("AsyncProducerGroup");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建消息实例,指定Topic、Tag和消息内容
        Message msg = new Message("AsyncTopic",
                "TagA",
                "Hello RocketMQ Async Message".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 异步发送消息
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.printf("异步发送成功: %s%n", sendResult);
            }

            @Override
            public void onException(Throwable e) {
                System.out.printf("异步发送失败: %s%n", e);
            }
        });

        // 防止主线程退出
        Thread.sleep(5000);
        // 关闭生产者
        producer.shutdown();
    }
}

3. 单向发送

单向发送是指生产者发送消息后,不关心Broker的响应,直接继续执行后续代码。这种方式发送速度最快,但无法确认消息是否成功发送到Broker,适用于对消息可靠性要求不高,只追求发送速度的场景,如日志记录。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class OnewayProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("OnewayProducerGroup");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建消息实例,指定Topic、Tag和消息内容
        Message msg = new Message("OnewayTopic",
                "TagA",
                "Hello RocketMQ Oneway Message".getBytes(RemotingHelper.DEFAULT_CHARSET));

        // 单向发送消息
        producer.sendOneway(msg);
        System.out.println("单向消息已发送");

        // 关闭生产者
        producer.shutdown();
    }
}

4. 延迟发送

RocketMQ支持延迟消息,通过设置消息的延迟级别来控制消息在Broker中延迟多久被投递。目前RocketMQ默认的延迟级别为1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m, 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class DelayProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("DelayProducerGroup");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        // 创建消息实例,指定Topic、Tag和消息内容
        Message msg = new Message("DelayTopic",
                "TagA",
                "Hello RocketMQ Delay Message".getBytes(RemotingHelper.DEFAULT_CHARSET));
        // 设置延迟级别为3级,即延迟10s
        msg.setDelayTimeLevel(3);

        // 发送延迟消息
        SendResult sendResult = producer.send(msg);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}

5. 批量发送

批量发送可以将多个消息合并成一个批次发送到Broker,减少网络通信开销,提高发送效率。但需要注意的是,批量消息的总大小不能超过Broker允许的最大消息大小(默认4MB)。

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

import java.util.ArrayList;
import java.util.List;

public class BatchProducer {
    public static void main(String[] args) throws Exception {
        // 创建生产者实例
        DefaultMQProducer producer = new DefaultMQProducer("BatchProducerGroup");
        // 设置NameServer地址
        producer.setNamesrvAddr("localhost:9876");
        // 启动生产者
        producer.start();

        List<Message> messages = new ArrayList<>();
        messages.add(new Message("BatchTopic",
                "TagA",
                "Message 1".getBytes(RemotingHelper.DEFAULT_CHARSET)));
        messages.add(new Message("BatchTopic",
                "TagA",
                "Message 2".getBytes(RemotingHelper.DEFAULT_CHARSET)));
        messages.add(new Message("BatchTopic",
                "TagA",
                "Message 3".getBytes(RemotingHelper.DEFAULT_CHARSET)));

        // 批量发送消息
        SendResult sendResult = producer.send(messages);
        System.out.printf("%s%n", sendResult);

        // 关闭生产者
        producer.shutdown();
    }
}

上述代码基于RocketMQ 4.x版本,不同版本在API上可能略有差异,实际使用时请参考对应版本的官方文档。同时,确保RocketMQ服务已正确安装和启动,并根据实际情况调整NameServer地址等配置。

#牛客创作赏金赛#
全部评论

相关推荐

评论
点赞
2
分享

创作者周榜

更多
牛客网
牛客企业服务