Running the Partition Leader Election Script
Script parameters
The script Kafka provides for triggering replica leader election is bin/kafka-leader-election.sh.
Its parameters are:
No. | Parameter | Description |
--- | --- | --- |
1 | bootstrap-server | Address list of the Kafka cluster |
2 | admin.config | Client config file to pass to the admin client |
3 | path-to-json-file | JSON file listing the partitions to run the election for |
4 | topic | A single target topic; mutually exclusive with path-to-json-file and all-topic-partitions |
5 | partition | The partition of the topic given by --topic; requires --topic |
6 | all-topic-partitions | Run the election for all partitions of all topics |
7 | election-type | Election type: preferred or unclean |
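For example, a single partition or the whole cluster can be targeted directly, without preparing a JSON file (these invocations use only the flags listed above):
# Preferred election for one partition
sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --topic topic_1 --partition 0 --election-type preferred
# Preferred election across every topic partition
sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --all-topic-partitions --election-type preferred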
Example run
Let's run a replica election for topic_1. Before the run, the data in ZooKeeper looks like this:
- topic_1 replica assignment: {"version":2,"partitions":{"0":[2,1,3]},"adding_replicas":{},"removing_replicas":{}}
- state znode: {"controller_epoch":253,"leader":1,"version":1,"leader_epoch":10,"isr":[1,2]}
The replica list of partition 0 is [2,1,3]; this is what we usually call the AR (assigned replicas). The preferred replica is 2, but the current leader is 1. We will run the script to move the leader to 2. Prepare a JSON file as follows:
{
"partitions": [
{
"topic": "topic_1",
"partition": 0
}
]
}
- Run the following command:
sh bin/kafka-leader-election.sh --bootstrap-server 127.0.0.1:9092 --path-to-json-file config/leaderElection/election.json --election-type preferred
- If the run succeeds, the console prints the following log:
Successfully completed leader election (PREFERRED) for partitions topic_1-0
- After the election completes, the state znode for topic_1 in ZooKeeper holds the following data; the leader has switched to replica 2:
{"controller_epoch":253,"leader":2,"version":1,"leader_epoch":12,"isr":[1,2]}
Partition leader election flow
Partition leader election: source code walkthrough
The entry point is kafka.controller.KafkaController#processReplicaLeaderElection:
private def processReplicaLeaderElection(
partitionsFromAdminClientOpt: Option[Set[TopicPartition]],
electionType: ElectionType,
electionTrigger: ElectionTrigger,
callback: ElectLeadersCallback
): Unit = {
//Two trigger paths: via the admin client (the script above), or by creating the admin/preferred_replica_election znode in ZooKeeper
if (electionTrigger == AdminClientTriggered || zkClient.registerZNodeChangeHandlerAndCheckExistence(preferredReplicaElectionHandler)) {
val partitions = partitionsFromAdminClientOpt match {
case Some(partitions) => partitions
case None => zkClient.getPreferredReplicaElection
}
//Check that the partitions actually exist
val (knownPartitions, unknownPartitions) = partitions.partition(tp => controllerContext.allPartitions.contains(tp))
unknownPartitions.foreach { p =>
info(s"Skipping replica leader election ($electionType) for partition $p by $electionTrigger since it doesn't exist.")
}
//Ignore partitions whose topic is queued for deletion
val (partitionsBeingDeleted, livePartitions) = knownPartitions.partition(partition =>
topicDeletionManager.isTopicQueuedUpForDeletion(partition.topic))
if (partitionsBeingDeleted.nonEmpty) {
warn(s"Skipping replica leader election ($electionType) for partitions $partitionsBeingDeleted " +
s"by $electionTrigger since the respective topics are being deleted")
}
// Validate against the election type: a preferred election only proceeds if the current leader is not the preferred replica; an unclean election requires the current leader to be absent or offline
val (electablePartitions, alreadyValidLeader) = livePartitions.partition { partition =>
electionType match {
case ElectionType.PREFERRED =>
val assignedReplicas = controllerContext.partitionReplicaAssignment(partition)
val preferredReplica = assignedReplicas.head
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
currentLeader != preferredReplica
case ElectionType.UNCLEAN =>
val currentLeader = controllerContext.partitionLeadershipInfo(partition).leaderAndIsr.leader
currentLeader == LeaderAndIsr.NoLeader || !controllerContext.liveBrokerIds.contains(currentLeader)
}
}
//2.1 Run the replica election
val results = onReplicaElection(electablePartitions, electionType, electionTrigger).map {
case (k, Left(ex)) =>
if (ex.isInstanceOf[StateChangeFailedException]) {
val error = if (electionType == ElectionType.PREFERRED) {
Errors.PREFERRED_LEADER_NOT_AVAILABLE
} else {
Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
}
k -> Left(new ApiError(error, ex.getMessage))
} else {
k -> Left(ApiError.fromThrowable(ex))
}
case (k, Right(leaderAndIsr)) => k -> Right(leaderAndIsr.leader)
} ++
alreadyValidLeader.map(_ -> Left(new ApiError(Errors.ELECTION_NOT_NEEDED))) ++
partitionsBeingDeleted.map(
_ -> Left(new ApiError(Errors.INVALID_TOPIC_EXCEPTION, "The topic is being deleted"))
) ++
unknownPartitions.map(
_ -> Left(new ApiError(Errors.UNKNOWN_TOPIC_OR_PARTITION, "The partition does not exist."))
)
debug(s"Waiting for any successful result for election type ($electionType) by $electionTrigger for partitions: $results")
callback(results)
}
}
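For completeness, here is the second trigger path mentioned in the comment above: the legacy preferred-replica election tooling creates the admin/preferred_replica_election znode, whose payload (to the best of my reading of PreferredReplicaLeaderElectionZNode) looks like:
{"version":1,"partitions":[{"topic":"topic_1","partition":0}]}
Creating that znode fires preferredReplicaElectionHandler, and the controller then reads the partition list via zkClient.getPreferredReplicaElection, as the code above shows.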
kafka.controller.KafkaController#onReplicaElection
The logic here is straightforward: choose a leader election strategy based on the election type, then call the partition state machine to perform the election.
private[this] def onReplicaElection(
partitions: Set[TopicPartition],
electionType: ElectionType,
electionTrigger: ElectionTrigger
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
info(s"Starting replica leader election ($electionType) for partitions ${partitions.mkString(",")} triggered by $electionTrigger")
try {
//Map the election type to a strategy: PREFERRED selects the preferred-replica strategy, UNCLEAN the offline-partition strategy
val strategy = electionType match {
case ElectionType.PREFERRED => PreferredReplicaPartitionLeaderElectionStrategy
case ElectionType.UNCLEAN =>
/* Let's be conservative and only trigger unclean election if the election type is unclean and it was
* triggered by the admin client
*/
OfflinePartitionLeaderElectionStrategy(allowUnclean = electionTrigger == AdminClientTriggered)
}
//Delegate to the partition state machine to change the partition state
val results = partitionStateMachine.handleStateChanges(
partitions.toSeq,
OnlinePartition,
Some(strategy)
)
if (electionTrigger != AdminClientTriggered) {
results.foreach {
case (tp, Left(throwable)) =>
if (throwable.isInstanceOf[ControllerMovedException]) {
info(s"Error completing replica leader election ($electionType) for partition $tp because controller has moved to another broker.", throwable)
throw throwable
} else {
error(s"Error completing replica leader election ($electionType) for partition $tp", throwable)
}
case (_, Right(_)) => // Ignored; No need to log or throw exception for the success cases
}
}
results
} finally {
if (electionTrigger != AdminClientTriggered) {
removePartitionsFromPreferredReplicaElection(partitions, electionTrigger == AutoTriggered)
}
}
}
kafka.controller.ZkPartitionStateMachine#doElectLeaderForPartitions
Here we revisit the partition state machine, focusing on the parts related to leader election. Since we are changing a partition's leader, this takes the OnlinePartition -> OnlinePartition path through doHandleStateChanges, which ends up in doElectLeaderForPartitions:
private def doElectLeaderForPartitions(
partitions: Seq[TopicPartition],
partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy
): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = {
val getDataResponses = try {
zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
case e: Exception =>
return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
}
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
getDataResponses.foreach { getDataResponse =>
val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
val currState = partitionState(partition)
if (getDataResponse.resultCode == Code.OK) {
//Decode the state znode and check the controller epoch
TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
case Some(leaderIsrAndControllerEpoch) =>
if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
s"already written by another controller. This probably means that the current controller $controllerId went through " +
s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
} else {
validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
}
case None =>
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
}
} else if (getDataResponse.resultCode == Code.NONODE) {
val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
failedElections.put(partition, Left(exception))
} else {
failedElections.put(partition, Left(getDataResponse.resultException.get))
}
}
if (validLeaderAndIsrs.isEmpty) {
return (failedElections.toMap, Seq.empty)
}
//There are four election strategies; controller initialization uses OfflinePartitionLeaderElectionStrategy
val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match {
//Offline-partition strategy: prefer the first replica (in AR order) that is live and in the ISR; if allowUnclean is true or the topic config permits it, a replica outside the ISR may be chosen
case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
validLeaderAndIsrs,
allowUnclean
)
leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
case ReassignPartitionLeaderElectionStrategy =>
//Partition-reassignment strategy: pick the first live replica that is in the ISR
leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case PreferredReplicaPartitionLeaderElectionStrategy =>
//Preferred-replica strategy: the first replica in the AR, provided it is live and in the ISR
leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
case ControlledShutdownPartitionLeaderElectionStrategy =>
//Controlled-shutdown strategy: the first live ISR replica that is not on a broker being shut down
leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
}
partitionsWithoutLeaders.foreach { electionResult =>
val partition = electionResult.topicPartition
val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
}
val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
//Write the elected leader and ISR to ZooKeeper
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
//After the ZK write, update partitionLeadershipInfo in the cache, send a LeaderAndIsr request to each partition's live replicas, and an UpdateMetadata request to every broker
finishedUpdates.foreach { case (partition, result) =>
result.right.foreach { leaderAndIsr =>
val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
}
}
(finishedUpdates ++ failedElections, updatesToRetry)
}
The partition state machine does the following:
1. Elect a leader using the given strategy; two strategies are relevant here (see the sketch after this list):
   - OfflinePartitionLeaderElectionStrategy: prefer the first live replica in the ISR; depending on the allowUnclean argument or the topic config, an out-of-sync replica may be elected.
   - PreferredReplicaPartitionLeaderElectionStrategy: only the first replica in the AR qualifies, and only if it is live and in the ISR.
2. Write the result to ZooKeeper at brokers/topics/{topic}/partitions/{partition}/state, in the format {"controller_epoch":253,"leader":2,"version":1,"leader_epoch":12,"isr":[1,2]}.
3. Update partitionLeadershipInfo in the controllerContext: key TopicPartition, value LeaderIsrAndControllerEpoch (the LeaderAndIsr plus the controller epoch).
4. Send a LeaderAndIsr request to each live replica.
5. Send an updateMetadata request to every broker in the cluster.
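As a compact illustration of the two strategies above, here is a toy Scala mirror of the selection rules. The object and method names are illustrative, not Kafka's; the real logic lives in kafka.controller.PartitionLeaderElectionAlgorithms and scans candidates in AR order:
object LeaderPick {
  // Offline strategy: first AR replica that is live and in the ISR;
  // if none and unclean election is allowed, the first live AR replica.
  def offline(ar: Seq[Int], isr: Set[Int], live: Set[Int], allowUnclean: Boolean): Option[Int] =
    ar.find(r => live(r) && isr(r)).orElse(if (allowUnclean) ar.find(live) else None)

  // Preferred strategy: only AR.head qualifies, and only if live and in the ISR.
  def preferred(ar: Seq[Int], isr: Set[Int], live: Set[Int]): Option[Int] =
    ar.headOption.filter(r => live(r) && isr(r))
}
With AR = [2,1,3], ISR = {1,2} and all brokers live, preferred returns Some(2), matching the example run at the top where the leader moved from 1 to 2.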
kafka.server.KafkaApis#handleLeaderAndIsrRequest
- A broker becoming leader mainly stops its fetcher threads and then takes over the log.
- A broker becoming follower adds a fetcher thread against the new leader to sync the log.
A simplified sketch of that dispatch follows.
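This sketch uses hypothetical stand-ins (LeaderState and FetcherManager are not Kafka classes); the real logic lives in kafka.server.ReplicaManager#becomeLeaderOrFollower:
import org.apache.kafka.common.TopicPartition

final case class LeaderState(leader: Int)

trait FetcherManager {
  def removeFetchers(tps: Set[TopicPartition]): Unit
  def addFetchers(tpsByLeader: Map[Int, Set[TopicPartition]]): Unit
}

object LeaderAndIsrSketch {
  def becomeLeaderOrFollower(localBrokerId: Int,
                             states: Map[TopicPartition, LeaderState],
                             fetchers: FetcherManager): Unit = {
    // Split the partitions in the request by whether this broker is the new leader
    val (leaders, followers) = states.partition { case (_, s) => s.leader == localBrokerId }
    // Becoming leader: stop fetching, the local log is now the source of truth
    fetchers.removeFetchers(leaders.keySet)
    // Becoming follower: point a fetcher thread at the new leader to sync the log
    fetchers.addFetchers(followers.groupBy { case (_, s) => s.leader }
      .map { case (leader, m) => leader -> m.keySet })
  }
}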