当前位置: 首页>后端>正文

Springboot 消息生产与消费基于Kafka

Mac鐜瀹夎Kafka

Kafka渚濊禆浜巣ookeeper锛屼互涓嬫柟寮忓畨瑁匥afka鍚庤嚜甯k
brew install kafka

# Kafka鍚姩鏂瑰紡
To restart kafka after an upgrade:
  brew services restart kafka
Or, if you don't want/need a background service you can just run:
  /usr/local/opt/kafka/bin/kafka-server-start /usr/local/etc/kafka//usr/local/etc/kafka/server.properties

# Kafka瀹夎鐩綍
==> Summary
馃嵑  /usr/local/Cellar/kafka/3.2.0: 200 files, 99.4MB

# 閰嶇疆鏂囦欢璺緞
/usr/local/etc/kafka/

kafka鍙栨秷鑷姩鍒涘缓topic锛岀紪杈?usr/local/etc/kafka/server.properties锛屾坊鍔?code>auto.create.topics.enable=false

鍚姩zookeeper涓巏afka

鍚姩zkbrew services start zookeeper榛樿绔彛2181锛屽惎鍔╧afkabrew services start kafka锛岄粯璁ょ鍙?092銆傚闇€瑕佷慨鏀归粯璁ょ鍙e彿鍙互鍦?usr/local/etc/kafka/涓瓃ookeeper.properties鍜宻erver.properties涓搴斾慨鏀瑰嵆鍙€?/p>

鍒涘缓Topic

璇存槑锛氬湪杈冩柊鐗堟湰锛?.2 鍙婃洿楂樼増鏈級鐨?Kafka 涓嶅啀闇€瑕?ZooKeeper 杩炴帴瀛楃涓诧紝鍗? -zookeeper localhost:2181銆備娇鐢?Kafka Broker鐨?--bootstrap-server localhost:9092鏉ユ浛浠? -zookeeper localhost:2181銆?/p>

cd /usr/local/Cellar/kafka/3.2.0/bin
# 鍒涘缓topic
./kafka-topics --bootstrap-server localhost:9092 --create --topic domancy --partitions 2 --replication-factor 1
./kafka-topics --bootstrap-server localhost:9092 --create --topic order --partitions 2 --replication-factor 1

# 鏌ョ湅topic
./kafka-topics --bootstrap-server localhost:9092 --describe --topic domancy

# 鍒犻櫎topic
./kafka-topics --bootstrap-server localhost:9092 --delete --topic domancy

娴嬭瘯Kakfa

鐢熶骇鑰?/p>

鉃? /usr/local/Cellar/kafka/3.2.0/bin/kafka-console-producer --broker-list localhost:9092 --topic domancy
>鐧芥棩渚濆北灏?
>榛勬渤鍏ユ捣娴?
>娆茬┓鍗冮噷鐩?
>鏇翠笂涓€灞傛ゼ
>

娑堣垂鑰?/p>

鉃? bin kafka-console-consumer --bootstrap-server localhost:9092 --topic domancy --from-beginning
鐧芥棩渚濆北灏?
榛勬渤鍏ユ捣娴?
娆茬┓鍗冮噷鐩?
鏇翠笂涓€灞傛ゼ

Springboot椤圭洰

pom鏂囦欢

<dependencies>
  <!-- 楂樼増鏈殑SpringBoot闇€瑕佸簲鐢╯pring-boot-starter-web鍖咃紝
  涓嶇劧浼氬嚭鐜癆bstractDiscoveryClientOptionalArgs涓嶅瓨鍦ㄥ紓甯?->
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.0.8.RELEASE</version>
  </dependency>

  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>

  <!-- https://mvnrepository.com/artifact/cn.hutool/hutool-all -->
  <dependency>
    <groupId>cn.hutool</groupId>
    <artifactId>hutool-all</artifactId>
    <version>5.7.22</version>
  </dependency>

  <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.76</version>
  </dependency>

  <!-- kafka -->
  <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
  </dependency>
</dependencies>

resources

server:
  port: 8083

spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: test-consumer
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

  application:
    name: stream-bus

娑堟伅鐢熶骇

鍗曟潯娑堟伅鍙戦€侊細http://localhost:8083/send/manual/shirts
鑷姩鎵归噺鍙戦€佹秷鎭細http://localhost:8083/send/auto

@RestController
@RequestMapping(value = "send")
public class SenderController {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping(value = "/manual/{msg}")
    public void send(@PathVariable("msg") String msg) {
        kafkaTemplate.send("domancy", msg);
    }


    /**
     * 澶氫釜娑堣垂鑰呭悓鏃舵秷璐逛竴鏉℃秷鎭?
     */
    @GetMapping("/auto")
    public void autoSendBatchMsg() {
        for (int i = 0; i <10; i++) {
            try {
                Thread.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Long orderId = new SnowflakeGenerator().next();
            //key鍊煎彇hash鍊煎鍒嗗尯鏁伴噺鍙栨ā

            final Order order = new Order();
            order.setOrderId(orderId.toString());
            order.setCreateTime(new Date());
            order.setPhone('1' + RandomUtil.randomNumbers(10));

            //final Integer partition = Math.abs(orderId.hashCode()) % 2;
            boolean randomBoolean = RandomUtil.randomBoolean();
            int partition = randomBoolean 1 : 0;
            System.out.println("kafka 鍙戦€佹秷鎭? + orderId + "鍒嗗尯=====" + partition);

            kafkaTemplate.send("order", partition, orderId.toString(), JSON.toJSONString(order));
        }
    }
}

娑堟伅娑堣垂

  • 鎸囧畾topicPartitions娑堣垂娑堟伅
  • 鎸囧畾group娑堣垂
@Service
public class ConsumerService {
    @KafkaListener(id = "c_1", topicPartitions = {@TopicPartition(topic = "domancy", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))})
    public void partition0(String msgData) {
        System.out.println("ConsumerService c_1 receive : " + msgData + ", partition: 0" );
    }

    @KafkaListener(id = "c2", topicPartitions = {@TopicPartition(topic = "domancy", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))})
    public void partition1(String msgData) {
        System.out.println("ConsumerService c_2 receive : " + msgData + ", partition: 1" );
    }

    @KafkaListener(groupId="test-consumer", topics = {"order"})
    public void consumer1(ConsumerRecord<Integer, String> record) {
        System.out.println("鍒嗙粍锛歝onsumer-id-1 " + " 涓婚锛? + record.topic() + "-" + record.partition() + "-" + record.value());
    }

    @KafkaListener(groupId="test-consumer", topics = {"order"})
    public void consumer2(ConsumerRecord<Integer, String> record) {
        System.out.println("鍒嗙粍锛歝onsumer-id-2 " + " 涓婚锛? + record.topic() + "-" + record.partition() + "-" + record.value());
    }

}

杩愯鏁堟灉

Springboot 消息生产与消费基于Kafka,第1张
鎵归噺鍙戦€侀殢鏈烘秷鎭?/div>

Springboot 消息生产与消费基于Kafka,第2张
娑堣垂鑰呰幏鍙栨秷鎭?/div>

https://www.xamrdz.com/backend/3h91939387.html

相关文章: