上一篇讲了简单队列,实际工作中,这种队列应该很少用到,因为生产者发送消息的耗时一般都很短,但是消费者收到消息后,往往伴随着对高消息的业务逻辑处理,是个耗时的过程,这势必会导致大量的消息积压在一个消费者手中,从而导致业务的积压.
所以我们需要多个消费者一起消费队列中的消息,模型如下:(为了方便讲解,暂时隐藏掉"交换机")
生产者
public class Producer
{
private const string QueueName = "test_work_queue";
public static void Send()
{
//获取一个连接
using (IConnection connection = ConnectionHelper.GetConnection())
{
//从连接中获取一个信道
using (IModel channel = connection.CreateModel())
{
//声明队列
channel.QueueDeclare(QueueName, false, false, false, null);
for (int i = 0; i < 50; i++)
{
//创建消息
string msg = "hello world " + i;
//发送消息
channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
Console.WriteLine($"{DateTime.Now} : send {msg}");
}
}
}
}
}
消费者1
public class Consumer1
{
private const string QueueName = "test_work_queue";
public static void Receive()
{
//获取一个连接
IConnection connection = ConnectionHelper.GetConnection();
//从连接中获取一个信道
IModel channel = connection.CreateModel();
//声明队列
channel.QueueDeclare(QueueName, false, false, false, null);
//添加消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//注册消费者收消息事件
consumer.Received += (s, e) =>
{
byte[] bytes = e.Body;
string str = Encoding.Default.GetString(bytes);
Console.WriteLine("consumer1 receive : " + str);
Thread.Sleep(500);//休息0.5秒
};
//开启消费者监听
channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
}
}
消费者2
只有一点点区别:
Console.WriteLine("consumer2 receive : " + str);
Thread.Sleep(1000);//休息1秒
我们这里故意让两个消费者处理消息的耗时不一样,一个0.5秒,一个1秒.
我们来看看结果:
可以非常清楚的看到,尽管两个消费者处理消息的"耗时"不一样,但是处理的"数量"是一样的.
这里有几个细节要说明一下:
1、 在生产者和两个消费者中都声明了同一个队列.其实,如果这个队列之前已经存在了,那么生产者和消费者都可以不用再声明了;;
2、 一定要先启动两个消费者,再启动生产者.原因是,我们上面的代码中,消费者的BasicConsume方法的第2个参数传入的是true,;
这个参数就是 autoAck :是否自动确认(上面文章有讲过).
所以如果先开启生产者,那么会瞬间发送完50条消息,这时候启动消费者1,那么会立刻"消费"掉这50条消息.有朋友肯定要问,不是"睡"了0.5秒么?
这里"睡"0.5秒,是对消息的业务逻辑处理耗时,而不是"消费"消息,消息已经在消费者启动的那一刻从队列中"拿"过来了;
同时,由于采用的是"自动确认",所以队列看到50条都被"确认"了,就会将这些消息从队列中移除.
这时候再启动消费者2,则不会收到任何消息.