1. 消息的发送与接收
生产者主要的对象有: **KafkaProducer , ProducerRecord 。 其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。 KafkaProducer 的创建需要指定的参数和含义:
参数 | 说明 |
bootstrap.servers | 配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。 |
key.serializer | 要发送信息的key数据的序列化类。 |
value.serializer | 要发送消息的alue数据的序列化类 |
acks | 默认值:all会等待所有的副本分区确认记录, acks=0生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成,acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认 |
retries | retries重试次数 |
其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到. 消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。 同步确认效率低,异步确认效率高,但是需要设置回调对象。
1.1. 创建kafka-01-producer-consumer项目
创建kafka-01-producer-consumer项目,并在pom.xml添加依赖
<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.7.0</version>
</dependency>
</dependencies>
1.2. 生产者
步骤如下:
- 1.创建用于连接Kafka的Properties配置
- 2.创建一个生产者对象KafkaProducer
- 发送1-100的消息到指定的topic中
5.关闭生产者 代码如下:
public class KafkaProducer1 {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
Properties props = new Properties();
// 设置连接Kafka的初始连接用到的服务器地址
// 如果是集群,则可以通过此初始连接发现集群中的其他broker
props.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
// acks=1 表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的 确认。
//acks=all首领分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。
//这是Kafka最强的可靠性保证,等效于 acks=-1
props.put("acks", "all");
// 设置key的序列化器
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 设置value的序列化器
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// 2. 创建一个生产者对象KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String,
String>(props);
// 3. 发送1-100的消息到指定的topic中
for(int i = 0; i < 100; ++i) {
// 一、使用同步等待的方式发送消息
// // 构建一条消息,直接new ProducerRecord
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
// Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
// // 调用Future的get方法等待响应
// future.get();
// System.out.println("第" + i + "条消息写入成功!");
// 二、使用异步回调的方式发送消息
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
kafkaProducer.send(producerRecord, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
// 1. 判断发送消息是否成功
if(exception == null) {
// 发送成功
// 主题
String topic = metadata.topic();
// 分区id
int partition = metadata.partition();
// 偏移量
long offset = metadata.offset();
System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
}
else {
// 发送出现错误
System.out.println("生产消息出现异常!");
// 打印异常消息
System.out.println(exception.getMessage());
// 打印调用栈
System.out.println(exception.getStackTrace());
}
}
});
}
// 4.关闭生产者
kafkaProducer.close();
}
}
1.3. 消费者
- 1.创建Kafka消费者配置
- 2.创建Kafka消费者
- 3.订阅要消费的主题
- 4.使用一个while循环,不断从Kafka的topic中拉取消息
public class KafkaConsumer1 {
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException {
// 1.创建Kafka消费者配置
Properties props = new Properties();
props.setProperty("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
// 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
// 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
props.setProperty("group.id", "test");
// 自动提交offset
props.setProperty("enable.auto.commit", "true");
// 自动提交offset的时间间隔
props.setProperty("auto.commit.interval.ms", "1000");
// 拉取的key、value数据的
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
// 2.创建Kafka消费者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);
// 3. 订阅要消费的主题
// 指定消费者从哪个topic中拉取数据
kafkaConsumer.subscribe(Arrays.asList("test"));
// 4.使用一个while循环,不断从Kafka的topic中拉取消息
while(true) {
// Kafka的消费者一次拉取一批的数据
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
// 5.将将记录(record)的offset、key、value都打印出来
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
// 主题
String topic = consumerRecord.topic();
// offset:这条消息处于Kafka分区中的哪个位置
long offset = consumerRecord.offset();
// key\value
String key = consumerRecord.key();
String value = consumerRecord.value();
System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
}
Thread.sleep(1000);
}
}
}
编写上面代码之后,大家有没有发现一个问题,我们没有使用SpringBoot进行开发,那么接下来就是要SprigBoot开发
2.SpringBoot Kafka实战
2.1 创建kafka-02-springboot项目
kafka-02-springboot依赖如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-starter-parent</artifactId>
<groupId>org.springframework.boot</groupId>
<version>2.7.18</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-02-springboot</artifactId>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.7.18</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
</dependencies>
</project>
2.2 添加配置项
image
在application.yml
添加kafka配置
server:
port: 8899
spring:
application:
name: kafka-springboot
kafka:
producer:
# producer用到的key和value的序列化类
key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
batch-size: 33554432
# 用于建立初始连接的broker地址
bootstrap-servers: kafka1:9092,kafka2:9092,kafka3:9092
consumer:
## consumer用到的key和value的反序列化类
key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
group-id: springboot-consumer02
# 如果在kafka中找不到当前消费者的偏移量,则直接将偏移量重置为最早的
auto-offset-reset: earliest
# 消费者的偏移量是自动提交还是手动提交,此处自动提交偏移量
enable-auto-commit: true
# 消费者偏移量自动提交的时间间隔
auto-commit-interval: 1000
2.3. 创建Kafka配置类
@Configuration
public class KafkaConfig {
@Bean
public NewTopic topic1() {
return new NewTopic("zbbmeta-01", 3, (short) 1);
}
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put("bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092");
KafkaAdmin admin = new KafkaAdmin(configs);
return admin;
}
@Bean
@Autowired
public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {
// 覆盖ProducerFactory原有设置
Map<String, Object> configsOverride = new HashMap<>();
configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);
KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(
producerFactory, configsOverride
);
return template;
}
}
2.4. 创建同步和异步发送消息到Topic的接口方法
- 同步接口
@RestController
public class KafkaSyncProducerController {
@Autowired
private KafkaTemplate<Integer, String> template;
@RequestMapping("send/sync/{message}")
public String send(@PathVariable String message) {
final ListenableFuture<SendResult<Integer, String>> future = template.send("zbbmeta-01", null, message);
// 同步发送消息
try {
final SendResult<Integer, String> sendResult = future.get();
final RecordMetadata metadata = sendResult.getRecordMetadata();
System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "success";
}
}
- 异步接口
@RestController
public class KafkaAsyncProducerController {
@Autowired
private KafkaTemplate<Integer, String> template;
@RequestMapping("send/async/{message}")
public String send(@PathVariable String message) {
final ListenableFuture<SendResult<Integer, String>> future = this.template.send("zbbmeta-01", null, message);
// 设置回调函数,异步等待broker端的返回结果
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onFailure(Throwable throwable) {
System.out.println("发送消息失败:" + throwable.getMessage());
}
@Override
public void onSuccess(SendResult<Integer, String> result) {
final RecordMetadata metadata = result.getRecordMetadata();
System.out.println("发送消息成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
}
});
return "success";
}
}
2.5 创建消费者
在springboot中消费者实际上就是监听,代码如下
@Component
public class MyConsumer {
@KafkaListener(topics = "zbbmeta-01")
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println("消费者收到的消息:"
+ record.topic() + "\t"
+"分区 :" + record.partition() + "\t"
+ record.offset() + "\t"
+ record.key() + "\t"
+ record.value());
}
}
2.6 测试
启动SpringBoot项目,多次发送请求,到同步和异步接口
- 同步:http://localhost:8899/send/async/111112
- 异步:http://localhost:8899/send/sync/111112