在 Kafka 的生产者设置压缩协议时,消费者不需要显式地设置压缩协议。消费者会自动解压生产者发送的压缩消息。因此,无论生产者使用的是 gzip
、snappy
、lz4
还是 zstd
压缩,消费者都会正确解压并处理消息。
为了进一步澄清这个问题,这里是一个完整的消费者示例,它能够正确处理任何压缩类型的消息,而无需额外配置压缩协议:
package main
import (
"github.com/Shopify/sarama"
"go.uber.org/zap"
)
func main() {
brokers := []string{"broker1:9092", "broker2:9092"}
topic := "your_topic"
// 创建 Sarama 配置
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Version = sarama.V2_1_0_0 // 使用 Kafka 版本
// 创建消费者
client, err := sarama.NewConsumer(brokers, config)
if err != nil {
zap.S().Fatal(err)
}
defer func() {
if err := client.Close(); err != nil {
zap.S().Errorf("client close error: %v", err)
}
}()
// 获取分区列表
partitions, err := client.Partitions(topic)
if err != nil {
zap.S().Fatal(err)
}
// 选择偏移量
offset := sarama.OffsetOldest
zap.S().Infof("start one offset: %d", offset)
// 消费第一个分区
claim, err := client.ConsumePartition(topic, partitions[0], offset)
if err != nil {
zap.S().Fatal(err)
}
defer func() {
zap.S().Infof("kafka is closing...")
if err := claim.Close(); err != nil {
zap.S().Errorf("partition consumer close error: %v", err)
}
}()
zap.S().Infof("start received message...")
// 消费消息
for message := range claim.Messages() {
zap.S().Infof("Message received: %s", string(message.Value))
}
}
在上述示例中:
- 配置和创建消费者:配置 Kafka 版本和返回错误信息。
- 获取分区并选择偏移量:消费指定分区的消息,从
OffsetOldest
开始。 - 处理消息:消费者会自动解压缩消息,无需额外的配置。
因此,无需在消费者代码中指定压缩协议。Kafka 的消费者会自动处理从生产者发送的任何压缩格式的消息。