你上手了RabbitMQ?我还真不信,先来看看它的交换机吧

楔子

本篇是消息队列RabbitMQ的第三弹。

RabbitMQ的入门和RabbitMQ+SpringBoot的整合可以点此链接进去回顾,今天要讲的是RabbitMQ的交换机。

本篇是理解RabbitMQ很重要的一篇,交换机是消息的第一站,只有理解了交换机的分发模式,我们才能知道不同交换机根据什么规则分发消息,才能明白在面对不同业务需求的时候应采用哪种交换机。

1. Exchange

 

先来放上几乎每篇都要出现一遍的我画了好久的RabbitMQ架构图。

前两篇文中我们一直没有显式的去使用Exchange,都是使用的默认Exchange,其实Exchange是一个非常关键的组件,有了它才有了各种消息分发模式。

我先简单说说Exchange有哪几种类型:

  1. fanout:Fanout-Exchange会将它接收到的消息发往所有与他绑定的Queue中。
  2. direct:Direct-Exchange会把它接收到的消息发往与它有绑定关系且Routingkey完全匹配的Queue中(默认)。
  3. topic:Topic-Exchange与Direct-Exchange相似,不过Topic-Exchange不需要全匹配,可以部分匹配,它约定:Routingkey为一个句点号“. ”分隔的字符串(我们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词)。
  4. header:Header-Exchange不依赖于RoutingKey或绑定关系来分发消息,而是根据发送的消息内容中的headers属性进行匹配。此模式已经不再使用,本文中也不会去讲,大家知道即可。

本文中我们主要讲前三种Exchange方式,相信凭借着我简练的文字和灵魂的画技给大家好好讲讲,争取老妪能解。

Tip:本文的代码演示直接使用SpringBoot+RabbitMQ的模式。

2. Fanout-Exchange

先来看看Fanout-Exchange,Fanout-Exchange又称扇形交换机,这个交换机应该是最容易理解的。

 

Exchange和Queue建立一个绑定关系,Exchange会分发给所有和它有绑定关系的Queue中,绑定了十个Queue就把消息复制十份进行分发。

这种绑定关系为了效率肯定都会维护一张表,从算法效率上来说一般是O(1),所以Fanout-Exchange是这几个交换机中查找需要被分发队列最快的交换机。

下面是一段代码演示:

    @Bean
    public Queue fanout1() {        return new Queue("fanout1");
    }    @Bean    public Queue fanout2() {        return new Queue("fanout2");
    }    @Bean    public FanoutExchange fanoutExchange() {        // 三个构造参数:name durable autoDelete
        return new FanoutExchange("fanoutExchange", false, false);
    }    @Bean    public Binding binding1() {        return BindingBuilder.bind(fanout1()).to(fanoutExchange());
    }    @Bean    public Binding binding2() {        return BindingBuilder.bind(fanout2()).to(fanoutExchange());
    }

为了清晰明了,我新建了两个演示用的队列,然后建了一个FanoutExchange,最后给他们都设置上绑定关系,这样一组队列和交换机的绑定设置就算完成了。

紧接着编写一下生产者和消费者:

    public void sendFanout() {
        Client client = new Client();
        // 应读者要求,以后代码打印的地方都会改成log方式,这是一种良好的编程习惯,用System.out.println一般是不推荐的。
        log.info("Message content : " + client);
        rabbitTemplate.convertAndSend("fanoutExchange",null,client);
        System.out.println("消息发送完毕。");
    }    @Test    public void sendFanoutMessage() {        rabbitProduce.sendFanout();    }
@Slf4j
@Component("rabbitFanoutConsumer")
public class RabbitFanoutConsumer {    @RabbitListener(queues = "fanout1")
    public void onMessage1(Message message, Channel channel) throws Exception {        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }    @RabbitListener(queues = "fanout2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }}

这两段代码都很好理解,不再赘述,有遗忘的可以去看RabbitMQ第一弹的内容。

其中发送消息的代码有三个参数,第一个参数是Exchange的名称,第二个参数是routingKey的名称,这个参数在扇形交换机里面用不到,在其他两个交换机类型里面会用到。

代码的准备到此结束,我们可以运行发送方法之后run一下了~

项目启动后,我们可以先来观察一下队列与交换机的绑定关系有没有生效,我们在RabbitMQ控制台使用rabbitmqctl list_bindings命令查看绑定关系。

 

关键部分我用红框标记了起来,这就代表着名叫fanoutExchange的交换机绑定着两个队列,一个叫fanout1,另一个叫fanout2。

紧接着,我们来看控制台的打印情况:

 

可以看到,一条信息发送出去之后,两个队列都接收到了这条消息,紧接着由我们的两个消费者消费。

Tip: 如果你的演示应用启动之后没有消费信息,可以尝试重新运行一次生产者的方法发送消息。

3. Direct-Exchange

Direct-Exchange是一种精准匹配的交换机,我们之前一直使用默认的交换机,其实默认的交换机就是Direct类型。

如果将Direct交换机都比作一所公寓的管理员,那么队列就是里面的住户。(绑定关系)

管理员每天都会收到各种各样的信件(消息),这些信件的地址不光要标明地址(ExchangeKey)还需要标明要送往哪一户(routingKey),不然消息无法投递。

 

以上图为例,准备一条消息发往名为SendService的直接交换机中去,这个交换机主要是用来做发送服务,所以其绑定了两个队列,SMS队列和MAIL队列,用于发送短信和邮件。

我们的消息除了指定ExchangeKey还需要指定routingKey,routingKey对应着最终要发送的是哪个队列,我们的示例中的routingKey是sms,这里这条消息就会交给SMS队列。

听了上面这段,可能大家对routingKey还不是很理解,我们上段代码实践一下,大家应该就明白了。

准备工作:

    @Bean
    public Queue directQueue1() {        return new Queue("directQueue1");
    }    @Bean    public Queue directQueue2() {        return new Queue("directQueue2");
    }    @Bean    public DirectExchange directExchange() {        // 三个构造参数:name durable autoDelete
        return new DirectExchange("directExchange", false, false);
    }    @Bean    public Binding directBinding1() {        return BindingBuilder.bind(directQueue1()).to(directExchange()).with("sms");
    }    @Bean    public Binding directBinding2() {        return BindingBuilder.bind(directQueue2()).to(directExchange()).with("mail");
    }

新建两个队列,新建了一个直接交换机,并设置了绑定关系。

这里的示例代码和上面扇形交换机的代码很像,唯一可以说不同的就是绑定的时候多调用了一个with将routingKey设置了上去。

所以是交换机和队列建立绑定关系的时候设置的routingKey,一个消息到达交换机之后,交换机通过消息上带来的routingKey找到自己与队列建立绑定关系时设置的routingKey,然后将消息分发到这个队列去。

生产者:

    public void sendDirect() {
        Client client = new Client();
        log.info("Message content : " + client);
        rabbitTemplate.convertAndSend("directExchange","sms",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitDirectConsumer")
public class RabbitDirectConsumer {    @RabbitListener(queues = "directQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }    @RabbitListener(queues = "directQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }}

效果图如下:

 

只有一个消费者进行了消息,符合我们的预期。

4. Topic-Exchange

Topic-Exchange是直接交换机的模糊匹配版本,Topic类型的交换器,支持使用"*"和"#"通配符定义模糊bindingKey,然后按照routingKey进行模糊匹配队列进行分发。

  • *:能够模糊匹配一个单词。
  • #:能够模糊匹配零个或多个单词。

因为加入了两个通配定义符,所以Topic交换机的routingKey也有些变化,routingKey可以使用.将单词分开。

这里我们直接来用一个例子说明会更加的清晰:

准备工作:

    // 主题交换机示例
    @Bean    public Queue topicQueue1() {        return new Queue("topicQueue1");
    }    @Bean    public Queue topicQueue2() {        return new Queue("topicQueue2");
    }    @Bean    public TopicExchange topicExchange() {        // 三个构造参数:name durable autoDelete
        return new TopicExchange("topicExchange", false, false);
    }    @Bean    public Binding topicBinding1() {        return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("sms.*");
    }    @Bean    public Binding topicBinding2() {        return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("mail.#");
    }

新建两个队列,新建了一个Topic交换机,并设置了绑定关系。

这里的示例代码我们主要看设置routingKey,这里的routingKey用上了通配符,且中间用.隔开,这就代表topicQueue1消费sms开头的消息,topicQueue2消费mail开头的消息,具体不同往下看。

生产者:

    public void sendTopic() {
        Client client = new Client();
        log.info("Message content : " + client);
        rabbitTemplate.convertAndSend("topicExchange","sms.liantong",client);
        System.out.println("消息发送完毕。");
    }

消费者:

@Slf4j
@Component("rabbitTopicConsumer")
public class RabbitTopicConsumer {    @RabbitListener(queues = "topicQueue1")
    public void onMessage1(Message message, Channel channel) throws Exception {        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }    @RabbitListener(queues = "topicQueue2")
    public void onMessage2(Message message, Channel channel) throws Exception {
        log.info("Message content : " + message);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
        log.info("消息已确认");
    }}

这里我们的生产者发送的消息routingKey是sms.liantong,它就会被发到topicQueue1队列中去,这里消息的routingKey也需要用.隔离开,用其他符号无法正确识别。

如果我们的routingKey是sms.123.liantong,那么它将无法找到对应的队列,因为topicQueue1的模糊匹配用的通配符是*而不是#,只有#是可以匹配多个单词的。

Topic-Exchange和Direct-Exchange很相似,我就不再赘述了,通配符*和#的区别也很简单,大家可以自己试一下。

后记

周一没更文实在惭愧,去医院抽血了,抽了三管~,吃多少才能补回来~

RabbitMQ已经更新了三篇了,这三篇的内容有些偏基础,下一篇将会更新高级部分内容:包括防止消息丢失,防止消息重复消费等等内容,希望大家持续关注。

作者:和耳朵

链接:https://juejin.im/post/6861959704705237000

来源:掘金

全部评论

相关推荐

上周组里招人,我面了六个候选人,回来跟同事吃饭的时候聊起一个让我挺感慨的现象。前三个候选人,算法题写得都不错。第一道二分查找,五分钟之内给出解法,边界条件也处理得干净。第二道动态规划,状态转移方程写对了,空间复杂度也优化了一版。我翻他们的简历,力扣刷题量都在300以上。后三个呢,就有点参差不齐了。有的边界条件没处理好,有的直接说这道题没刷过能不能换个思路讲讲。其中有一个女生,我印象特别深——她拿到题之后没有马上写,而是先问我:“面试官,我能先跟你确认一下我对题目的理解吗?”然后她把自己的思路讲了一遍,虽然最后代码写得不是最优解,但整个沟通过程非常顺畅。这个女生的代码不是最优的,但当我问她“如果这里是线上环境,你会怎么设计’的时候,她给我讲了一套完整的方案——异常怎么处理、日志怎么打、怎么平滑发布。她对这是之前在实习的时候踩过的坑。”我在想LeetCode到底在筛选什么?我自己的经历可能有点代表性。我当年校招的时候,也是刷了三百多道题才敢去面试。那时候大家都刷,你不刷就过不了笔试关。后来工作了,前三年基本没再打开过力扣。真正干活的时候,没人让你写反转链表,也没人让你手撕红黑树。更多的是:这个接口为什么慢了、那个服务为什么OOM了、线上数据对不上了得排查一下。所以后来我当面试官,慢慢调整了自己的评判标准。算法题我还会出,但目的变了。我出算法题,不是想看你能不能背出最优解。而是想看你拿到一个陌生问题的时候,是怎么思考的。你会先理清题意吗?你会主动问边界条件吗?你想不出来的时候会怎么办?你写出来的代码,变量命名乱不乱、结构清不清楚?这些才是工作中真正用得到的能力。LeetCode是一个工具,不是目的。它帮你熟悉数据结构和常见算法思路,这没问题。但如果你刷了三百道题,却说不清楚自己的项目解决了什么问题、遇到了什么困难、你是怎么解决的,那这三百道题可能真的白刷了。所以还要不要刷LeetCode?要刷,但别只刷题。刷题的时候,多问自己几个为什么:为什么用这个数据结构?为什么这个解法比那个好?如果换个条件,解法还成立吗?把刷题当成锻炼思维的方式,而不是背答案的任务。毕竟面试官想看到的,从来不是一台背题机器,而是一个能解决问题的人。
国企上岸了的向宇同桌...:最害怕答非所问了,但是频繁反问确定意思又害怕面试官觉得我笨
AI时代还有必要刷lee...
点赞 评论 收藏
分享
03-31 21:47
东南大学 C++
彭于晏前来求offe...:吓晕了
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务