在C#中使用RabbitMQ
官方提供了 6 中应用场景使用教程: https://www.rabbitmq.com/getstarted.html
使用消息队列的好处:
- 业务系统往往要求相应能力特别强,起到削峰填谷的作用。
- 解耦和高可用。如果一个系统挂了,不会影响到其他系统的运行。
- 业务系统往往有对消息的高可靠要求,以及有对复杂功能(如ACK)的要求。
- 增强业务系统的异步处理能力,减少甚至几乎不可能出现并发现象。
该案例使用 RabbitMQ.Client 包
- 基本用法
<appSettings>
<add key="AppID" value="150107"/>
<add key="RabbitMQUri" value="amqp://test:123456@localhost:5672" />
</appSettings>/// <summary>
/// 发送消息
/// </summary>
public class Send
{
private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
static void Main(string[] args)
{
var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string queue = string.Format("MQ{0}.BaseStudy", appID);
channel.QueueDeclare(queue, false, false, false, null); //定义一个队列
while (true)
{
Console.Write("请输入要发送的消息:");
var message = Console.ReadLine();
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish("", queue, null, body); //发送消息
Console.WriteLine("已发送的消息: {0}", message);
}
}
}
}
}/// <summary>
/// 接收消息
/// </summary>
public class Receive
{
private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
static void Main(string[] args)
{
var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string queue = string.Format("MQ{0}.BaseStudy", appID);
channel.QueueDeclare(queue, false, false, false, null); //定义一个队列
Console.WriteLine("准备接收消息:");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
var body = e.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("接收到的消息: {0}", message);
};
channel.BasicConsume(queue, true, consumer); //开启消费者与通道、队列关联
Console.ReadLine();
}
}
}
}- 主题发布订阅
/// <summary>
/// 发送消息,采用主题模式
/// </summary>
public class Send
{
private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
static void Main(string[] args)
{
var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string exchange = string.Format("Ex{0}.Logs", appID);
channel.ExchangeDeclare(exchange, "topic"); //声明创建一个交换机,交换机类型设定为‘topic’
while (true)
{
Console.Write("请输入要发送的消息,输入格式如'RoutingKey_Message':");
var keyWithMsg = Console.ReadLine();
args = keyWithMsg.Split('_');
var routingKey = args.Length > 1 ? args[0] : "*.rabbit";
var message = args.Length > 1 ? args[1] : "Hello World";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange, routingKey, null, body); //发布消息
Console.WriteLine("已发送的消息: '{0}':'{1}'", routingKey, message);
}
}
}
}
}/// <summary>
/// 接收消息,采用主题模式
/// </summary>
public class Receive
{
private static readonly string appID = ConfigurationManager.AppSettings["AppID"];
static void Main(string[] args)
{
var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] };
using (var connection = factory.CreateConnection())
{
using (var channel = connection.CreateModel())
{
string exchange = string.Format("Ex{0}.Logs", appID);
channel.ExchangeDeclare(exchange, "topic"); //声明创建一个交换机,交换机类型设定为‘topic’
var queueName = channel.QueueDeclare().QueueName; //获取连接通道所使用的队列
Console.Write("请输入准备监听的消息主题格式,格式如'*.rabbit'或者'info.*'或者'info.warning.error'等:");
while (true)
{
var bindingKey = Console.ReadLine();
channel.QueueBind(queueName, exchange, bindingKey); //队列绑定到交换机
Console.WriteLine("准备接收消息");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (s, e) =>
{
var routingKey = e.RoutingKey;
var body = e.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine("接收到的消息: '{0}':'{1}'", routingKey, message);
};
channel.BasicConsume(queueName, true, consumer); //开启消费者与通道、队列关联
}
}
}
}
}高级特性
/// <summary> /// 发送消息 /// </summary> public class Send { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string queue = string.Format("MQ{0}.TaskQueue", appID); channel.QueueDeclare(queue, true, false, false, null); //定义一个支持持久化的消息队列 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //1表示不持久,2表示持久化 Console.WriteLine("请注意:演示耗时较长的消息时,可通过发送带有‘.’的内容去模拟,每个‘.’加1秒!"); while (true) { Console.Write("请输入要发送的消息:"); var message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queue, properties, body); //发送消息 Console.WriteLine("已发送的消息: {0}", message); } } } } }/// <summary> /// 接收消息 /// </summary> public class Receive { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string queue = string.Format("MQ{0}.TaskQueue", appID); channel.QueueDeclare(queue, true, false, false, null); //定义一个支持持久化的消息队列 channel.BasicQos(0, 1, false); //在一个消费者还在处理消息且没响应消息之前,不要给他分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的消费者 Console.WriteLine("准备接收消息:"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { var message = Encoding.UTF8.GetString(e.Body); SimulationTask(message); channel.BasicAck(e.DeliveryTag, false); //手动Ack:用来确认消息已经被消费完成了 }; channel.BasicConsume(queue, false, consumer); //开启消费者与通道、队列关联 //channel.BasicConsume(queue, true, consumer); //开启消费者与通道、队列关联;自动Ack Console.ReadLine(); } } } /// <summary> /// 模拟消息任务的处理过程 /// </summary> /// <param name="message">消息</param> private static void SimulationTask(string message) { Console.WriteLine("接收的消息: {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine("接收的消息处理完成,现在时间为{0}!", DateTime.Now); } }