当前位置: 首页>后端>正文

在PHP中使用Kafka

最近项目中需要从Kafka中读取消息,记录一下。

安装扩展

https://l1905.github.io/kafka/php/2020/07/07/php-user-kafka/
apt install librdkafka-dev
pecl install rdkafka

消费(从指定的 partition)

在PHP中使用Kafka,conf = new \RdKafka\Conf();,第1张conf->set

librdkafka 配置项说明
https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md

// 配置groud.id 具有相同 group.id 的consumer 将会处理不同分区的消息,所以同一个组内的消费者数量如果订阅了一个topic, 那么消费者进程的数量多于 多于这个topic 分区的数量是没有意义的。
$conf->set('group.id', 'myConsumerGroup1');

//添加 kafka集群服务器地址
$conf->set('metadata.broker.list', '192.168.33.1:9092');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'smallest': start from the beginning
//当没有初始偏移量时,从哪里开始读取
$topicConf->set('auto.offset.reset', 'smallest');

// Set the configuration to use for subscribed/assigned topics
在PHP中使用Kafka,conf->setDefaultTopicConf(,第2张topicConf);

        $conf = new \RdKafka\Conf();
        $conf->set('group.id','ali_icebox_refund');
        $rk = new \RdKafka\Consumer($conf);
        $rk->addBrokers(\Cake\Core\Configure::read('ubox_kafka_service'));

        $topicConf = new \RdKafka\TopicConf();
        $topicConf->set('auto.commit.interval.ms', 100);
        $topicConf->set('auto.offset.reset', 'smallest');

        $topic = $rk->newTopic('ali_icebox_refund', $topicConf);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);
while(true) {
            // 设置消费时的时间间隔,单位毫秒,以下表示60秒消费一个
            $message = $topic->consume(0, 5000);
            if ($message) {
                echo "读取到消息\n\r";

                // 消息对象,包括消息主题,消息创建时间戳,消息分区编号,消息主体,消息键名,消息长度等
                var_dump($message);

                switch ($message->err) {
                    case RD_KAFKA_RESP_ERR_NO_ERROR:
                        echo "读取消息成功:\n\r";
                        var_dump($message->payload);
                        break;
                    case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                        echo "读取消息失败\n\r";
                        break;
                    case RD_KAFKA_RESP_ERR__TIMED_OUT:
                        echo "请求超时\n\r";
                        break;
                    default:
                        throw new \Exception($message->errstr(), $message->err);
                        break;
                }
            } else {
                echo "未读取到消息\n\r";
            }

关于消费

低级消费

需要必备的字段 :

  • broke_list : 服务器节点
  • topic_name: 发布的topic队列名称
  • group.id : 组ID, 默认是 zdmDefaultConsumerGroup
  • partition: 分区索引, 即消费哪个分区的消息

必须指定group.id和partition, 因为消费偏移量是 和二者直接关联

重要的参数设置:

  • socket.timeout.ms 网络链接超时时间
  • auto.offset.reset 如果偏移量存储还没有初始化或偏移量超过范围时的处理方式, earliest 最早偏移地址,latest最晚偏移地址
  • enable.auto.commit 是否开启自动上报偏移量, 默认true
  • enable.auto.offset.store 将自动提交偏移量存储到内存中, 默认开启
  • enable.auto.commit 是否开启自动提交
  • auto.commit.interval.ms 自动提交的时间间隔
  • max.poll.interval.ms 高级消费者模式下,消费最大时长, 如果此次消费时长太长,则server端会剔除该group成员, 重新reblance 消费端分区

常见问题:

  1. 高级消费和低级消费的区别?
    高级消费即不需要指定消费的分区, sdk自动帮你选择消费的分区

  2. 希望重新消费数据,需要怎么操作
    可以重设消费偏移量 lowLevelConsume方法的option参数传递 consume_start_offset.

    • RD_KAFKA_OFFSET_BEGINNING 从最开始开始消费
    • RD_KAFKA_OFFSET_STORED 从上次消费的位置开始消费
    • RD_KAFKA_OFFSET_END 从最新位置开始消费
    • rd_kafka_offset_tail(xxx) 从最新偏移量的某个位置开始消费
  3. 是否支持批量消费.
    sdk支持,但我们的调用方法没有封装, 具体使用参考

高级消费

需要必备的字段 :

  • 不需要指定分区,其他和低级消费参数一致

重要的参数设置:

和低级消费参数一致


https://www.xamrdz.com/backend/3ep1931540.html

相关文章: