在C#中使用RabbitMQ

官方提供了 6 中应用场景使用教程: https://www.rabbitmq.com/getstarted.html

使用消息队列的好处:

  1. 业务系统往往要求相应能力特别强,起到削峰填谷的作用。
  2. 解耦和高可用。如果一个系统挂了,不会影响到其他系统的运行。
  3. 业务系统往往有对消息的高可靠要求,以及有对复杂功能(如ACK)的要求。
  4. 增强业务系统的异步处理能力,减少甚至几乎不可能出现并发现象。

该案例使用 RabbitMQ.Client 包

  1. 基本用法
  <appSettings>
    <add key="AppID" value="150107"/>
    <add key="RabbitMQUri" value="amqp://test:123456@localhost:5672" />
  </appSettings>
/// <summary>
/// 发送消息
/// </summary>
public class Send
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string queue = string.Format("MQ{0}.BaseStudy", appID);

                channel.QueueDeclare(queue, false, false, false, null);   //定义一个队列

                while (true)
                {
                    Console.Write("请输入要发送的消息:");
                    var message = Console.ReadLine();
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish("", queue, null, body); //发送消息

                    Console.WriteLine("已发送的消息: {0}", message);
                }
            }
        }            
    }
}
/// <summary>
/// 接收消息
/// </summary>
public class Receive
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string queue = string.Format("MQ{0}.BaseStudy", appID);

                channel.QueueDeclare(queue, false, false, false, null);   //定义一个队列

                Console.WriteLine("准备接收消息:");                    

                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (s, e) =>
                {
                    var body = e.Body;
                    var message = Encoding.UTF8.GetString(body);
                    Console.WriteLine("接收到的消息: {0}", message);
                };
                channel.BasicConsume(queue, true, consumer);  //开启消费者与通道、队列关联

                Console.ReadLine();
            }
        }            
    }
}
  1. 主题发布订阅
/// <summary>
/// 发送消息,采用主题模式
/// </summary>
public class Send
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string exchange = string.Format("Ex{0}.Logs", appID);

                channel.ExchangeDeclare(exchange, "topic");    //声明创建一个交换机,交换机类型设定为‘topic’

                while (true)
                {
                    Console.Write("请输入要发送的消息,输入格式如'RoutingKey_Message':");
                    var keyWithMsg = Console.ReadLine();

                    args = keyWithMsg.Split('_');
                    var routingKey = args.Length > 1 ? args[0] : "*.rabbit";
                    var message = args.Length > 1 ? args[1] : "Hello World";
                    var body = Encoding.UTF8.GetBytes(message);

                    channel.BasicPublish(exchange, routingKey, null, body);    //发布消息

                    Console.WriteLine("已发送的消息: '{0}':'{1}'", routingKey, message);
                }
            }
        }            
    }
}
/// <summary>
/// 接收消息,采用主题模式
/// </summary>
public class Receive
{
    private static readonly string appID = ConfigurationManager.AppSettings["AppID"];

    static void Main(string[] args)
    {
        var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
        using (var connection = factory.CreateConnection())
        {
            using (var channel = connection.CreateModel())
            {
                string exchange = string.Format("Ex{0}.Logs", appID);                    

                channel.ExchangeDeclare(exchange, "topic");    //声明创建一个交换机,交换机类型设定为‘topic’
                var queueName = channel.QueueDeclare().QueueName;   //获取连接通道所使用的队列

                Console.Write("请输入准备监听的消息主题格式,格式如'*.rabbit'或者'info.*'或者'info.warning.error'等:");

                while (true)
                {
                    var bindingKey = Console.ReadLine();
                    channel.QueueBind(queueName, exchange, bindingKey);   //队列绑定到交换机

                    Console.WriteLine("准备接收消息");

                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (s, e) =>
                    {
                        var routingKey = e.RoutingKey;
                        var body = e.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine("接收到的消息: '{0}':'{1}'", routingKey, message);
                    };
                    channel.BasicConsume(queueName, true, consumer);    //开启消费者与通道、队列关联
                }
            }
        }
    }
}
  1. 高级特性

    /// <summary>
    /// 发送消息
    /// </summary>
    public class Send
    {
     private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
    
     static void Main(string[] args)
     {
         var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
         using (var connection = factory.CreateConnection())
         {
             using (var channel = connection.CreateModel())
             {
                 string queue = string.Format("MQ{0}.TaskQueue", appID);
    
                 channel.QueueDeclare(queue, true, false, false, null);   //定义一个支持持久化的消息队列
                 var properties = channel.CreateBasicProperties();
                 properties.DeliveryMode = 2;    //1表示不持久,2表示持久化
    
                 Console.WriteLine("请注意:演示耗时较长的消息时,可通过发送带有‘.’的内容去模拟,每个‘.’加1秒!");
    
                 while (true)
                 {
                     Console.Write("请输入要发送的消息:");
                     var message = Console.ReadLine();
                     var body = Encoding.UTF8.GetBytes(message);
    
                     channel.BasicPublish("", queue, properties, body);   //发送消息
    
                     Console.WriteLine("已发送的消息: {0}", message);
                 }
             }
         }
     }
    }
    /// <summary>
    /// 接收消息
    /// </summary>
    public class Receive
    {
     private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
    
     static void Main(string[] args)
     {
         var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
         using (var connection = factory.CreateConnection())
         {
             using (var channel = connection.CreateModel())
             {
                 string queue = string.Format("MQ{0}.TaskQueue", appID);
    
                 channel.QueueDeclare(queue, true, false, false, null);   //定义一个支持持久化的消息队列
    
                 channel.BasicQos(0, 1, false);  //在一个消费者还在处理消息且没响应消息之前,不要给他分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的消费者
    
                 Console.WriteLine("准备接收消息:");
                 var consumer = new EventingBasicConsumer(channel);
                 consumer.Received += (s, e) =>
                 {
                     var message = Encoding.UTF8.GetString(e.Body);
                     SimulationTask(message);
    
                     channel.BasicAck(e.DeliveryTag, false); //手动Ack:用来确认消息已经被消费完成了
                 };
                 channel.BasicConsume(queue, false, consumer);    //开启消费者与通道、队列关联
                 //channel.BasicConsume(queue, true, consumer);    //开启消费者与通道、队列关联;自动Ack
    
                 Console.ReadLine();
             }
         }            
     }
    
     /// <summary>
     /// 模拟消息任务的处理过程
     /// </summary>
     /// <param name="message">消息</param>
     private static void SimulationTask(string message)
     {
         Console.WriteLine("接收的消息: {0}", message);
         int dots = message.Split('.').Length - 1;
         Thread.Sleep(dots * 1000);  
         Console.WriteLine("接收的消息处理完成,现在时间为{0}!", DateTime.Now);            
     }
    }
全部评论

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务