一、单体模式
1 基础概念
1.1发布-订阅模式
producer推送消息到broker (集群的话有多个broker服务器,其中有leader和Follower) consumer只需要订阅topic 从broker拉取消息,不必在意是从哪个partion里来的
topic, partition
topic、partition: 一条消息会属于某个topic,一个topic可能会有多个partition,每个partition数据有序,若有多个,所有加一起无序; topic类似于数据库的表,partion类似于一部分表数据
保证顺序: 如果要保证消息顺序,需要将partition设置为1
数据保存: 每个partition使用多个segment文件进行存储
消费组:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)
消息停留时间:消息会在broker中停留一段时间,不会一次消费后就删除
1.2 生产者producer的消息发送及确认机制
1.3 三种发送机制
- 1.发送并忘记(fire-and-forget)
不关心是否发送到broker,虽然会重发,但是还是会出现消息丢失 ,不推荐 - 2.同步发送
使用send()方法发送,返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
//发送消息
try {
RecordMetadata recordMetadata = producer.send(record).get();
System.out.println(recordMetadata.offset());//获取偏移量
}catch (Exception e){
e.printStackTrace();
}
如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量
- 3.异步发送
调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码
//发送消息
try {
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if(e!=null){
e.printStackTrace();
}
System.out.println(recordMetadata.offset());
}
});
}catch (Exception e){
e.printStackTrace();
}
如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。
acks=0 只发,不确认是否成功
acks=1 发送,获得首节点确认,如果未得到首节点确认,重发
acks=-1(all) 获得所有节点确认
retries
1.4 生产者消费者代码如下
1.4.1 生产者
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
* Kafka 消息生产者
*/
public class ProducerFastStart {
// Kafka集群地址
private static final String brokerList = "192.168.226.134:9092";
// 主题名称-之前已经创建
private static final String topic = "kafka-hello";
public static void main(String[] args) {
Properties properties = new Properties();
// 设置key序列化器
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置重试次数
properties.put(ProducerConfig.RETRIES_CONFIG, 10);
// 设置值序列化器
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 设置集群地址
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
// KafkaProducer
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
//封装消息
ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Kafka-demo-001", "hello, Kafka!");
try {
//发送消息
producer.send(record);
} catch (Exception e) {
e.printStackTrace();
}
producer.close();
}
}
1.4.2 消费者
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
* Kafka 消息消费者
*/
public class ConsumerFastStart {
// Kafka集群地址
private static final String brokerList = "192.168.200.130:9092";
// 主题名称-之前已经创建
private static final String topic = "kafka-hello";
// 消费组
private static final String groupId = "group.demo1";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(topic));
while (true) {
//指定的1秒是等待消息超时时间
//假设消费者能收到一条消息,kafka会直接将一条消息发送给消费者,这样就保证kafka及时性
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println(record.value());
}
}
}
}
1.5 重复消费
在 Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。
1.6 消息丢失
auto.commit.enable=true,消费端自动提交offersets设置为true,当消费者拉到消息之后,还没有处理完 commit interval 提交间隔就到了,提交了offersets。这时consummer又挂了,重启后,从下一个offersets开始消费,之前的消息丢失了。
1.7 Kafka避免消息丢失的解决方案:
(1)设置auto.commit.enable=false,每次处理完手动提交。确保消息真的被消费并处理完成。
(2)配置消息重试的机制,并且重试的时间间隔一定要长一些,默认 1 秒钟不符合生产环境(网络中断时间有可能超过 1秒)。
(3)配置多个副本,保证数据的完整性。
(4)合理设置flush间隔。kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache 上的数据就丢。
二、集群模式
2.1 Broker
Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。
2.2 Follower 和 Leader
Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。
2.3 需要Zookeeper
Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。
2.4 AR(Assigned Replicas)
分区中所有的副本统称为AR。
2.5 ISR(In-Sync Replicas)
所有与Leader部分保持一定程度的副(包括Leader副本在内)本组成ISR。
2.6 OSR(Out-of-Sync-Replicas)
与Leader副本同步滞后过多的副本。
2.7 HW(High Watermark)
高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息。
2.8 LEO(Log End Offset)
即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。