当前位置: 首页>编程语言>正文

kafka exporter grafana 问题 kafka factor

kafka概述

消息中间件对比

特性

ActiveMQ

RabbitMQ

RocketMQ

Kafka

开发语言

java

erlang

java

scala

单机吞吐量

万级

万级

10万级

100万级

时效性

ms

us

ms

ms级以内

可用性

高(主从)

高(主从)

非常高(分布式)

非常高(分布式)

功能特性

成熟的产品、较全的文档、各种协议支持好

并发能力强、性能好、延迟低

MQ功能比较完善,扩展性佳

只支持主要的MQ功能,主要应用于大数据领域

消息中间件对比-选择建议

消息中间件

建议

Kafka

追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务

RocketMQ

可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验

RabbitMQ

性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

kafka介绍

Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。kafka官网:Apache Kafka

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_kafka,第1张

 

 

名词解释

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_偏移量_02,第2张

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

kafka安装配置

Kafka对于zookeeper是强依赖,保存kafka相关的节点数据,所以安装Kafka之前必须先安装zookeeper

①:Docker安装zookeeper

下载镜像:

docker pull zookeeper:3.4.14

 创建容器

docker run -d --name zookeeper -p 2181:2181 zookeeper:3.4.14

②:Docker安装kafka

下载镜像:

docker pull wurstmeister/kafka:2.12-2.3.1

创建容器

docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--net=host wurstmeister/kafka:2.12-2.3.1

依赖:

<!-- kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>

配置文件:

server:
  port: 9991
spring:
  application:
    name: kafka-demo
  kafka:
    bootstrap-servers: 192.168.200.130:9092

简单示例:

消息生产者

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("topic","hello");
        return "ok";
    }
}

消息消费者:

@Component
public class HelloListener {

    @KafkaListener(topics = "topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }
}

生产者发送类型及参数配置

单消费者(Queue模型)

一个生产者,多个消费者,但只有第一个消费者能收到消息。

消息生产者

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("topic","hello");
        return "ok";
    }
}

消息消费者(编写了两个消费者方法,启动测试后发现只有第一个方法能收到消息)

@Component
public class HelloListener {

    @KafkaListener(topics = "topic")
    public void onMessage(String message){
        System.out.println("消费者1");
        System.out.println(message);
    }
    
    @KafkaListener(topics = "topic")
    public void onMessage2(String message){
        System.out.println("消费者2");
        System.out.println(message);
    }
}

多消费者(发布订阅模型)

一个生产者,多个消费者,多个消费者都要收到消息

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_spring_03,第3张

 多个消费者,只要保证不存在同一个群组中,就可以同时收到生产者发送的消息。

@Component
public class TestListener {

    /**
     * 消费者1
     *
     * @param message 收到的消息
     */
    @KafkaListener(topics = "abc-topic", groupId = "group101")
    public void listener(String message) {
        System.out.println(message);
    }

    /**
     * 消费者2
     *
     * @param message 收到的消息
     */
    @KafkaListener(topics = "abc-topic", groupId = "group102")
    public void listener2(String message) {
        System.out.println(message);
    }

}

同步发送

将消息成功发送给Kafka后,接收KafKa给的回执;

在消息安全性要求比较高的场景中使用,防止消息丢失;就是在后面加个get().会报红,可以咆哮虎一场或者try catch

@RestController
public class HelloController {

    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

    @GetMapping("/hello")
    public String hello(){
        kafkaTemplate.send("topic","hello").get();
        return "ok";
    }
}

常用配置

通用配置

配置

参考值

说明

spring.kafka.bootstrap-servers

192.168.200.130:9200,192.168.200.130:9300

kafka服务ip (多个地址用,隔开)

spring.kafaka.listener.ack-mode

manual

开启消费者回执功能

生产者配置

配置

参考值

说明

spring.producer.retries

10

失败重试次数

spring.producer.value-serializer

org.apache.kafka.common.serialization.StringSerializer

消息value序列化

spring.producer.key-serializer

org.apache.kafka.common.serialization.StringSerializer

消息key序列化

spring.producer.compression-type

gzip

数据压缩的类型。默认为空(不压缩)。有效的值有 none,gzip,snappy, 或 lz4。

压缩算法

说明

snappy

占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用

lz4

占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观

gzip

占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

消费者配置

配置

参考值

说明

spring.consumer.group-id

${spring.application.name}-test

默认分组id

spring.consumer.value-deserializer

org.apache.kafka.common.serialization.StringDeserializer

消息value反序列化

spring.consumer.key-deserializer

org.apache.kafka.common.serialization.StringDeserializer

消息key反序列化

spring.kafka.consumer.enable-auto-commit

true(默认)

是否自动提交偏移量

常用配置

通用配置

配置

参考值

说明

spring.kafka.bootstrap-servers

192.168.200.130:9200,192.168.200.130:9300

kafka服务ip (多个地址用,隔开)

spring.kafaka.listener.ack-mode

manual

开启消费者回执功能

生产者配置

配置

参考值

说明

spring.producer.retries

10

失败重试次数

spring.producer.value-serializer

org.apache.kafka.common.serialization.StringSerializer

消息value序列化

spring.producer.key-serializer

org.apache.kafka.common.serialization.StringSerializer

消息key序列化

spring.producer.compression-type

gzip

数据压缩的类型。默认为空(不压缩)。有效的值有 none,gzip,snappy, 或 lz4。

压缩算法

说明

snappy

占用较少的 CPU, 却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用

lz4

占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观

gzip

占用较多的 CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法

消费者配置

配置

参考值

说明

spring.consumer.group-id

${spring.application.name}-test

默认分组id

spring.consumer.value-deserializer

org.apache.kafka.common.serialization.StringDeserializer

消息value反序列化

spring.consumer.key-deserializer

org.apache.kafka.common.serialization.StringDeserializer

消息key反序列化

spring.kafka.consumer.enable-auto-commit

true(默认)

是否自动提交偏移量


消费者的消息特点及偏移量提交方式

5.1)概述

Broker消息是有序的,每条消息有自己的编号;当消费者消费一条消息后,会给Broker发送回执,Broker会记录消息消费进度,这个消费进度就称为偏移量

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_spring_04,第4张

 

偏移量提交方式

提交偏移量的方式有两种,分别是自动提交偏移量和手动提交

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_偏移量_05,第5张

 

方式一:自动提交偏移量

当 spring.kafka.consumer.enable-auto-commit 被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去。

方式二:手动提交

当 spring.kafka.consumer.enable-auto-commit 被设置为false,可以用以下方法进行偏移量提交

①:开启回执模式并关闭自动提交

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: manual

 ②:改造消费者

@Component
public class TestListener {

    @KafkaListener(topics = "topic-1")
    public void onMessage2(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
        System.out.println(record.value());
        acknowledgment.acknowledge();
    }

}

kafka高可用设计

集群

  • Kafka 的服务器端由被称为 Broker 的服务进程构成,即一个 Kafka 集群由多个 Broker 组成。
  • 这样如果集群中某一台机器宕机,其他机器上的 Broker 也依然能够对外提供服务,这其实就是 Kafka 提供高可用的主要手段之一。

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_rabbitmq_06,第6张

 

 

备份机制(Replication)

6.2.1)副本的概念

Kafka 中消息的备份又叫做副本

Kafka 定义了两类副本:

  • 领导者副本(Leader Replica)
  • 追随者副本(Follower Replica)

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_kafka_07,第7张

 

 

副本间的数据同步方式

追随者副本分为两种:

  • ISR(in-sync replica):需要同步复制保存的follower
  • 普通副本:异步复制保存的follower

kafka exporter grafana 问题 kafka factor,kafka exporter grafana 问题 kafka factor_kafka_08,第8张

 

 

)副本的选举规则

如果leader失效后,需要选出新的leader,选举的原则如下:

第一:选举时优先从ISR中选定,因为这个列表中follower的数据是与leader同步的

第二:如果ISR列表中的follower都不行了,就只能从其他follower中选取

极端情况,就是所有副本都失效了,这时有两种方案

第一:等待ISR中的一个活过来,选为Leader,数据可靠,但活过来的时间不确定

第二:选择第一个活过来的Replication,不一定是ISR中的,选为leader,以最快速度恢复可用性,但数据不一定完整

 


https://www.xamrdz.com/lan/59z1937643.html

相关文章: