当前位置: 首页>后端>正文

spring rabbitmq多线程消费者 rabbitmq多个消费者

一、生成者-队列-多消费者(前言)

 上篇文章,我们做了一个简单的Demo,一个生产者对应一个消费者,本篇文章就介绍 生产者-队列-多个消费者,下面简单示意图

spring rabbitmq多线程消费者 rabbitmq多个消费者,spring rabbitmq多线程消费者 rabbitmq多个消费者_重新启动,第1张

 

 P 生产者    C 消费者  中间队列

 需求背景:工厂某部门需要生产n个零件,部门下面有2个小组,每个小组需要生产n/2个

公平派遣

每个小组的情况下,当所有奇怪的信息都很重,甚至信息很轻的时候,一个工作人员将不断忙碌,另一个工作人员几乎不会做任何工作。那么,RabbitMQ不知道什么,还会平均分配消息。

这是因为当消息进入队列时,RabbitMQ只会分派消息。它不看消费者的未确认消息的数量。它只是盲目地向第n个消费者发送每个第n个消息。

spring rabbitmq多线程消费者 rabbitmq多个消费者,spring rabbitmq多线程消费者 rabbitmq多个消费者_代码实现_02,第2张

 下面就由我们撸代码实现,这一需求::::

二、代码

P 生产者代码::: 

static void Main(string[] args)
        {
            using (var channel = HelpConnection.GetConnection().CreateModel())
            {
                //声明队列  
                channel.QueueDeclare("firstQueue", true, false, false, null);
                //声明路由
                channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                //绑定 建立关系
                channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");
                //内容的基本属性
               var properties=channel.CreateBasicProperties();
                //设置消息内容持久化
               properties.Persistent = true;
                int j = 0;
                for (int i = 0; i < 100; i++)
                {
                    var msg = Encoding.UTF8.GetBytes("生产者-队列-多个消费者" + i);
                    channel.BasicPublish(exchange: "firstExchange",
                                         routingKey: "firstQueue_Exchange",
                                         basicProperties: properties,
                                         body: msg);
                    j = i;
                    Console.WriteLine( i);
                }
                Console.WriteLine("添加成功" + j + "条");
                Console.ReadKey();
            }
        }

成功添加100条

spring rabbitmq多线程消费者 rabbitmq多个消费者,spring rabbitmq多线程消费者 rabbitmq多个消费者_持久性_03,第3张

 

 C 消费者代码::: 

/// <summary>
        /// 
        /// </summary>
        /// <param name="args"></param>
        static void Main(string[] args)
        {
            using (var channel = HelpConnection.GetConnection().CreateModel())
            {
                //声明队列
                channel.QueueDeclare("firstQueue", true, false, false, null);
                //声明路由
                channel.ExchangeDeclare("firstExchange", "direct", true, false, null);
                //绑定 建立关系
                channel.QueueBind("firstQueue", "firstExchange", "firstQueue_Exchange");

                //公平分发 同一时间只处理一个消息
                channel.BasicQos(0, 1, true);
                var conSumer = new EventingBasicConsumer(channel);
                conSumer.Received += (moede, e) =>
                {
                    var body = e.Body;
                    var msg = Encoding.UTF8.GetString(body);
                    Console.WriteLine("显示结果:"+msg);
                    //进行交付,确定此消息已经处理完成
                   // channel.BasicAck( e.DeliveryTag,  false);
                };
                //确认收到消息    进行消费
                channel.BasicConsume("firstQueue", true, conSumer);//false 手动应答;true:自动应答
              
                Console.ReadKey();
            }
        }

效果图(特意建立好几个项目,同事启动进行测试)

spring rabbitmq多线程消费者 rabbitmq多个消费者,spring rabbitmq多线程消费者 rabbitmq多个消费者_大数据_04,第4张

 

三、总结

 本章总结注意几点:::

1、即使RabbitMQ重新启动,task_queue队列也不会丢失。现在我们需要将我们的消息标记为持久性 - 将IBasicProperties.SetPersistent设置为true。

var properties = channel.CreateBasicProperties();
  properties.Persistent = true;

2、公平分发同一时间只处理一个消息

channel.BasicQos(0,1,false)

https://www.xamrdz.com/backend/3mu1921992.html

相关文章: