目录
- 什么是RabbitMQ?
- 为什么要使用RabbitMQ?
- 如何安装RabbitMQ?
- RabbitMQ的基本概念
- 生产者
- 消费者
- Connection
- Channel
- Exchange
- Queue
- 如何在.NET Core中使用RabbitMQ?
- RabbitMQ的工作模式
- Work Queue 工作队列模式
- publish/subscribe
- routing模式
- topic通配符模式
- header模式
什么是RabbitMQ?
为什么要使用RabbitMQ?
- 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作
- 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块
- 流量削锋:在抢购或者其他的活动页,服务处于爆发式请求状态,如果直连数据库,数据库容易被拖垮。抢购商品也容易出现库存超卖的情况。通过队列可有效解决该问题。
- 日志处理:在单机中,日志直接写入到文件目录中,但是在分布式应用中,日志需要有统一的处理机制,可通过消息队列统一由某个消费端做处理。
- 消息通信:如生产端和消费端可通过队列进行异步通信
如何安装RabbitMQ?
Windows端
1.安装erlang语言运行环境
https://erlang.org/download/otp_win64_23.2.exe
下载后直接下一步即可
2.安装RabbitMQ
https://www.rabbitmq.com/install-windows.html
直接点击安装下一步即可按章
3.安装RabbitMQ的Web管理平台
RabbitMQ的管理平台是通过插件的形式使用,需要手动启用管理平台
在Windows下,RabbitMQ默认被安装到C:\Program Files\RabbitMQ Server\rabbitmq_server-3.8.14 下。
打开sbin ,在cmd或者powershell中执行
rabbitmq-plugins.bat enable rabbitmq_management
安装完成后,浏览器打开 http://localhost:15672/ 即可看到RabbitMQ的管理界面。输入默认账号密码 guest 成功登录。
Linux环境安装
1.Ubuntu:https://www.rabbitmq.com/install-debian.html
2.Centos:https://www.rabbitmq.com/install-rpm.html
RabbitMQ的基本概念
生产者
发送消息的端
消费者
获取消息并处理的端
Connection
一个终端连接。每一个Connection都可以在RabbitMQ后台看到
Channel
Channel是建立在Connection上的一个虚拟通信管道。一般情况下,往消息队列中写入多条消息,为了不每条消息都建立一个TCP连接,所以RabbitMQ的做法是多条消息可以公用一个Connection,大大提高MQ的负载能力。
Exchange
Exchange是一个虚拟交换机。每一条消息都必须要通过交换机才能能进入对应的队列,可以理解为网络设备中的交换机,是一个意思。
Queue
Queue是一个存储消息的内部对象,所有的Rabbit MQ消息都存储在Queue中。生产者所生产的消息会存储在Queue中,消费者获取的消息也是从Queue中获取。
如何在.NET Core中使用RabbitMQ?
nuget安装
dotnet add package RabbitMQ.Client
创建生产者
const string QUEUENAME = "HELLO_MQ";
//创建连接对象工厂
var factory = new ConnectionFactory()
{
UserName = "guest",
Password = "guest",
HostName = "localhost",
Port = 5672, //RabbitMQ默认的端口
};
while (true)
{
using var conn = factory.CreateConnection();
var chanel = conn.CreateModel();
chanel.QueueDeclare(QUEUENAME, true, false, false);
Console.WriteLine("输入生产内容:");
var input = Console.ReadLine();
chanel.BasicPublish("", QUEUENAME, null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
}
在循环中,输入一个值,按下enter,即可推送一条消息到队列。
也可以直接在RabbitMQ的管理后台查看
创建消费者
const string QUEUENAME = "HELLO_MQ";
var factory = new ConnectionFactory()
{
UserName = "guest",
Password = "guest",
HostName = "localhost",
Port = 5672,
};
var conn = factory.CreateConnection();
var chanel = conn.CreateModel();
chanel.QueueDeclare(QUEUENAME, true, false, false);
EventingBasicConsumer consumer = new EventingBasicConsumer(chanel);
consumer.Received += (a, e) =>
{
Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
chanel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
chanel.BasicConsume(QUEUENAME, false, consumer);
Console.WriteLine("启动成功");
Console.ReadLine();
启动成功后,consumer的Received方法,会收到一条来自MQ的消息,
RabbitMQ的工作模式
Work Queue 工作队列模式
publish/subscribe
发布订阅模式是属于比较用多的一种。
const string QUEUENAME = "HELLO_MQ_B";
const string TESTEXCHANGE = "TESTEXCHANGE";
var factory = new ConnectionFactory()
{
UserName = "guest",
Password = "guest",
HostName = "localhost",
Port = 5672,
};
var conn = factory.CreateConnection();
var channel = conn.CreateModel();
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (a, e) =>
{
Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
channel.BasicConsume(QUEUENAME, false, consumer);
Console.WriteLine("启动成功");
Console.ReadLine();
B
const string QUEUENAME = "HELLO_MQ";
const string TESTEXCHANGE = "TESTEXCHANGE";
var factory = new ConnectionFactory()
{
UserName = "guest",
Password = "guest",
HostName = "localhost",
Port = 5672,
};
var conn = factory.CreateConnection();
var channel = conn.CreateModel();
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (a, e) =>
{
Console.WriteLine($"{DateTime.Now.ToString()}接收到消息:" + Encoding.Default.GetString(e.Body.ToArray()));
channel.BasicAck(e.DeliveryTag, true); //收到回复后,RabbitMQ会直接在队列中删除这条消息
};
channel.BasicConsume(QUEUENAME, false, consumer);
Console.WriteLine("启动成功");
Console.ReadLine();
生产者
const string QUEUENAME = "HELLO_MQ";
const string QUEUENAME_B = "HELLO_MQ_B";
const string TESTEXCHANGE = "TESTEXCHANGE";
//创建连接对象工厂
var factory = new ConnectionFactory()
{
UserName = "guest",
Password = "guest",
HostName = "localhost",
Port = 5672, //RabbitMQ默认的端口
};
using var conn = factory.CreateConnection();
while (true)
{
var channel = conn.CreateModel();
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Fanout, true, false);
Console.WriteLine("输入生产内容:");
var input = Console.ReadLine();
channel.BasicPublish(TESTEXCHANGE,"", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
}
在生产者运行成功后,RabbitMQ后台会出现一个交换机,点击交换机会看到交换机下绑定了两个队列
routing模式
var channel = conn.CreateModel();
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
Console.WriteLine("输入生产内容:");
var input = Console.ReadLine();
channel.BasicPublish(TESTEXCHANGE, "INFO", null, Encoding.Default.GetBytes("hello rabbitmq:" + input));
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "INFO");
//定义队列
channel.QueueDeclare(QUEUENAME, true, false, false);
//定义交换机
channel.ExchangeDeclare(TESTEXCHANGE, ExchangeType.Direct, true, false);
//绑定队列到交换机
channel.QueueBind(QUEUENAME, TESTEXCHANGE, "ERROR");
如果遇到指定routingKey生产一条消息,结果 AB消费者都收到的情况。建议在RabbitMQ后台的交换机下看一下绑定的Queue是否重复绑定了多个routingKey.