讲解AMQP和JMS
JMS
java消息服务(Java message server), 一种与厂商无关的API, 用来访问消息收发系统消息。与JDBC类似
kafka 和RocketMQ 是基于JMS实现的
AMQP以及拓展项MQTT
最早应用于解决金融领域不同平台的消息传递交互问题,是一种链接协议直接定义网络交互的数据格式,类似http
Rabbitmq是实现的产品之一
MQTT:消息队列遥测传输(Message Queueing Telemetry Transport)
背景:计算性能不高的设备不能适应AMQP上的负责操作,MQTT是专门为了小设备设计的,MQTT在物联网中被大量使用
MQTT 特性
内存占用低,为小型无声设备之间通过低带宽发送短消息而设计
不支持长周期存储和转发,不允许分段消息(很难发送长消息)
支持主题发布-订阅、不支持事务(仅基本确认)
消息实际上是短暂的(短周期)
简单用户名和密码、不支持安全连接、消息不透明
AMQP和JMS的主要区别
AMQP不从API层进行限定,直接定义网络交换的数据格式,这使得实现了AMQP的provider天然性就是跨平台.
JMS消息类型:TextMessage/ObjectMessage/StreamMessage等
AMQP消息类型:Byte[]
核心
一种高吞吐量的分布式流处理平台,它可以处理消费者在网站中的所有动作流数据。
比如 网页浏览,搜索和其他用户的行为等,应用于大数据实时处理领域
官网
http://kafka.apache.org/
核心概念
Broker
Kafka的服务端程序,可以认为一个mq节点就是一个broker
broker存储topic的数据
Producer生产者
创建消息Message,然后发布到MQ中
该角色将消息发布到Kafka的topic中
Consumer消费者:
消费队列里面的消息
ConsumerGroup消费者组
同个topic, 广播发送给不同的group,一个group中只有一个consumer可以消费此消息
Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,主题的意思
Partition分区
kafka数据存储的基本单元,topic中的数据分割为一个或多个partition,
每个topic至少有一个partition,是有序的
一个Topic的多个partitions, 被分布在kafka集群中的多个server上
消费者数量 <=小于或者等于Partition数量
Replication 副本
同个Partition会有多个副本replication ,多个副本的数据是一样的,当其他broker挂掉后,系统可以主动用副本提供服务
默认每个topic的副本都是1(默认是没有副本,节省资源),也可以在创建topic的时候指定
如果当前kafka集群只有3个broker节点,则replication-factor最大就是3了,如果创建副本为4,则会报错
ReplicationLeader、ReplicationFollower
Partition有多个副本,但只有一个replicationLeader负责该Partition和生产者消费者交互
ReplicationFollower只是做一个备份,从replicationLeader进行同步
ReplicationManager
负责Broker所有分区副本信息,Replication 副本状态切换offset
每个consumer实例需要为他消费的partition维护一个记录自己消费到哪里的偏移offset
kafka把offset保存在消费端的消费者组里
Kafka 流程图
Broker 类比数据库
Topic 类比数据库的表
Partition 类比数据库的分表
---------------------------------------
Topic 可以指定副本数量,多个副本位于多台机器上
Kafka 使用zookeeper在多个副本中选举出1个leader,其他副本作为follower.
Leader主要负责读写信息,也就是和生产者消费者打交道,同时将消息同步写到其他副本
如果某个broker失效,如果topic没有了leader,则会重新选举出新的leader
一个topic的多个partition,被分布在kafka集群的多个server里
kafka保证同一个partition的多个replication(副本)一定不会分配到同一台broker里
特点总结
1.多订阅者
一个topic可以有一个或者多个订阅者
每个订阅者都要有一个partition,所以订阅者数量要少于等于partition数量
2.高吞吐量、低延迟: 每秒可以处理几十万条消息
3.高并发:几千个客户端同时读写
4.容错性:多副本、多分区,允许集群中节点失败,如果副本数据量为n,则可以n-1个节点失败
5.扩展性强:支持热扩展
安装
前置条件
安装jdk 和 zookeeper
安装注意事项
安装zookeeper (默认端口2181)
tar -zvxf apache-zookeeper-3.7.0-bin.tar.gz
重命名
mv apache-zookeeper-3.7.0-bin zookeeper
拷贝配置文件
/usr/local/software/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
启功
bin/zkServer.sh start
查看启动是否成功
yum -y install lsof
查看端口是否被监听以判断是否启动成功
lsof -i:2181
安装kafka (默认端口9092)
tar -zxvf kafka_2.13-2.8.0.tgz
mv kafka_2.13-2.8.0 kafka
修改配置文件 server.properties
/usr/local/software/kafka/config
vim server.properties
修改成服务器内网ip
listeners=PLAINTEXT://内网ip:9092
修改成服务器公网ip
advertised.listeners=PLAINTEXT://公网ip:9092
修改zk地址
zookeeper.connect=localhost:2181 zk和kafka同一台机器不用改,不同的话要改
根据配置文件启动
./kafka-server-start.sh ../config/server.properties &
日志显示这个表示启动成功
INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
或者查看端口占用情况
lsof -i:9092
停止
/usr/local/software/kafka/bin
./kafka-server-stop.sh
Linux环境下守护进程启动Kafka
./kafka-server-start.sh -daemon ../config/server.properties &
开放服务器的网络安全组 2181(zookeeper默认端口) 9092(Kafka默认端口)
集群同理,开放对应端口
Kafka点对点模型和发布订阅模型
JMS规范目前支持两种消息模型
点对点(point to point)
同个发送者,不同消费者配置相同配置文件(保证消费者groupId一样)。可以实现轮询消费
现象: 一条消息只有一个消费者会消费
————————————————————————————————————
发布/订阅(publish/subscribe)
同个发送者,不同消费者配置相同不同配置文件(保证消费者groupId不一样)。可以实现发布订阅消费
现象: 同一条消息不同消费者组均有各自的消费者进行消费
kafka在linux系统常用命令
查看topic
./kafka-topics.sh --list --zookeeper 公网:2181
Linux环境下启动Kafka
./kafka-server-start.sh ../config/server.properties &
Linux环境下守护进程启动Kafka
./kafka-server-start.sh -daemon ../config/server.properties &
生产者发送消息 命令行
./kafka-console-producer.sh --broker-list 公网ip:9092 --topic test-topic
消费者消费消息 命令行
./kafka-console-consumer.sh --bootstrap-server 公网:9092 --from-beginning --topic test-topic
--from-beginning 这个会把主题以往的消息重头消费
./kafka-console-consumer.sh --bootstrap-server 公网:9092 --topic test-topic
删除topic
./kafka-topics.sh --zookeeper 公网ip:2181 --delete --topic test-topic
查看broker下topic状态信息
./kafka-topics.sh --describe --zookeeper 公网ip:2181 --topic test-topic
Kafka核心API生产者
Kafka的producer生产者发送到Broker分区策略讲解
生产者发送到broker里面的流程是怎样的呢,一个 topic 有多个 partition分区,每个分区又有多个副本
如果指定Partition ID,则PR被发送至指定Partition (ProducerRecord)
如果未指定Partition ID,但指定了Key, PR会按照hash(key)发送至对应Partition
如果未指定Partition ID也没指定Key,PR会按照默认 round-robin轮训模式发送到每个Partition
生产者到broker发送流程
Kafka的客户端发送数据到服务器,不是来一条就发一条,会经过内存缓冲区(默认是16KB),通过KafkaProducer发送出去的消息都是先进入到客户端本地的内存缓冲里,然后把很多消息收集到的Batch里面,再一次性发送到Broker上去的,这样性能才可能题高
kafka高效发送原理图
PR
生产者发送消息回调函数的消息
ProducerRecord(简称PR)
发送给Kafka Broker的key/value 值对, 封装基础数据信息
-- Topic (名字)
-- PartitionID (可选)
-- Key(可选)
-- Value
PR key的注意点
key默认是null,大多数应用程序会用到key
如果key为空,kafka使用默认的partitioner,使用RoundRobin算法将消息均衡地分布在各个partition上
如果key不为空,kafka使用自己实现的hash方法对key进行散列,决定消息该被写到Topic的哪个partition,拥有相同key的消息会被写到同一个partition,实现顺序消息
kafka producerAPI回调函数实战
生产者发送消息是异步调用,怎么知道是否有异常?
发送消息配置回调函数即可, 该回调方法会在 Producer 收到 ack 时被调用,为异步调用
回调函数有两个参数 RecordMetadata 和 Exception,如果 Exception 是 null,则消息发送成功,否则失败
/**
* 发送消息携带回调函数
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testSendWithCallBack() throws ExecutionException, InterruptedException {
//kafka 发送者配置
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<>(properties);
for(int i=0;i<3 ;i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,
"test-key" + i, "test-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(null == e){
System.out.println("发送消息状态:"+recordMetadata.toString());
}else {
e.printStackTrace();
}
}
});
}
producer.close();
}
指定分区发送实现局部顺序消费
只要是send的时候实现指定partition 的值
/**
* 发送指定分区 实现顺序消费
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void testSendWithCallBackAndPartition() throws ExecutionException, InterruptedException {
Properties properties = getProperties();
Producer<String,String> producer = new KafkaProducer<>(properties);
for(int i=0;i<3 ;i++){
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(TOPIC_NAME,3,
"test-key" + i, "test-value" + i), new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(null == e){
System.out.println("发送消息状态:"+recordMetadata.toString());
}else {
e.printStackTrace();
}
}
});
}
producer.close();
}
根据自定分区策略发送,发送到指定分区
1.源码解读默认分区器
org.apache.kafka.clients.producer.internals.DefaultPartitioner
2.自定义分区规则
创建类,实现Partitioner接口,重写方法
配置 partitioner.class 指定类即可
public class IdentifyPartitionStrategy implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
if ("testPartition".equals(key)) {
return 1;
} else {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> map) {
}
}
Kafka核心API消费者
Kafka的Consumer消费者机制和分区策略讲解
消费者根据什么模式从broker获取数据的?
Pull 主动拉取
为什么是pull模式,而不是broker主动push?
消费者采用 pull 拉取方式,从broker的partition获取数据
pull 模式则可以根据 consumer 的消费能力进行自己调整,不同的消费者性能不一样
如果broker没有数据,consumer可以配置 timeout 时间,阻塞等待一段时间之后再返回
如果是broker主动push,优点是可以快速处理消息,但是容易造成消费者处理不过来,消息堆积和延迟。
Consumer消费者机制和分区策略讲解
顶层接口
org.apache.kafka.clients.consumer.internals.AbstractPartitionAssignor
round-robin (RoundRobinAssignor非默认策略)轮训
【按照消费者组】进行轮训分配,同个消费者组监听不同主题也一样,是把所有的 partition 和所有的 consumer 都列出来,
所以消费者组里面订阅的主题是一样的才行,主题不一样则会出现分配不均问题,例如7个分区,同组内2个消费者
topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5/topic-p6
c-1: topic-p0/topic-p2/topic-p4/topic-p6
c-2:topic-p1/topic-p3/topic-p5
弊端
如果同一消费者组内,所订阅的消息是不相同的,在执行分区分配的时候不是轮询分配,可能会导致分区分配的不均匀
有3个消费者C0、C1和C2,他们共订阅了 3 个主题:t0、t1 和 t2
t0有1个分区(p0),t1有2个分区(p0、p1),t2有3个分区(p0、p1、p2))
消费者C0订阅的是主题t0,消费者C1订阅的是主题t0和t1,消费者C2订阅的是主题t0、t1和t2
range (RangeAssignor默认策略)范围
【按照主题】进行分配,如果不平均分配,则第一个消费者会分配比较多分区, 一个消费者监听不同主题也不影响,例如7个分区,同组内2个消费者
topic-p0/topic-p1/topic-p2/topic-p3/topic-p4/topic-p5//topic-p6
c-1: topic-p0/topic-p1/topic-p2/topic-p3
c-2:topic-p4/topic-p5/topic-p6
弊端
只是针对 1 个 topic 而言,c-1多消费一个分区影响不大
如果有 N 多个 topic,那么针对每个 topic,消费者 C-1 都将多消费 1 个分区,topic越多则消费的分区也越多,则性能有所下降
Kafka的高性能原理
kafka高性能
1.存储模型,topic多分区,每个分区多segment段
2.index索引文件查找,利用分段和稀疏索引
3.磁盘顺序写入
4.异步操作少阻塞sender和main线程,批量操作(batch)
5.页缓存Page cache,没利用JVM内存,因为容易GC影响性能
6.零拷贝ZeroCopy(SendFile)
Kafka的高性能原理分析-ZeroCopy
零拷贝ZeroCopy(SendFile)
例子:将一个File读取并发送出去(Linux有两个上下文,内核态,用户态)
File文件的经历了4次copy
调用read,将文件拷贝到了kernel内核态
CPU控制 kernel态的数据copy到用户态
调用write时,user态下的内容会copy到内核态的socket的buffer中
最后将内核态socket buffer的数据copy到网卡设备中传送
缺点:增加了上下文切换、浪费了2次无效拷贝(即步骤2和3)
定义
ZeroCopy零拷贝:
请求kernel直接把disk的data传输给socket,而不是通过应用程序传输。Zero copy大大提高了应用程序的性能,
减少不必要的内核缓冲区跟用户缓冲区间的拷贝,从而减少CPU的开销和减少了kernel和user模式的上下文切换,达到性能的提升
对应零拷贝技术有mmap及sendfile
mmap:小文件传输快
sendfile:大文件传输比mmap快
原理图
Springboot项目整合spring-kafka
生产者code demo
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
/**
* 发送消息
* @param phone
*/
@GetMapping("/api/user/{phone}")
public void sendMessage1(@PathVariable("phone") String phone) {
kafkaTemplate.send(TOPIC_NAME, phone).addCallback(success -> {
// 消息发送到的topic
String topic = success.getRecordMetadata().topic();
// 消息发送到的分区
int partition = success.getRecordMetadata().partition();
// 消息在分区内的offset
long offset = success.getRecordMetadata().offset();
System.out.println("发送消息成功:" + topic + "-" + partition + "-" + offset);
}, failure -> {
System.out.println("发送消息失败:" + failure.getMessage());
});
}
消费者code demo
@Component
public class MQListener {
/**
* 消费监听
* @param record
*/
@KafkaListener(topics = {"user.register.topic"},groupId = "xdlcass-test-gp")
public void onMessage1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic){
// 打印出消息内容
System.out.println("消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
ack.acknowledge();
}
}
Kafka事务消息
Kafka 从 0.11 版本开始引入了事务支持
事务可以保证对多个分区写入操作的原子性
操作的原子性是指多个操作要么全部成功,要么全部失败,不存在部分成功、部分失败的可能
code 实现demo
注解方式的事务
/**
* 注解方式的事务
* @param i
*/
@GetMapping("/kafka/transaction1")
@Transactional(rollbackFor = RuntimeException.class)
public void sendMessage1(int i) {
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:1 i="+i);
if (i == 0) {
throw new RuntimeException("fail");
}
kafkaTemplate.send(TOPIC_NAME, "这个是事务里面的消息:2 i="+i);
}
现象: 当模拟异常i=0时,第一和第二条消息都发送不成功
声明式事务支持
/**
* 声明式事务支持
* @param i
*/
@GetMapping("/kafka/transaction2")
public void sendMessage2(int i) {
kafkaTemplate.executeInTransaction(new KafkaOperations.OperationsCallback() {
@Override
public Object doInOperations(KafkaOperations kafkaOperations) {
kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:1 i="+i);
if(i==0)
{
throw new RuntimeException("input is error");
}
kafkaOperations.send(TOPIC_NAME,"这个是事务里面的消息:2 i="+i);
return true;
}
});
}
同理现象: 当模拟异常i=0时,第一和第二条消息都发送不成功