当前位置: 首页>后端>正文

三大消息之Kafka

讲解AMQP和JMS

JMS

java消息服务(Java message server), 一种与厂商无关的API, 用来访问消息收发系统消息。与JDBC类似

kafka 和RocketMQ 是基于JMS实现的

AMQP以及拓展项MQTT

最早应用于解决金融领域不同平台的消息传递交互问题,是一种链接协议直接定义网络交互的数据格式,类似http

Rabbitmq是实现的产品之一

MQTT:消息队列遥测传输(Message Queueing Telemetry Transport)
背景:计算性能不高的设备不能适应AMQP上的负责操作,MQTT是专门为了小设备设计的,MQTT在物联网中被大量使用

MQTT 特性
内存占用低,为小型无声设备之间通过低带宽发送短消息而设计
不支持长周期存储和转发,不允许分段消息(很难发送长消息)
支持主题发布-订阅、不支持事务(仅基本确认)
消息实际上是短暂的(短周期)
简单用户名和密码、不支持安全连接、消息不透明

AMQP和JMS的主要区别

AMQP不从API层进行限定,直接定义网络交换的数据格式,这使得实现了AMQP的provider天然性就是跨平台.
JMS消息类型:TextMessage/ObjectMessage/StreamMessage等
AMQP消息类型:Byte[]

核心

一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。

比如 网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域

官网

http://kafka.apache.org/

核心概念

Broker
Kafka的服务端程序,可以认为一个mq节点就是一个broker
broker存储topic的数据

Producer生产者
创建消息Message,然后发布到MQ中
该角色将消息发布到Kafka的topic中

Consumer消费者:
消费队列里面的消息
ConsumerGroup消费者组
同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息

 
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思

 
Partition分区
kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,
每个topic至少有一个partition,是有序的
一个Topic的多个partitions, 被分布在kafka集群中的多个server上
消费者数量 <=小于或者等于Partition数量
 

Replication 副本
同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定
如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错
 

ReplicationLeader、ReplicationFollower
Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
ReplicationFollower只是做一个备份,从replicationLeader进行同步
 

ReplicationManager
负责Broker所有分区副本信息,Replication 副本状态切换offset
每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset
kafka把offset保存在消费端的消费者组里

Kafka 流程图

三大消息之Kafka,第1张
kafka流程图.drawio.png
Broker 类比数据库
Topic 类比数据库的表
Partition 类比数据库的分表
---------------------------------------
Topic 可以指定副本数量,多个副本位于多台机器上
Kafka 使用zookeeper在多个副本中选举出1个leader,其他副本作为follower.
Leader主要负责读写信息,也就是和生产者消费者打交道,同时将消息同步写到其他副本
如果某个broker失效,如果topic没有了leader,则会重新选举出新的leader
一个topic的多个partition,被分布在kafka集群的多个server里
kafka保证同一个partition的多个replication(副本)一定不会分配到同一台broker里

特点总结

1.多订阅者
      一个topic可以有一个或者多个订阅者
      每个订阅者都要有一个partition,所以订阅者数量要少于等于partition数量
2.高吞吐量、低延迟: 每秒可以处理几十万条消息

3.高并发:几千个客户端同时读写

4.容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败

5.扩展性强:支持热扩展

安装

前置条件

安装jdk 和 zookeeper

安装注意事项

安装zookeeper (默认端口2181)
tar -zvxf apache-zookeeper-3.7.0-bin.tar.gz 
重命名
mv apache-zookeeper-3.7.0-bin zookeeper
拷贝配置文件
/usr/local/software/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
启功
bin/zkServer.sh start
查看启动是否成功
yum -y install lsof
查看端口是否被监听以判断是否启动成功
lsof -i:2181


安装kafka (默认端口9092)
tar -zxvf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka
修改配置文件 server.properties
/usr/local/software/kafka/config
vim server.properties
修改成服务器内网ip
listeners=PLAINTEXT://内网ip:9092  
修改成服务器公网ip
advertised.listeners=PLAINTEXT://公网ip:9092

修改zk地址
zookeeper.connect=localhost:2181 zk和kafka同一台机器不用改,不同的话要改  

根据配置文件启动
./kafka-server-start.sh  ../config/server.properties &
日志显示这个表示启动成功
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
或者查看端口占用情况
lsof -i:9092

停止
/usr/local/software/kafka/bin
./kafka-server-stop.sh

Linux环境下守护进程启动Kafka
./kafka-server-start.sh -daemon ../config/server.properties &

开放服务器的网络安全组 2181(zookeeper默认端口) 9092(Kafka默认端口)
集群同理,开放对应端口

Kafka点对点模型和发布订阅模型

JMS规范目前支持两种消息模型

点对点(point to point)
同个发送者,不同消费者配置相同配置文件(保证消费者groupId一样)。可以实现轮询消费
现象: 一条消息只有一个消费者会消费

————————————————————————————————————

发布/订阅(publish/subscribe)
同个发送者,不同消费者配置相同不同配置文件(保证消费者groupId不一样)。可以实现发布订阅消费

现象: 同一条消息不同消费者组均有各自的消费者进行消费

kafka在linux系统常用命令

查看topic
./kafka-topics.sh --list --zookeeper  公网:2181

Linux环境下启动Kafka
./kafka-server-start.sh  ../config/server.properties &
Linux环境下守护进程启动Kafka
./kafka-server-start.sh -daemon ../config/server.properties &

生产者发送消息 命令行
./kafka-console-producer.sh --broker-list 公网ip:9092 --topic test-topic

消费者消费消息 命令行 
./kafka-console-consumer.sh --bootstrap-server 公网:9092 --from-beginning --topic test-topic
--from-beginning 这个会把主题以往的消息重头消费
./kafka-console-consumer.sh --bootstrap-server 公网:9092  --topic test-topic


删除topic
./kafka-topics.sh --zookeeper 公网ip:2181 --delete --topic test-topic

查看broker下topic状态信息
./kafka-topics.sh --describe --zookeeper 公网ip:2181 --topic test-topic

Kafka核心API生产者

Kafka的producer生产者发送到Broker分区策略讲解

生产者发送到broker里面的流程是怎样的呢,一个 topic 有多个 partition分区,每个分区又有多个副本

如果指定Partition ID,则PR被发送至指定Partition (ProducerRecord)

如果未指定Partition ID,但指定了Key, PR会按照hash(key)发送至对应Partition

如果未指定Partition ID也没指定Key,PR会按照默认 round-robin轮训模式发送到每个Partition


生产者到broker发送流程

Kafka的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的Batch里面,再一次性发送到Broker上去的,这样性能才可能题高

kafka高效发送原理图

三大消息之Kafka,第2张
kafka发送者逻辑.png

PR

生产者发送消息回调函数的消息
ProducerRecord(简称PR)
发送给Kafka Broker的key/value 值对, 封装基础数据信息

-- Topic (名字)
-- PartitionID (可选)
-- Key(可选)
-- Value

PR key的注意点

key默认是null,大多数应用程序会用到key

如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在各个partition上

如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到Topic的哪个partition,拥有相同key的消息会被写到同一个partition,实现顺序消息

kafka producerAPI回调函数实战

生产者发送消息是异步调用,怎么知道是否有异常?

发送消息配置回调函数即可, 该回调方法会在 Producer 收到 ack 时被调用,为异步调用
回调函数有两个参数 RecordMetadata 和 Exception,如果 Exception 是 null,则消息发送成功,否则失败

/**
     * 发送消息携带回调函数
     * @throws ExecutionException
     * @throws InterruptedException
     */

    @Test
    public void testSendWithCallBack() throws ExecutionException, InterruptedException {

        //kafka 发送者配置
        Properties properties = getProperties();

        Producer<String,String> producer = new KafkaProducer<>(properties);

        for(int i=0;i<3 ;i++){

            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,
                    "test-key" + i, "test-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(null == e){
                        System.out.println("发送消息状态:"+recordMetadata.toString());
                    }else {
                        e.printStackTrace();
                    }
                }
            });
        }
        producer.close();
    }

指定分区发送实现局部顺序消费

只要是send的时候实现指定partition 的值

/**
     * 发送指定分区 实现顺序消费
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testSendWithCallBackAndPartition() throws ExecutionException, InterruptedException {


        Properties properties = getProperties();

        Producer<String,String> producer = new KafkaProducer<>(properties);

        for(int i=0;i<3 ;i++){

            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,3,
                    "test-key" + i, "test-value" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(null == e){
                        System.out.println("发送消息状态:"+recordMetadata.toString());
                    }else {
                        e.printStackTrace();
                    }
                }
            });


        }

        producer.close();


    }

根据自定分区策略发送,发送到指定分区

1.源码解读默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
2.自定义分区规则
创建,实现Partitioner接口,重写方法
配置 partitioner.class 指定类即可

public class IdentifyPartitionStrategy implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        if ("testPartition".equals(key)) {
            return 1;
        } else {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

Kafka核心API消费者

Kafka的Consumer消费者机制和分区策略讲解

消费者根据什么模式从broker获取数据的?
Pull   主动拉取

为什么是pull模式,而不是broker主动push?

消费者采用 pull 拉取方式,从broker的partition获取数据

pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样

如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回
如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。

Consumer消费者机制和分区策略讲解

顶层接口

org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor

round-robin (RoundRobinAssignor非默认策略)轮训

【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来,
 所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者

topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6

c-1: topic-p0/topic-p2/topic-p4/topic-p6

c-2:topic-p1/topic-p3/topic-p5

弊端

如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2

range (RangeAssignor默认策略)范围

【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者

topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6

c-1: topic-p0/topic-p1/topic-p2/topic-p3

c-2:topic-p4/topic-p5/topic-p6

弊端

只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降

Kafka的高性能原理

kafka高性能

1.存储模型,topic多分区,每个分区多segment段

2.index索引文件查找,利用分段和稀疏索引

3.磁盘顺序写入

4.异步操作少阻塞sender和main线程,批量操作(batch)

5.页缓存Page cache,没利用JVM内存,因为容易GC影响性能

6.零拷贝ZeroCopy(SendFile)

Kafka的高性能原理分析-ZeroCopy

零拷贝ZeroCopy(SendFile)

例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)

File文件的经历了4次copy

调用read,将文件拷贝到了kernel内核态
CPU控制 kernel态的数据copy到用户态
调用write时,user态下的内容会copy到内核态的socket的buffer中
最后将内核态socket buffer的数据copy到网卡设备中传送
缺点:增加了上下文切换、浪费了2次无效拷贝(即步骤2和3)

定义

ZeroCopy零拷贝:

请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,
减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升

对应零拷贝技术有mmap及sendfile

mmap:小文件传输快
sendfile:大文件传输比mmap快

原理图

三大消息之Kafka,第3张
aa3fc4edf467a7ccc93ac0456d5b3aa.png

Springboot项目整合spring-kafka

生产者code demo

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;


    /**
     * 发送消息
     * @param phone
     */
    @GetMapping("/api/user/{phone}")
    public void sendMessage1(@PathVariable("phone") String phone) {
        kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
            // 消息发送到的topic
            String topic = success.getRecordMetadata().topic();
            // 消息发送到的分区
            int partition = success.getRecordMetadata().partition();
            // 消息在分区内的offset
            long offset = success.getRecordMetadata().offset();
            System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);

        }, failure -> {
            System.out.println("发送消息失败:" + failure.getMessage());
        });

    }

消费者code demo

@Component
public class MQListener {


    /**
     *  消费监听
     * @param record
     */
    @KafkaListener(topics = {"user.register.topic"},groupId = "xdlcass-test-gp")
    public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
        // 打印出消息内容
        System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());

        ack.acknowledge();
    }


}

Kafka事务消息

Kafka 从 0.11 版本开始引入了事务支持
事务可以保证对多个分区写入操作的原子性
操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能

code 实现demo

注解方式的事务

     /**
     * 注解方式的事务
     * @param i
     */
    @GetMapping("/kafka/transaction1")
    @Transactional(rollbackFor = RuntimeException.class)
    public void sendMessage1(int i) {

        kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1  i="+i);
            if (i == 0) {
                throw new RuntimeException("fail");
            }
        kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2  i="+i);

    }

现象: 当模拟异常i=0时,第一和第二条消息都发送不成功

声明式事务支持

 /**
     * 声明式事务支持
     * @param i
     */
    @GetMapping("/kafka/transaction2")
    public void sendMessage2(int i) {

        kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
            @Override
            public Object doInOperations(KafkaOperations kafkaOperations) {
                kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:1  i="+i);
                if(i==0)
                {
                    throw new RuntimeException("input is error");
                }
                kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:2  i="+i);
                return true;
            }
        });

    }

同理现象: 当模拟异常i=0时,第一和第二条消息都发送不成功


https://www.xamrdz.com/backend/3a41923405.html

相关文章: