
Kafka partition leader election: kafka-preferred-replica-election.sh and kafka-leader-election.sh

Running the partition leader election script

Script parameters

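In Kafka, a replica leader election is triggered with bin/kafka-leader-election.sh. This script was added in Kafka 2.4 to replace bin/kafka-preferred-replica-election.sh, which is now deprecated.
Its parameters are: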

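| No. | Parameter | Description |
|---|---|---|
| 1 | --bootstrap-server | List of Kafka broker addresses (host:port) to connect to |
| 2 | --admin.config | Config file with properties for the admin client |
| 3 | --path-to-json-file | JSON file listing the partitions to run the election for |
| 4 | --topic | Topic to run the election for. Specify exactly one of --path-to-json-file, --all-topic-partitions or --topic |
| 5 | --partition | Partition of the given --topic to run the election for |
| 6 | --all-topic-partitions | Run the election for all topic partitions |
| 7 | --election-type | Type of election: preferred or unclean |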
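The table leaves the relationship between the partition-selection options implicit. The Scala sketch below spells out the rules as we read them:

- exactly one of --path-to-json-file, --all-topic-partitions and --topic is given;
- --partition appears only together with --topic;
- --election-type is either preferred or unclean.

`ElectionOptions` and `validate` are hypothetical names, not the tool's own code. The rule that --topic and --partition must be given together is an assumption.

```scala
// Hypothetical model of the command-line options (not kafka-leader-election.sh's own code).
final case class ElectionOptions(
    electionType: String,
    pathToJsonFile: Option[String] = None,
    allTopicPartitions: Boolean = false,
    topic: Option[String] = None,
    partition: Option[Int] = None
)

// Right(()) if the combination is acceptable, Left(reason) otherwise.
def validate(o: ElectionOptions): Either[String, Unit] = {
  // exactly one way of selecting partitions must be given
  val sources = Seq(o.pathToJsonFile.isDefined, o.allTopicPartitions, o.topic.isDefined).count(b => b)
  if (sources != 1)
    Left("exactly one of --path-to-json-file, --all-topic-partitions, --topic is required")
  else if (o.topic.isDefined != o.partition.isDefined)
    Left("--topic and --partition must be given together") // assumption, see lead-in
  else if (!Set("preferred", "unclean").contains(o.electionType.toLowerCase))
    Left("--election-type must be preferred or unclean")
  else
    Right(())
}
```

For example, `validate(ElectionOptions("preferred", pathToJsonFile = Some("config/leaderElection/election.json")))` corresponds to the invocation used in the example below and is accepted.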

Example

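We run a replica leader election for topic_1. Before running it, the data in ZooKeeper is as follows:
- Replica assignment of topic_1 (/brokers/topics/topic_1): {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}}
- State node of partition 0 (/brokers/topics/topic_1/partitions/0/state): {"controller_epoch":253,"leader":1,"version":1,"leader_epoch":10,"isr":[1,2]}
The replicas of partition 0 are [2,1,3]. This list is what is usually called the AR (assigned replicas).

- The preferred replica is the first one in the AR, which is 2.
- The current leader is 1.

We run the script to move leadership to replica 2, using the following JSON file: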

{
  "partitions": [
    {
      "topic": "topic_1",
      "partition": 0
    }
  ]
}
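When many partitions need an election, the JSON file can be generated instead of written by hand. This is a minimal sketch. `electionJson` is a hypothetical helper that reproduces the layout above using plain string building. It does not escape special characters in topic names.

```scala
// Hypothetical helper: builds the --path-to-json-file content for (topic, partition) pairs,
// using the same layout as the hand-written file above. Topic names are not escaped.
def electionJson(partitions: Seq[(String, Int)]): String = {
  val entries = partitions.map { case (topic, partition) =>
    Seq(
      "    {",
      s"""      "topic": "$topic",""",
      s"""      "partition": $partition""",
      "    }"
    ).mkString("\n")
  }
  Seq("{", """  "partitions": [""", entries.mkString(",\n"), "  ]", "}").mkString("\n")
}
```

`electionJson(Seq("topic_1" -> 0))` yields exactly the file shown above.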
  • Run the following command:
sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --path-to-json-file config/leaderElection/election.json --election-type preferred
  • On success, the console prints the following line:
Successfully completed leader election (PREFERRED) for partitions topic_1-0
  • After the election, the state node of topic_1 in ZooKeeper contains the following. The leader has moved to replica 2:
{"controller_epoch":253,"leader":2,"version":1,"leader_epoch":12,"isr":[1,2]}

Partition leader election flow

[Figure 1: partition leader election flow]

Partition leader election: source code walkthrough

The entry point is kafka.controller.KafkaController#processReplicaLeaderElection:

private def processReplicaLeaderElection(
    partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
    electionType: ElectionType,
    electionTrigger: ElectionTrigger,
    callback: ElectLeadersCallback
  ): Unit = {
    // Two triggers lead here: the admin client (e.g. the script above), or a write to the ZooKeeper node /admin/preferred_replica_election
    if (electionTrigger == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
        val partitions = partitionsFromAdminClientOpt match {
          case Some(partitions) => partitions
          case None => zkClient.getPreferredReplicaElection
        }
        // Check validity: separate partitions the controller knows about from unknown ones
        val (knownPartitions, unknownPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
        unknownPartitions.foreach { p =>
          info(s"Skipping replica leader election ($electionType) for partition $p by $electionTrigger since it doesn't exist.")
        }
        // Skip partitions whose topic is queued for deletion
        val (partitionsBeingDeleted, livePartitions) = knownPartitions.partition(partition =>
            topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
        if (partitionsBeingDeleted.nonEmpty) {
          warn(s"Skipping replica leader election ($electionType) for partitions $partitionsBeingDeleted " +
            s"by $electionTrigger since the respective topics are being deleted")
        }

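        // Filter by electionType. PREFERRED: elect only if the current leader is not the preferred replica (head of the AR).
        // UNCLEAN: elect only if the partition has no leader or the leader's broker is offline.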
        val (electablePartitions, alreadyValidLeader) = livePartitions.partition { partition =>
          electionType match {
            case ElectionType.PREFERRED =>
              val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
              val preferredReplica = assignedReplicas.head
              val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
              currentLeader != preferredReplica

            case ElectionType.UNCLEAN =>
              val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
              currentLeader == LeaderAndIsr.NoLeader || !controllerContext.liveBrokerIds.contains(currentLeader)
          }
        }
        // 2.1 Run the replica leader election
        val results = onReplicaElection(electablePartitions, electionType, electionTrigger).map {
          case (k, Left(ex)) =>
            if (ex.isInstanceOf[StateChangeFailedException]) {
              val error = if (electionType == ElectionType.PREFERRED) {
                Errors.PREFERRED_LEADER_NOT_AVAILABLE
              } else {
                Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
              }
              k -> Left(new ApiError(error, ex.getMessage))
            } else {
              k -> Left(ApiError.fromThrowable(ex))
            }
          case (k, Right(leaderAndIsr)) => k -> Right(leaderAndIsr.leader)
        } ++
        alreadyValidLeader.map(_ -> Left(new ApiError(Errors.ELECTION_NOT_NEEDED))) ++
        partitionsBeingDeleted.map(
          _ -> Left(new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted"))
        ) ++
        unknownPartitions.map(
          _ -> Left(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
        )

        debug(s"Waiting for any successful result for election type ($electionType) by $electionTrigger for partitions: $results")
        callback(results)
      }
  }
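The filtering in processReplicaLeaderElection can be modelled with plain collections. The sketch below uses simplified, hypothetical stand-ins (`TP`, `ControllerView`, `Outcome`) instead of the controller's real types.

- Partitions classified as `Elect` are handed to onReplicaElection.
- The remaining outcomes correspond to ELECTION_NOT_NEEDED, INVALID_TOPIC_EXCEPTION and UNKNOWN_TOPIC_OR_PARTITION in the code above.

```scala
// Simplified stand-ins for the controller's types (hypothetical, not Kafka classes).
final case class TP(topic: String, partition: Int)
final case class ControllerView(
    assignment: Map[TP, Seq[Int]], // AR per partition; head = preferred replica
    leader: Map[TP, Int],          // current leader; absent means no leader
    liveBrokers: Set[Int],
    topicsBeingDeleted: Set[String]
)

sealed trait Outcome
case object Elect extends Outcome             // handed to onReplicaElection
case object ElectionNotNeeded extends Outcome // ELECTION_NOT_NEEDED
case object TopicBeingDeleted extends Outcome // INVALID_TOPIC_EXCEPTION
case object UnknownPartition extends Outcome  // UNKNOWN_TOPIC_OR_PARTITION

val NoLeader = -1 // mirrors LeaderAndIsr.NoLeader

def classify(view: ControllerView, requested: Set[TP], preferred: Boolean): Map[TP, Outcome] =
  requested.map { tp =>
    val outcome: Outcome =
      if (!view.assignment.contains(tp)) UnknownPartition
      else if (view.topicsBeingDeleted.contains(tp.topic)) TopicBeingDeleted
      else {
        val current = view.leader.getOrElse(tp, NoLeader)
        val needsElection =
          if (preferred) current != view.assignment(tp).head // leader is not the preferred replica
          else current == NoLeader || !view.liveBrokers.contains(current) // no leader, or leader offline
        if (needsElection) Elect else ElectionNotNeeded
      }
    tp -> outcome
  }.toMap
```

Take the topic_1 data from the example: AR [2,1,3] and leader 1. A preferred election classifies partition 0 as `Elect`. Once the leader is 2, the same partition becomes `ElectionNotNeeded`.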

kafka.controller.KafkaController#onReplicaElection

The logic here is simple. It chooses a leader election strategy from the election type, then has the partition state machine perform the election.

private[this] def onReplicaElection(
    partitions: Set[TopicPartition],
    electionType: ElectionType,
    electionTrigger: ElectionTrigger
  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
    info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")} triggered by $electionTrigger")
    try {
      // Choose the strategy from electionType: PREFERRED uses the preferred-replica strategy, UNCLEAN uses the offline-partition strategy
      val strategy = electionType match {
        case ElectionType.PREFERRED => PreferredReplicaPartitionLeaderElectionStrategy
        case ElectionType.UNCLEAN =>
          /* Let's be conservative and only trigger unclean election if the election type is unclean and it was
           * triggered by the admin client
           */
          OfflinePartitionLeaderElectionStrategy(allowUnclean = electionTrigger == AdminClientTriggered)
      }
      // Have the partition state machine move the partitions to OnlinePartition, which runs the election
      val results = partitionStateMachine.handleStateChanges(
        partitions.toSeq,
        OnlinePartition,
        Some(strategy)
      )
      if (electionTrigger != AdminClientTriggered) {
        results.foreach {
          case (tp, Left(throwable)) =>
            if (throwable.isInstanceOf[ControllerMovedException]) {
              info(s"Error completing replica leader election ($electionType) for partition $tp because controller has moved to another broker.", throwable)
              throw throwable
            } else {
              error(s"Error completing replica leader election ($electionType) for partition $tp", throwable)
            }
          case (_, Right(_)) => // Ignored; No need to log or throw exception for the success cases
        }
      }

      results
    } finally {
      if (electionTrigger != AdminClientTriggered) {
        removePartitionsFromPreferredReplicaElection(partitions, electionTrigger == AutoTriggered)
      }
    }
  }
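The strategy choice reduces to a small pure function. The sketch below uses hypothetical ADTs that mirror ElectionType and ElectionTrigger. It shows that an unclean election is forced (`allowUnclean = true`) only for admin-client requests.

Even with `allowUnclean = false`, the offline strategy may still pick an out-of-sync replica if the topic's configuration allows unclean election. The sketch does not model that case.

```scala
// Hypothetical mirrors of ElectionType / ElectionTrigger / the two strategies used here.
sealed trait ElectionType
case object Preferred extends ElectionType
case object Unclean extends ElectionType

sealed trait Trigger
case object AdminClientTriggered extends Trigger
case object ZkTriggered extends Trigger
case object AutoTriggered extends Trigger

sealed trait Strategy
case object PreferredReplicaStrategy extends Strategy
final case class OfflineStrategy(allowUnclean: Boolean) extends Strategy

def chooseStrategy(electionType: ElectionType, trigger: Trigger): Strategy = electionType match {
  case Preferred => PreferredReplicaStrategy
  // be conservative: only force unclean election when an admin client asked for it
  case Unclean => OfflineStrategy(allowUnclean = trigger == AdminClientTriggered)
}
```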

kafka.controller.ZkPartitionStateMachine#doHandleStateChanges

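Here we revisit the partition state machine code, focusing on the part related to leader election. The partition is already online and only its leader changes, so the request takes the OnlinePartition -> OnlinePartition transition in doHandleStateChanges. That transition calls electLeaderForPartitions, which in turn calls the doElectLeaderForPartitions method below. electLeaderForPartitions also retries the partitions returned as updatesToRetry.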

private def doElectLeaderForPartitions(
    partitions: Seq[TopicPartition],
    partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
  ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
    val getDataResponses = try {
      zkClient.getTopicPartitionStatesRaw(partitions)
    } catch {
      case e: Exception =>
        return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
    }
    val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
    val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]

    getDataResponses.foreach { getDataResponse =>
      val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
      val currState = partitionState(partition)
      if (getDataResponse.resultCode == Code.OK) {
        // Check the controller epoch: abort if a newer controller has already written this partition's state
        TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
          case Some(leaderIsrAndControllerEpoch) =>
            if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
              val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
                s"already written by another controller. This probably means that the current controller $controllerId went through " +
                s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
              failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
            } else {
              validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
            }

          case None =>
            val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
            failedElections.put(partition, Left(exception))
        }

      } else if (getDataResponse.resultCode == Code.NONODE) {
        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
        failedElections.put(partition, Left(exception))
      } else {
        failedElections.put(partition, Left(getDataResponse.resultException.get))
      }
    }

    if (validLeaderAndIsrs.isEmpty) {
      return (failedElections.toMap, Seq.empty)
    }
    // There are four election strategies; at controller initialization, OfflinePartitionLeaderElectionStrategy is used
    val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
        // Offline partitions: pick the first replica (in AR order) that is alive and in the ISR;
        // if allowUnclean is true or the topic config allows unclean election, fall back to any live replica
      case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
        val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
          validLeaderAndIsrs,
          allowUnclean
        )
        leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
      case ReassignPartitionLeaderElectionStrategy =>
        // Partition reassignment: pick the first replica of the reassignment target that is alive and in the ISR
        leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
      case PreferredReplicaPartitionLeaderElectionStrategy =>
        // Preferred replica: only the first replica in the AR (the preferred replica), and only if it is alive and in the ISR
        leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
      case ControlledShutdownPartitionLeaderElectionStrategy =>
        // Controlled shutdown: pick the first replica (in AR order) that is alive, in the ISR, and not on a broker that is shutting down
        leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
    }
    partitionsWithoutLeaders.foreach { electionResult =>
      val partition = electionResult.topicPartition
      val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
      failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
    }
    val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
    val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
    // Write the newly elected leader and ISR to ZooKeeper
    val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
      adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
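    // After the ZooKeeper write, update partitionLeadershipInfo in the controller cache, queue a LeaderAndIsr request
    // for each live replica of the partition, and queue an UpdateMetadata request for every broker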
    finishedUpdates.foreach { case (partition, result) =>
      result.right.foreach { leaderAndIsr =>
        val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
        val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
        controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
          leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
      }
    }

    (finishedUpdates ++ failedElections, updatesToRetry)
  }
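The per-strategy leader choice can be sketched as four pure functions. Each one works over the AR order (`assignment`), the ISR and the set of live replicas. The function names are hypothetical.

This is a simplified sketch based on the comments above. It only returns the chosen leader, and it ignores the ISR adjustments that the real strategies also make. A `None` result means the partition ends up in partitionsWithoutLeaders.

```scala
// offline: first live in-sync replica in AR order; optionally fall back to any live replica (unclean)
def offlineLeader(assignment: Seq[Int], isr: Seq[Int], live: Set[Int], uncleanAllowed: Boolean): Option[Int] =
  assignment.find(id => live.contains(id) && isr.contains(id)).orElse {
    if (uncleanAllowed) assignment.find(live.contains) else None
  }

// reassignment: first live in-sync replica in the reassignment target order
def reassignLeader(targetReplicas: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
  targetReplicas.find(id => live.contains(id) && isr.contains(id))

// preferred: only the head of the AR, and only if it is live and in sync
def preferredLeader(assignment: Seq[Int], isr: Seq[Int], live: Set[Int]): Option[Int] =
  assignment.headOption.filter(id => live.contains(id) && isr.contains(id))

// controlled shutdown: first live in-sync replica that is not on a broker being shut down
def controlledShutdownLeader(assignment: Seq[Int], isr: Seq[Int], live: Set[Int], shuttingDown: Set[Int]): Option[Int] =
  assignment.find(id => live.contains(id) && isr.contains(id) && !shuttingDown.contains(id))
```

For topic_1 (AR [2,1,3], ISR [1,2]), `preferredLeader` returns Some(2) while broker 2 is alive and None once it is down. In that case `offlineLeader` would fall back to replica 1.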

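The partition state machine mainly does the following:
1. Elects a leader with the given strategy. Two strategies are involved here:
  • OfflinePartitionLeaderElectionStrategy (offline partitions): prefers a live replica in the ISR. Depending on the allowUnclean argument or the topic configuration, an out-of-sync replica may also be elected leader.
  • PreferredReplicaPartitionLeaderElectionStrategy (preferred replica): only the first replica in the AR can become leader, and only if it is alive and in the ISR.
2. Writes the election result to ZooKeeper at /brokers/topics/{topic}/partitions/{partition}/state, in the format {"controller_epoch":253,"leader":2,"version":1,"leader_epoch":12,"isr":[1,2]}.
3. Updates partitionLeadershipInfo in controllerContext. The key is the TopicPartition and the value is a LeaderIsrAndControllerEpoch, which holds the LeaderAndIsr plus the controller epoch.
4. Sends a LeaderAndIsr request to each live replica.
5. Sends an UpdateMetadata request to every broker in the cluster.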
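Steps 2 to 5 can be sketched as follows. All names are hypothetical. `version` is fixed at 1, and the JSON field order is copied from the example state node above rather than taken from Kafka's encoder, so treat the output as illustrative. Requests are only computed here, not sent.

```scala
import scala.collection.mutable

// Simplified stand-in for Kafka's LeaderAndIsr (hypothetical).
final case class LeaderAndIsr(leader: Int, leaderEpoch: Int, isr: Seq[Int])

// Step 2: body written to /brokers/topics/{topic}/partitions/{partition}/state
// (field order copied from the example; "version" assumed to be 1).
def stateZNodeJson(l: LeaderAndIsr, controllerEpoch: Int): String =
  s"""{"controller_epoch":$controllerEpoch,"leader":${l.leader},"version":1,"leader_epoch":${l.leaderEpoch},"isr":[${l.isr.mkString(",")}]}"""

// Step 3: controller cache keyed by (topic, partition); value = (LeaderAndIsr, controller epoch).
val partitionLeadershipInfo = mutable.Map.empty[(String, Int), (LeaderAndIsr, Int)]

def updateCache(tp: (String, Int), l: LeaderAndIsr, controllerEpoch: Int): Unit =
  partitionLeadershipInfo(tp) = (l, controllerEpoch)

// Steps 4 and 5: LeaderAndIsr goes to the partition's live replicas, UpdateMetadata to every live broker.
def requestTargets(assignment: Seq[Int], liveBrokers: Set[Int]): (Set[Int], Set[Int]) =
  (assignment.filter(liveBrokers.contains).toSet, liveBrokers)
```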

kafka.server.KafkaApis#handleLeaderAndIsrRequest

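  • For partitions where the broker becomes leader: mainly stop the fetcher threads for those partitions, then handle the log.
  • For partitions where the broker becomes a follower: add a fetcher thread for the leader and replicate its log.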
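The leader/follower split can be modelled with a map of active fetchers. In this hypothetical sketch, `FetcherRegistry` stands in for the broker's replica fetcher management. Log handling (truncation, high watermark) is left out.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the broker's fetcher management:
// partition -> id of the leader broker this broker is fetching from.
final class FetcherRegistry {
  private val fetchers = mutable.Map.empty[String, Int]

  def fetchingFrom(partition: String): Option[Int] = fetchers.get(partition)

  // Apply one partition's entry from a LeaderAndIsr request on broker `localBrokerId`.
  def handle(localBrokerId: Int, partition: String, leader: Int): Unit =
    if (leader == localBrokerId) {
      fetchers.remove(partition) // becoming leader: stop fetching this partition
      ()
    } else {
      fetchers(partition) = leader // becoming follower: fetch from the (new) leader
    }
}
```

In the topic_1 example, broker 1 (the old leader) starts fetching from broker 2. Broker 2 drops any fetcher it had for the partition.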



