kafka概述
消息中间件对比
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
开发语言 | java | erlang | java | scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 100万级 |
时效性 | ms | us | ms | ms级以内 |
可用性 | 高(主从) | 高(主从) | 非常高(分布式) | 非常高(分布式) |
功能特性 | 成熟的产品、较全的文档、各种协议支持好 | 并发能力强、性能好、延迟低 | MQ功能比较完善,扩展性佳 | 只支持主要的MQ功能,主要应用于大数据领域 |
消息中间件对比-选择建议
消息中间件 | 建议 |
Kafka | 追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务 |
RocketMQ | 可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验 |
RabbitMQ | 性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ |
kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:Apache Kafka
名词解释
- producer:发布消息的对象称之为主题生产者(Kafka topic producer)
- topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
- consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
- broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。
kafka安装配置
Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper
①:Docker安装zookeeper
下载镜像:
docker pull zookeeper:3.4.14
创建容器
docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14
②:Docker安装kafka
下载镜像:
docker pull wurstmeister/kafka:2.12-2.3.1
创建容器
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1
依赖:
<!-- kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<exclusions>
<exclusion>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
配置文件:
server:
port: 9991
spring:
application:
name: kafka-demo
kafka:
bootstrap-servers: 192.168.200.130:9092
简单示例:
消息生产者
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("topic","hello");
return "ok";
}
}
消息消费者:
@Component
public class HelloListener {
@KafkaListener(topics = "topic")
public void onMessage(String message){
if(!StringUtils.isEmpty(message)){
System.out.println(message);
}
}
}
生产者发送类型及参数配置
单消费者(Queue模型)
一个生产者,多个消费者,但只有第一个消费者能收到消息。
消息生产者
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("topic","hello");
return "ok";
}
}
消息消费者(编写了两个消费者方法,启动测试后发现只有第一个方法能收到消息)
@Component
public class HelloListener {
@KafkaListener(topics = "topic")
public void onMessage(String message){
System.out.println("消费者1");
System.out.println(message);
}
@KafkaListener(topics = "topic")
public void onMessage2(String message){
System.out.println("消费者2");
System.out.println(message);
}
}
多消费者(发布订阅模型)
一个生产者,多个消费者,多个消费者都要收到消息
多个消费者,只要保证不存在同一个群组中,就可以同时收到生产者发送的消息。
@Component
public class TestListener {
/**
* 消费者1
*
* @param message 收到的消息
*/
@KafkaListener(topics = "abc-topic", groupId = "group101")
public void listener(String message) {
System.out.println(message);
}
/**
* 消费者2
*
* @param message 收到的消息
*/
@KafkaListener(topics = "abc-topic", groupId = "group102")
public void listener2(String message) {
System.out.println(message);
}
}
同步发送
将消息成功发送给Kafka后,接收KafKa给的回执;
在消息安全性要求比较高的场景中使用,防止消息丢失;就是在后面加个get().会报红,可以咆哮虎一场或者try catch
@RestController
public class HelloController {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@GetMapping("/hello")
public String hello(){
kafkaTemplate.send("topic","hello").get();
return "ok";
}
}
常用配置
通用配置
配置 | 参考值 | 说明 |
spring.kafka.bootstrap-servers | 192.168.200.130:9200,192.168.200.130:9300 | kafka服务ip (多个地址用,隔开) |
spring.kafaka.listener.ack-mode | manual | 开启消费者回执功能 |
生产者配置
配置 | 参考值 | 说明 |
spring.producer.retries | 10 | 失败重试次数 |
spring.producer.value-serializer | org.apache.kafka.common.serialization.StringSerializer | 消息value序列化 |
spring.producer.key-serializer | org.apache.kafka.common.serialization.StringSerializer | 消息key序列化 |
spring.producer.compression-type | gzip | 数据压缩的类型。默认为空(不压缩)。有效的值有 none,gzip,snappy, 或 lz4。 |
压缩算法 | 说明 |
snappy | 占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
lz4 | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观 |
gzip | 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |
消费者配置
配置 | 参考值 | 说明 |
spring.consumer.group-id | ${spring.application.name}-test | 默认分组id |
spring.consumer.value-deserializer | org.apache.kafka.common.serialization.StringDeserializer | 消息value反序列化 |
spring.consumer.key-deserializer | org.apache.kafka.common.serialization.StringDeserializer | 消息key反序列化 |
spring.kafka.consumer.enable-auto-commit | true(默认) | 是否自动提交偏移量 |
常用配置
通用配置
配置 | 参考值 | 说明 |
spring.kafka.bootstrap-servers | 192.168.200.130:9200,192.168.200.130:9300 | kafka服务ip (多个地址用,隔开) |
spring.kafaka.listener.ack-mode | manual | 开启消费者回执功能 |
生产者配置
配置 | 参考值 | 说明 |
spring.producer.retries | 10 | 失败重试次数 |
spring.producer.value-serializer | org.apache.kafka.common.serialization.StringSerializer | 消息value序列化 |
spring.producer.key-serializer | org.apache.kafka.common.serialization.StringSerializer | 消息key序列化 |
spring.producer.compression-type | gzip | 数据压缩的类型。默认为空(不压缩)。有效的值有 none,gzip,snappy, 或 lz4。 |
压缩算法 | 说明 |
snappy | 占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
lz4 | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观 |
gzip | 占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |
消费者配置
配置 | 参考值 | 说明 |
spring.consumer.group-id | ${spring.application.name}-test | 默认分组id |
spring.consumer.value-deserializer | org.apache.kafka.common.serialization.StringDeserializer | 消息value反序列化 |
spring.consumer.key-deserializer | org.apache.kafka.common.serialization.StringDeserializer | 消息key反序列化 |
spring.kafka.consumer.enable-auto-commit | true(默认) | 是否自动提交偏移量 |
消费者的消息特点及偏移量提交方式
5.1)概述
Broker消息是有序的,每条消息有自己的编号;当消费者消费一条消息后,会给Broker发送回执,Broker会记录消息消费进度,这个消费进度就称为偏移量。
偏移量提交方式
提交偏移量的方式有两种,分别是自动提交偏移量和手动提交
方式一:自动提交偏移量
当 spring.kafka.consumer.enable-auto-commit 被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。
方式二:手动提交
当 spring.kafka.consumer.enable-auto-commit 被设置为false,可以用以下方法进行偏移量提交
①:开启回执模式并关闭自动提交
spring:
kafka:
consumer:
enable-auto-commit: false
listener:
ack-mode: manual
②:改造消费者
@Component
public class TestListener {
@KafkaListener(topics = "topic-1")
public void onMessage2(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
System.out.println(record.value());
acknowledgment.acknowledge();
}
}
kafka高可用设计
集群
- Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成。
- 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务,这其实就是 Kafka 提供高可用的主要手段之一。
备份机制(Replication)
6.2.1)副本的概念
Kafka 中消息的备份又叫做副本
Kafka 定义了两类副本:
- 领导者副本(Leader Replica)
- 追随者副本(Follower Replica)
副本间的数据同步方式
追随者副本分为两种:
- ISR(in-sync replica):需要同步复制保存的follower
- 普通副本:异步复制保存的follower
)副本的选举规则
如果leader失效后,需要选出新的leader,选举的原则如下:
第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的
第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取
极端情况,就是所有副本都失效了,这时有两种方案
第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定
第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整