当前位置: 首页>后端>正文

三大消息之一Rabbitmq

消息队列核心的几个概念

定义

是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、C、用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不错
缺点:使用Erlang开发,阅读和修改源码难度大

核心应用

解耦:订单系统-》物流系统
异步:用户注册-》发送邮件,初始化信息
削峰:秒杀、日志处理

MQ消息中间件MessageQueue

主要是用于程序和程序直接通信,异步+解耦

JMS消息服务

AMQP消息服务

AMQP(advanced message queuing protocol)在2003年时被提出,最早用于解决金融领不同平台之间的消息传递交互问题,就是是一种协议,兼容JMS
更准确说的链接协议 binary- wire-level-protocol 直接定义网络交换的数据格式,类似http
具体的产品实现比较多,RabbitMQ就是其中一种

特性

独立于平台的底层消息传递协议
消费者驱动消息传递
跨语言和平台的互用性、属于底层协议
有5种交换类型direct,fanout,topic,headers,system
面向缓存的、可实现高性能、支持经典的消息队列,循环,存储和转发
支持长周期消息传递、支持事务(跨消息队列)

AMQP和JMS的主要区别

1.AMQP具有跨平台性
AMQP不从API层进行限定,直接定义网络交换的数据格式,这使得实现了
AMQP的provider天然性就是跨平台,比如Java语言产生的消息,可以用其他语言比如python的进行消费
AQMP可以用http来进行类比,不关心实现接口的语言,只要都按照相应的数据格式去发送报文请求,不同语言的client可以和不同语言的server进行通讯
2.消息类型
JMS消息类型:TextMessage/ObjectMessage/StreamMessage等
AMQP消息类型:Byte[]

rabbitmq的核心概念

Broker

--RabbitMQ的服务端程序,可以认为?个mq节点就是?个broker

Producer生产者

--创建消息Message,然后发布到RabbitMQ中

Consumer消费者

--消费队列里面的消息

Message 消息

--生产消费的内容,有消息头和消息体,也包括多个属性配置,比如routingKey路由键

Queue 队列

--是RabbitMQ 的内部对象,用于存储消息,消息都只能存储在队列中

Channel 信道

1.一条支持多路复用的通道,独立的双向数据流通道,可以发布、订阅、接收消息。
2.信道是建立在真实的TCP连接内的虚拟连接,复用TCP连接的通道

Connection连接

--是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑,一个连接上可以有多个channel进行通信

Exchange 交换器

--生产者将消息发送到 Exchange,交换器将消息路由到一个或者多个队列中,里面有多个类型,后续再一一介绍,队列和交换机是多对多的关系。

RoutingKey 路由键

1.生产者将消息发给交换器的时候,一般会指定一个RoutingKey,用来指定这个消息的路由规则
2.最大长度255 字节

Binding 绑定

--通过绑定将交换器与队列关联起来,在绑定的时候一般会指定一个绑定键 ( BindingKey ),这样 RabbitMQ 就知道如何正确地将消息路由到队列了

主要端口介绍

4369 erlang 发现口
5672 client 端通信口
15672 管理界面 ui 端口
25672 server 间内部通信口

交换机类型

Direct Exchange 定向

将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配

Fanout Exchange 广播

Fanout交换机转发消息是最快的,用于发布订阅,广播形式,中文是扇形
不处理路由健

Topic Exchange 通配符

将路由键和某模式进行匹配。此时队列需要绑定要一个模式上
符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词

SpringBoot2.X整合RabbitMQ实战

配置文件

##消息队列
spring:
  rabbitmq:
    host: 10.211.55.13
    port: 5672
    virtual-host: /dev
    password: password
    username: admin

RabbitMQConfig文件

@Configuration
public class RabbitMQConfig {

    public static final String EXCHANGE_NAME = "order_exchange";
    public static final String QUEUE_NAME = "order_queue";
    /**
     * 交换机
     * @return
     */
    @Bean
    public Exchange orderExchange() {
        return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
        //return new TopicExchange(EXCHANGE_NAME, true, false);
    }

    /**
     * 队列
     * @return
     */
    @Bean
    public Queue orderQueue() {
        return QueueBuilder.durable(QUEUE_NAME).build();
        //return new Queue(QUEUE_NAME, true, false, false, null);
    }

    /**
     * 交换机和队列绑定关系
     */
    @Bean
    public Binding orderBinding(Queue queue, Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("order.#").noargs();
        //return new Binding(QUEUE_NAME, Binding.DestinationType.QUEUE, EXCHANGE_NAME, "order.#", null);
    }


}

消息生产者-测试类

@SpringBootTest
class DemoApplicationTests {

  @Autowired
  private RabbitTemplate template;

  @Test
  void send() {
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"order.new","新订单来啦1");
  }
}

消息消费者

@Component
@RabbitListener(queues = "order_queue")
public class OrderMQListener {

    /**
     * RabbitHandler 会自动匹配 消息类型(消息自动确认)
     * @param msg
     * @param message
     * @throws IOException
     */
    @RabbitHandler
    public void releaseCouponRecord(String msg, Message message) throws IOException {

        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("监听到消息:消息内容:"+message.getBody());

    }

}

消息可靠性投递+消费

RabbitMQ消息投递路径生产者-->交换机->队列->消费者

生产者到交换机

通过confirmCallback
生产者投递消息后,如果Broker收到消息后,会给生产者一个ACK。生产者通过ACK,可以确认这条消息是否正常发送到Broker,这种方式是消息可靠性投递的核心

开启confirmCallback
旧版,确认消息发送成功,通过实现ConfirmCallBack接口,消息发送到交换器Exchange后触发回调
spring.rabbitmq.publisher-confirms=true
新版,NONE值是禁用发布确认模式,是默认值,
CORRELATED值是发布消息成功到交换器后会触发回调方法
spring.rabbitmq.publisher-confirm-type: correlated

@Autowired
  private RabbitTemplate template;

  @Test
  void testConfirmCallback() {

    template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
      /**
       *
       * @param correlationData 配置
       * @param ack 交换机是否收到消息,true是成功,false是失败
       * @param cause 失败的原因
       */
      @Override
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm=====>");
        System.out.println("confirm==== ack="+ack);
        System.out.println("confirm==== cause="+cause);

        //根据ACK状态做对应的消息更新操作 TODO
      }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME+,"order.new","新订单来啦1");
  }

交换机到队列

通过returnCallback

消息从交换器发送到对应队列失败时触发
默认:交换机到队列不成功,则丢弃消息(默认)
交换机到队列不成功,返回给消息生产者,触发returnCallback
第一步 开启returnCallback配置
第二步 修改交换机投递到队列失败的策略
code

@Test
  void testReturnCallback() {
    //为true,则交换机处理消息到路由失败,则会返回给生产者
    //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
      @Override
      public void returnedMessage(ReturnedMessage returned) {
        int code = returned.getReplyCode();
        System.out.println("code="+code);
        System.out.println("returned="+returned.toString());
      }
    });
    template.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"xxx.order.new","新订单来啦11");
  }

建议

开启消息确认机制以后,保证了消息的准确送达,但由于频繁的确认交互, rabbitmq 整体效率变低,吞吐量下降严重,不是非常重要的消息真心不建议用消息确认机制

RabbitMQ消息确认机制ACK讲解

RabbitMQ的ACK介绍

1.消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除
2.消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
3.只有当消费者正确发送ACK反馈,RabbitMQ确认收到后,消息才会从RabbitMQ服务器的数据中删除。
4.消息的ACK确认机制默认是打开的,消息如未被进行ACK的消息确认机制,这条消息被锁定Unacked

确认方式

自动确认(默认)
手动确认 manual

spring:
  rabbitmq:
    #开启手动确认消息,如果消息重新入队,进行重试
    listener:
      simple:
        acknowledge-mode: manual

消费者

@RabbitHandler
public void releaseCouponRecord(String body, Message message, Channel channel) throws IOException {
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag="+msgTag);
        System.out.println("message="+message.toString());
        System.out.println("body="+body);

        //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
        //channel.basicAck(msgTag,false);
        //channel.basicNack(msgTag,false,true);

    }

消息确认

//成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除

channel.basicAck(msgTag,false);

消息拒绝

basicReject一次只能拒绝接收一个消息,可以设置是否requeue。
basicNack方法可以支持一次0个或多个消息的拒收,可以设置是否requeue。

延迟消息

什么是TTL

time to live 消息存活时间
如果消息在存活时间内未被消费,则会别清除
RabbitMQ支持两种ttl设置
1.单独消息进行配置ttl
2.整个队列进行配置ttl(居多)

什么是rabbitmq的死信队列

没有被及时消费的消息存放的队列

什么是rabbitmq的死信交换机

什么是rabbitmq的死信交换机


三大消息之一Rabbitmq,第1张

https://www.xamrdz.com/backend/38e1940096.html

相关文章: