当前位置: 首页>编程语言>正文

Kafka开发实战

1. 消息的发送与接收

生产者主要的对象有: **KafkaProducer , ProducerRecord 。 其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。 KafkaProducer 的创建需要指定的参数和含义:

参数

说明

bootstrap.servers

配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点

key.serializer

要发送信息的key数据的序列化类。

value.serializer

要发送消息的alue数据的序列化类

acks

默认值:all会等待所有的副本分区确认记录,  acks=0生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成,acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认

retries

retries重试次数

其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到. 消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。 同步确认效率低,异步确认效率高,但是需要设置回调对象。

1.1. 创建kafka-01-producer-consumer项目

创建kafka-01-producer-consumer项目,并在pom.xml添加依赖

<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.7.0</version>
    </dependency>

</dependencies>

1.2. 生产者

步骤如下:

  • 1.创建用于连接Kafka的Properties配置
  • 2.创建一个生产者对象KafkaProducer

  1. 发送1-100的消息到指定的topic中

  1. 5.关闭生产者 代码如下:

public class KafkaProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {


        Properties props = new Properties();
        // 设置连接Kafka的初始连接用到的服务器地址
        // 如果是集群,则可以通过此初始连接发现集群中的其他broker
        props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");


        //        acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。
        //acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
        //这是Kafka最强的可靠性保证,等效于 acks=-1
        props.put("acks", "all");
        // 设置key的序列化器
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // 设置value的序列化器
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,
                String>(props);
        // 3. 发送1-100的消息到指定的topic中
        for(int i = 0; i < 100; ++i) {
            // 一、使用同步等待的方式发送消息
            // // 构建一条消息,直接new ProducerRecord
            // ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
            // Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // // 调用Future的get方法等待响应
            // future.get();
            // System.out.println("第" + i + "条消息写入成功!");

            // 二、使用异步回调的方式发送消息
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
            kafkaProducer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    // 1. 判断发送消息是否成功
                    if(exception == null) {
                        // 发送成功
                        // 主题
                        String topic = metadata.topic();
                        // 分区id
                        int partition = metadata.partition();
                        // 偏移量
                        long offset = metadata.offset();
                        System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
                    }
                    else {
                        // 发送出现错误
                        System.out.println("生产消息出现异常!");
                        // 打印异常消息
                        System.out.println(exception.getMessage());
                        // 打印调用栈
                        System.out.println(exception.getStackTrace());
                    }
                }
            });
        }

        // 4.关闭生产者
        kafkaProducer.close();
    }
}

1.3. 消费者

  • 1.创建Kafka消费者配置
  • 2.创建Kafka消费者
  • 3.订阅要消费的主题
  • 4.使用一个while循环,不断从Kafka的topic中拉取消息
public class KafkaConsumer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {




        // 1.创建Kafka消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        props.setProperty("group.id", "test");
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2.创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList("test"));

        // 4.使用一个while循环,不断从Kafka的topic中拉取消息
        while(true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key\value
                String key = consumerRecord.key();
                String value = consumerRecord.value();

                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
            Thread.sleep(1000);
        }
    }
}

编写上面代码之后,大家有没有发现一个问题,我们没有使用SpringBoot进行开发,那么接下来就是要SprigBoot开发

2.SpringBoot Kafka实战

2.1 创建kafka-02-springboot项目

kafka-02-springboot依赖如下

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>spring-boot-starter-parent</artifactId>
        <groupId>org.springframework.boot</groupId>
        <version>2.7.18</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>kafka-02-springboot</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.7.18</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

</project>

2.2 添加配置项

image

application.yml添加kafka配置

server:
  port: 8899
spring:
  application:
    name: kafka-springboot
  kafka:
    producer:
      # producer用到的key和value的序列化类
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 33554432
    # 用于建立初始连接的broker地址
    bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
    consumer:
    ## consumer用到的key和value的反序列化类
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: springboot-consumer02
      # 如果在kafka中找不到当前消费者的偏移量,则直接将偏移量重置为最早的
      auto-offset-reset: earliest
      # 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
      enable-auto-commit: true
      # 消费者偏移量自动提交的时间间隔
      auto-commit-interval: 1000

2.3. 创建Kafka配置类

@Configuration
public class KafkaConfig {


    @Bean
    public NewTopic topic1() {
        return new NewTopic("zbbmeta-01", 3, (short) 1);
    }


    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }

    @Bean
    @Autowired
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {

        // 覆盖ProducerFactory原有设置
        Map<String, Object> configsOverride = new HashMap<>();
        configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);

        KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(
                producerFactory, configsOverride
        );
        return template;
    }

}

2.4. 创建同步和异步发送消息到Topic的接口方法

  • 同步接口
@RestController
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {

        final ListenableFuture<SendResult<Integer, String>> future = template.send("zbbmeta-01", null, message);
        // 同步发送消息
        try {
            final SendResult<Integer, String> sendResult = future.get();
            final RecordMetadata metadata = sendResult.getRecordMetadata();

            System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "success";
    }

}
  • 异步接口
@RestController
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;


    @RequestMapping("send/async/{message}")
    public String send(@PathVariable String message) {

        final ListenableFuture<SendResult<Integer, String>> future = this.template.send("zbbmeta-01", null, message);

        // 设置回调函数,异步等待broker端的返回结果
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                final RecordMetadata metadata = result.getRecordMetadata();

                System.out.println("发送消息成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
            }
        });

        return "success";
    }

}

2.5 创建消费者

在springboot中消费者实际上就是监听,代码如下

@Component
public class MyConsumer {

    @KafkaListener(topics = "zbbmeta-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("消费者收到的消息:"
                + record.topic() + "\t"
              +"分区 :"  + record.partition() + "\t"
                + record.offset() + "\t"
                + record.key() + "\t"
                + record.value());
    }

}

2.6 测试

启动SpringBoot项目,多次发送请求,到同步和异步接口

  • 同步:http://localhost:8899/send/async/111112
  • 异步:http://localhost:8899/send/sync/111112

https://www.xamrdz.com/lan/5xy1960964.html

相关文章: