在C#中使用RabbitMQ

官方提供了 6 中应用场景使用教程: https://www.rabbitmq.com/getstarted.html

使用消息队列的好处:

  1. 业务系统往往要求相应能力特别强,起到削峰填谷的作用。
  2. 解耦和高可用。如果一个系统挂了,不会影响到其他系统的运行。
  3. 业务系统往往有对消息的高可靠要求,以及有对复杂功能(如ACK)的要求。
  4. 增强业务系统的异步处理能力,减少甚至几乎不可能出现并发现象。

该案例使用 RabbitMQ.Client 包

  1. 基本用法
  <appSettings>
    <add key="AppID" value="150107"/>
    <add key="RabbitMQUri" value="amqp://test:123456@localhost:5672" />
  </appSettings>
/// <summary>
/// 发送消息
/// </summary>
public class Send
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string queue = string.Format("MQ{0}.BaseStudy", appID);

                channel.QueueDeclare(queue, false, false, false, null);   //定义一个队列

                while (true)
                {
                    Console.Write("请输入要发送的消息:");
                    var message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish("", queue, null, body); //发送消息

                    Console.WriteLine("已发送的消息: {0}", message);
                }
            }
        }            
    }
}
/// <summary>
/// 接收消息
/// </summary>
public class Receive
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string queue = string.Format("MQ{0}.BaseStudy", appID);

                channel.QueueDeclare(queue, false, false, false, null);   //定义一个队列

                Console.WriteLine("准备接收消息:");                    

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (s, e) =>
                {
                    var body = e.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("接收到的消息: {0}", message);
                };
                channel.BasicConsume(queue, true, consumer);  //开启消费者与通道、队列关联

                Console.ReadLine();
            }
        }            
    }
}
  1. 主题发布订阅
/// <summary>
/// 发送消息,采用主题模式
/// </summary>
public class Send
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string exchange = string.Format("Ex{0}.Logs", appID);

                channel.ExchangeDeclare(exchange, "topic");    //声明创建一个交换机,交换机类型设定为‘topic’

                while (true)
                {
                    Console.Write("请输入要发送的消息,输入格式如'RoutingKey_Message':");
                    var keyWithMsg = Console.ReadLine();

                    args = keyWithMsg.Split('_');
                    var routingKey = args.Length > 1 ? args[0] : "*.rabbit";
                    var message = args.Length > 1 ? args[1] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange, routingKey, null, body);    //发布消息

                    Console.WriteLine("已发送的消息: '{0}':'{1}'", routingKey, message);
                }
            }
        }            
    }
}
/// <summary>
/// 接收消息,采用主题模式
/// </summary>
public class Receive
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string exchange = string.Format("Ex{0}.Logs", appID);                    

                channel.ExchangeDeclare(exchange, "topic");    //声明创建一个交换机,交换机类型设定为‘topic’
                var queueName = channel.QueueDeclare().QueueName;   //获取连接通道所使用的队列

                Console.Write("请输入准备监听的消息主题格式,格式如'*.rabbit'或者'info.*'或者'info.warning.error'等:");

                while (true)
                {
                    var bindingKey = Console.ReadLine();
                    channel.QueueBind(queueName, exchange, bindingKey);   //队列绑定到交换机

                    Console.WriteLine("准备接收消息");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (s, e) =>
                    {
                        var routingKey = e.RoutingKey;
                        var body = e.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("接收到的消息: '{0}':'{1}'", routingKey, message);
                    };
                    channel.BasicConsume(queueName, true, consumer);    //开启消费者与通道、队列关联
                }
            }
        }
    }
}
  1. 高级特性

    /// <summary>
    /// 发送消息
    /// </summary>
    public class Send
    {
     private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
    
     static void Main(string[] args)
     {
         var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
         using (var connection = factory.CreateConnection())
         {
             using (var channel = connection.CreateModel())
             {
                 string queue = string.Format("MQ{0}.TaskQueue", appID);
    
                 channel.QueueDeclare(queue, true, false, false, null);   //定义一个支持持久化的消息队列
                 var properties = channel.CreateBasicProperties();
                 properties.DeliveryMode = 2;    //1表示不持久,2表示持久化
    
                 Console.WriteLine("请注意:演示耗时较长的消息时,可通过发送带有‘.’的内容去模拟,每个‘.’加1秒!");
    
                 while (true)
                 {
                     Console.Write("请输入要发送的消息:");
                     var message = Console.ReadLine();
                     var body = Encoding.UTF8.GetBytes(message);
    
                     channel.BasicPublish("", queue, properties, body);   //发送消息
    
                     Console.WriteLine("已发送的消息: {0}", message);
                 }
             }
         }
     }
    }
    /// <summary>
    /// 接收消息
    /// </summary>
    public class Receive
    {
     private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
    
     static void Main(string[] args)
     {
         var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
         using (var connection = factory.CreateConnection())
         {
             using (var channel = connection.CreateModel())
             {
                 string queue = string.Format("MQ{0}.TaskQueue", appID);
    
                 channel.QueueDeclare(queue, true, false, false, null);   //定义一个支持持久化的消息队列
    
                 channel.BasicQos(0, 1, false);  //在一个消费者还在处理消息且没响应消息之前,不要给他分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的消费者
    
                 Console.WriteLine("准备接收消息:");
                 var consumer = new EventingBasicConsumer(channel);
                 consumer.Received += (s, e) =>
                 {
                     var message = Encoding.UTF8.GetString(e.Body);
                     SimulationTask(message);
    
                     channel.BasicAck(e.DeliveryTag, false); //手动Ack:用来确认消息已经被消费完成了
                 };
                 channel.BasicConsume(queue, false, consumer);    //开启消费者与通道、队列关联
                 //channel.BasicConsume(queue, true, consumer);    //开启消费者与通道、队列关联;自动Ack
    
                 Console.ReadLine();
             }
         }            
     }
    
     /// <summary>
     /// 模拟消息任务的处理过程
     /// </summary>
     /// <param name="message">消息</param>
     private static void SimulationTask(string message)
     {
         Console.WriteLine("接收的消息: {0}", message);
         int dots = message.Split('.').Length - 1;
         Thread.Sleep(dots * 1000);  
         Console.WriteLine("接收的消息处理完成,现在时间为{0}!", DateTime.Now);            
     }
    }
全部评论

相关推荐

不愿透露姓名的神秘牛友
05-29 22:21
Offer1:小马智行,深圳,测试开发工程师,17.0k*16.0,Offer2:追觅科技,深圳,嵌入式工程师,18.0k*15.0,
嵌软狗都不学:各位base深圳的同事,作为也是并肩作战的一员,今天想站在管理视角,和大家开诚布公地聊一聊:从近几个月的上下班数据对比看来,我们发现一个明显的差异:深圳同事的在岗时间普遍比苏州同事短。很多深圳同事早上9点之后才到公司,晚上不到 20 点就下班了;而总部那边,20点半甚至 22 点后还有不少同事在办公室忙碌,特别是研发团队,加班更是常态。相信去过苏州的同事,对这种场景都不陌生。我很好奇,这是因为苏州工作任务太重还是咱们深圳同事效率真的高到能在更短时间内完成工作?MOVA在深圳成立分公司是为了吸引更优秀的人才贡献更多更高质的价值,公司管理层给我反馈的是深圳招到的多是行业的专家大拿,大部分都是薪资比苏州高的,而且我们办公的租金等也远高于苏州的..MOVA虽脱胎于强壮的集团母体不久,各业务板块尚未实现全面盈利,虽说公司管理层目光长远,不纠结当下的人才投入,但行业内的普遍标准是,员工创造的价值要达到公司雇佣成本的 15 倍以上。大家不妨自我审视一下,自己是否达到了这个标准?如果是抱着划水、按时打卡走人拿毛爷爷的心态那不适合来MOVA,那样过下去不但自己过得尴尬也会影响MOVA这个大船的攻城略地的速度.我并非鼓励大家盲目加班,而是倡导高效工作,拒绝无效忙碌,不要让项目进度因低效受影响,也别把精力浪费在和苏州同事拼打卡时长上,提倡更高的人效比;考虑到两地地域和交通差异,相信大家会找最适合自己发挥的工作方式(比如按时下班后1小时到家晚饭后继续未竟工作等..)大家在遵守公司规章的情况下尽情地体现自己的能力价值,为MOV!和深圳公司争光我们在这边才能更安心更有信心的工作下去;请客BU长、名部门长、项目管理和各业务单元负责人,全面梳理团队情况,及时评估成员工作负荷与成果质量,坚决清退划水害虫痕疫,践行公司价值观,相互监督,防止管理漏洞及渎职。感谢人家的理解,也请人家多担待我的直言不讳……
点赞 评论 收藏
分享
AAA专业长城贴瓷砖刘大爷:这样的简历我会直接丢进垃圾桶,花里胡哨的
点赞 评论 收藏
分享
评论
点赞
收藏
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务