当前位置: 首页>后端>正文

Kafka篇

一、单体模式

1 基础概念

1.1发布-订阅模式

producer推送消息到broker (集群的话有多个broker服务器,其中有leader和Follower) consumer只需要订阅topic 从broker拉取消息,不必在意是从哪个partion里来的
topic, partition
topic、partition: 一条消息会属于某个topic,一个topic可能会有多个partition,每个partition数据有序,若有多个,所有加一起无序; topic类似于数据库的表,partion类似于一部分表数据
保证顺序: 如果要保证消息顺序,需要将partition设置为1
数据保存: 每个partition使用多个segment文件进行存储
消费组:每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

Kafka篇,第1张
基础架构.png

消息停留时间:消息会在broker中停留一段时间,不会一次消费后就删除
1.2 生产者producer的消息发送及确认机制
Kafka篇,第2张
确认机制.png
1.3 三种发送机制
  • 1.发送并忘记(fire-and-forget)
    不关心是否发送到broker,虽然会重发,但是还是会出现消息丢失 ,不推荐
  • 2.同步发送
    使用send()方法发送,返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功
 //发送消息
  try {
      RecordMetadata recordMetadata = producer.send(record).get();
      System.out.println(recordMetadata.offset());//获取偏移量
  }catch (Exception e){
      e.printStackTrace();
  }

如果服务器返回错误,get()方法会抛出异常,如果没有发生错误,我们就会得到一个RecordMetadata对象,可以用它来获取消息的偏移量

  • 3.异步发送
    调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数。如下代码
 //发送消息
  try {
      producer.send(record, new Callback() {
          @Override
          public void onCompletion(RecordMetadata recordMetadata, Exception e) {
              if(e!=null){
                  e.printStackTrace();
              }
              System.out.println(recordMetadata.offset());
          }
      });
  }catch (Exception e){
      e.printStackTrace();
  }

如果kafka返回一个错误,onCompletion()方法会抛出一个非空(non null)异常,可以根据实际情况处理,比如记录错误日志,或者把消息写入“错误消息”文件中,方便后期进行分析。

acks=0 只发,不确认是否成功
acks=1 发送,获得首节点确认,如果未得到首节点确认,重发
acks=-1(all) 获得所有节点确认
retries

1.4 生产者消费者代码如下

1.4.1 生产者
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Kafka 消息生产者
 */
public class ProducerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.226.134:9092";
    // 主题名称-之前已经创建
    private static final String topic = "kafka-hello";
    public static void main(String[] args) {
        Properties properties = new Properties();
        // 设置key序列化器
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);
        // 设置值序列化器
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 设置集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        // KafkaProducer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        //封装消息
        ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, "Kafka-demo-001", "hello, Kafka!");
        try {
            //发送消息
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
        producer.close();
    }
}
1.4.2 消费者
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Kafka 消息消费者
 */
public class ConsumerFastStart {
    // Kafka集群地址
    private static final String brokerList = "192.168.200.130:9092";
    // 主题名称-之前已经创建
    private static final String topic = "kafka-hello";
    // 消费组
    private static final String groupId = "group.demo1";

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(topic));
        while (true) {
//指定的1秒是等待消息超时时间
//假设消费者能收到一条消息,kafka会直接将一条消息发送给消费者,这样就保证kafka及时性
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}

1.5 重复消费

在 Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。假设刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

1.6 消息丢失

auto.commit.enable=true,消费端自动提交offersets设置为true,当消费者拉到消息之后,还没有处理完 commit interval 提交间隔就到了,提交了offersets。这时consummer又挂了,重启后,从下一个offersets开始消费,之前的消息丢失了。

1.7 Kafka避免消息丢失的解决方案:

(1)设置auto.commit.enable=false,每次处理完手动提交。确保消息真的被消费并处理完成。
(2)配置消息重试的机制,并且重试的时间间隔一定要长一些,默认 1 秒钟不符合生产环境(网络中断时间有可能超过 1秒)。
(3)配置多个副本,保证数据的完整性。
(4)合理设置flush间隔。kafka 的数据一开始就是存储在 PageCache 上的,定期 flush 到磁盘上的,也就是说,不是每个消息都被存储在磁盘了,如果出现断电或者机器故障等,PageCache 上的数据就丢。

二、集群模式

2.1 Broker

Kafka 集群包含一个或多个服务器,服务器节点称为broker。broker存储topic的数据。如果某topic有N个partition,集群有N个broker,那么每个broker存储该topic的一个partition。如果某topic有N个partition,集群有(N+M)个broker,那么其中有N个broker存储该topic的一个partition,剩下的M个broker不存储该topic的partition数据。如果某topic有N个partition,集群中broker数目少于N个,那么一个broker存储该topic的一个或多个partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致Kafka集群数据不均衡。

2.2 Follower 和 Leader

Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower与Leader保持数据同步。如果Leader失效,则从Follower中选举出一个新的Leader。当Follower与Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

2.3 需要Zookeeper

Zookeeper负责维护和协调broker。当Kafka系统中新增了broker或者某个broker发生故障失效时,由ZooKeeper通知生产者和消费者。生产者和消费者依据Zookeeper的broker状态信息与broker协调数据的发布和订阅任务。

2.4 AR(Assigned Replicas)

分区中所有的副本统称为AR。

2.5 ISR(In-Sync Replicas)

所有与Leader部分保持一定程度的副(包括Leader副本在内)本组成ISR。

2.6 OSR(Out-of-Sync-Replicas)

与Leader副本同步滞后过多的副本。

2.7 HW(High Watermark)

高水位,标识了一个特定的offset,消费者只能拉取到这个offset之前的消息。

2.8 LEO(Log End Offset)

即日志末端位移(log end offset),记录了该副本底层日志(log)中下一条消息的位移值。注意是下一条消息!也就是说,如果LEO=10,那么表示该副本保存了10条消息,位移值范围是[0, 9]。

持续更新中。。。。


https://www.xamrdz.com/backend/3k31940654.html

相关文章: