当前位置: 首页>数据库>正文

ES 集群选举的过程

如果需要一个搜索工具,lucene 完全可以胜任,但是网上大火的却是 Elasticsearch,它对 lucene 进行了分布式的赋能,lucene 解决搜索底层的数据存储,Elasticsearch 提供接口和分布式能力。而集群则是分布式的基础,那么 Elasticsearch 是如何组建集群的呢?本文深入细节,详细分解 Elasticsearch 选举过程的每个关键步骤,结合文档和代码片段,来构建一个全面的理解框架。

1. 初始化与配置
启动时设置: 当 ES 节点启动时,会初始化 GatewayMetaState,包括设置当前任期、最后接受的配置等状态信息,节点的 uuid 等,节点选举相关的配置信息保存在 lucene 文件中(详见 LucenePersistedState)。

2. 节点发现与连接
SeedHostsResolver: 解析种子主机地址,帮助发现集群中的其他节点。
PeerFinder: 基于 elasticsearch.yml 中配置的种子节点信息,发现并连接到集群中的其他节点,为选举和状态传播准备通信渠道。

 

3. 引导主节点选举
ClusterBootstrapService: 解析 cluster.initial_master_nodes 配置,当发现的节点数超过 initial_master_nodes 一半,则节点满足了选举的条件。

4. 预投票(Pre-Voting)
PreVoteCollector: 在正式选举前,潜在的 master 节点通过 PreVoteCollector 向集群其他节点发送预投票请求,以评估是否有足够的支持成为领导者。
预投票请求: 通过 REQUEST_PRE_VOTE_ACTION_NAME 发起。
响应处理: 收集预投票响应,如果预投票结果表明有足够的支持,将触发正式选举。
预投票阶段主要比较节点的任期和集群状态的版本号,选择最新的。

 

5. 正式选举
JoinHelper: 发送正式的选举请求(JoinRequest),请求其他节点的投票。
投票逻辑: 其他节点根据当前集群状态、健康状况、任期号等因素决定是否投票给请求节点。
投票计数: 收集投票,依据 ElectionStrategy 判断是否达到多数(超过半数),选举出领导者。

6. 领导者确认与状态传播
CoordinationState: 成为领导者后,节点将通过 CoordinationState 处理客户端的集群状态更新请求,准备集群状态的发布。
PublicationTransportHandler: 负责将新集群状态发布到集群中的其他节点。
PublishRequest: 领导者向所有节点广播包含新状态的PublishRequest。
AckListener: 收集其他节点的确认响应,确保多数节点已经接受新状态。

7. 状态应用与确认
ClusterApplier: 应用新的集群状态,确保集群内所有节点状态一致。
ApplyCommitRequest: 领导者在收到足够数量的确认后,向所有节点发送ApplyCommitRequest,通知它们应用新状态。
CoordinatorPublication: 处理状态的提交和确认,确保状态最终被所有节点应用。

8. 健康监测与故障恢复
LeaderChecker: 监控领导者健康,一旦领导者不可用,触发重新选举。
FollowersChecker: 监控跟随者状态,确保集群稳定。

9. 特殊节点处理
VotingOnlyNodePlugin: 对于只参与投票的节点,有专门的处理逻辑,比如不参与数据发布等,确保资源优化和集群稳定性。

 

重要的配置项:

discovery.seed_hosts

种子节点的地址,集群中的节点通过种子节点,最终发现所有节点

cluster.initial_master_nodes

这个参数非常重要,如果 ES 集群初次启动,必须设置这个属性,如果没有设置,则整个集群无法完成选举,准确的说法是在预投票阶段无法满足选举人过半的要求,从而无法选举出 master。

一旦当集群成功完成选举后,再次启动时,则不再需要设置该属性了,因为此后,节点可以从磁盘配置文件中获取集群中的节点信息。

discovery.type

集群的类型,multi-node/single-node,默认是 multi-node

 

 

 

 

 

 

代码片段:

1. 

/**
 * Loads (and maybe upgrades) cluster metadata at startup, and persistently stores cluster metadata for future restarts.
 *
 * When started, ensures that this version is compatible with the state stored on disk, and performs a state upgrade if necessary. Note that
 * the state being loaded when constructing the instance of this class is not necessarily the state that will be used as {@link
 * ClusterState#metadata()} because it might be stale or incomplete. Master-eligible nodes must perform an election to find a complete and
 * non-stale state, and master-ineligible nodes receive the real cluster state from the elected master after joining the cluster.
 */
org.elasticsearch.gateway.GatewayMetaState

2. 

org.elasticsearch.discovery.PeerFinder#PeerFinder

org.elasticsearch.cluster.coordination.Coordinator.CoordinatorPeerFinder

3. 

// org.elasticsearch.cluster.coordination.ClusterBootstrapService#onFoundPeersUpdated
@Override
public void onFoundPeersUpdated() {
    final Set<DiscoveryNode> nodes = getDiscoveredNodes();
    if (bootstrappingPermitted.get()
        && transportService.getLocalNode().isMasterNode()
        && bootstrapRequirements.isEmpty() == false
        && isBootstrappedSupplier.getAsBoolean() == false) {

        final Tuple<Set<DiscoveryNode>, List<String>> requirementMatchingResult;
        try {
            requirementMatchingResult = checkRequirements(nodes);
        } catch (IllegalStateException e) {
            logger.warn("bootstrapping cancelled", e);
            bootstrappingPermitted.set(false);
            return;
        }

        final Set<DiscoveryNode> nodesMatchingRequirements = requirementMatchingResult.v1();
        final List<String> unsatisfiedRequirements = requirementMatchingResult.v2();
        logger.trace(
            "nodesMatchingRequirements={}, unsatisfiedRequirements={}, bootstrapRequirements={}",
            nodesMatchingRequirements,
            unsatisfiedRequirements,
            bootstrapRequirements
        );

        if (nodesMatchingRequirements.contains(transportService.getLocalNode()) == false) {
            logger.info(
                "skipping cluster bootstrapping as local node does not match bootstrap requirements: {}",
                bootstrapRequirements
            );
            bootstrappingPermitted.set(false);
            return;
        }

        if (nodesMatchingRequirements.size() * 2 > bootstrapRequirements.size()) {
            startBootstrap(nodesMatchingRequirements, unsatisfiedRequirements);
        }
    }
}

4.

// org.elasticsearch.cluster.coordination.PreVoteCollector#handlePreVoteRequest
private PreVoteResponse handlePreVoteRequest(final PreVoteRequest request) {
    updateMaxTermSeen.accept(request.getCurrentTerm());

    Tuple<DiscoveryNode, PreVoteResponse> state = this.state;
    assert state != null : "received pre-vote request before fully initialised";

    final DiscoveryNode leader = state.v1();
    final PreVoteResponse response = state.v2();

    final StatusInfo statusInfo = nodeHealthService.getHealth();
    if (statusInfo.getStatus() == UNHEALTHY) {
        String message = "rejecting " + request + " on unhealthy node: [" + statusInfo.getInfo() + "]";
        logger.debug(message);
        throw new NodeHealthCheckFailureException(message);
    }

    if (leader == null) {
        return response;
    }

    if (leader.equals(request.getSourceNode())) {
        // This is a _rare_ case where our leader has detected a failure and stepped down, but we are still a follower. It's possible
        // that the leader lost its quorum, but while we're still a follower we will not offer joins to any other node so there is no
        // major drawback in offering a join to our old leader. The advantage of this is that it makes it slightly more likely that the
        // leader won't change, and also that its re-election will happen more quickly than if it had to wait for a quorum of followers
        // to also detect its failure.
        return response;
    }

    throw new CoordinationStateRejectedException("rejecting " + request + " as there is already a leader");
}


// org.elasticsearch.cluster.coordination.PreVoteCollector.PreVotingRound#handlePreVoteResponse
private void handlePreVoteResponse(final PreVoteResponse response, final DiscoveryNode sender) {
    if (isClosed.get()) {
        logger.debug("{} is closed, ignoring {} from {}", this, response, sender);
        return;
    }

    updateMaxTermSeen.accept(response.getCurrentTerm());

    if (response.getLastAcceptedTerm() > clusterState.term()
        || (response.getLastAcceptedTerm() == clusterState.term() && response.getLastAcceptedVersion() > clusterState.version())) {
        logger.debug("{} ignoring {} from {} as it is fresher", this, response, sender);
        return;
    }

    preVotesReceived.put(sender, response);

    // create a fake VoteCollection based on the pre-votes and check if there is an election quorum
    final VoteCollection voteCollection = new VoteCollection();
    final DiscoveryNode localNode = clusterState.nodes().getLocalNode();
    final PreVoteResponse localPreVoteResponse = getPreVoteResponse();

    preVotesReceived.forEach(
        (node, preVoteResponse) -> voteCollection.addJoinVote(
            new Join(
                node,
                localNode,
                preVoteResponse.getCurrentTerm(),
                preVoteResponse.getLastAcceptedTerm(),
                preVoteResponse.getLastAcceptedVersion()
            )
        )
    );

    if (electionStrategy.isElectionQuorum(
        clusterState.nodes().getLocalNode(),
        localPreVoteResponse.getCurrentTerm(),
        localPreVoteResponse.getLastAcceptedTerm(),
        localPreVoteResponse.getLastAcceptedVersion(),
        clusterState.getLastCommittedConfiguration(),
        clusterState.getLastAcceptedConfiguration(),
        voteCollection
    ) == false) {
        logger.debug("{} added {} from {}, no quorum yet", this, response, sender);
        return;
    }

    if (electionStarted.compareAndSet(false, true) == false) {
        logger.debug("{} added {} from {} but election has already started", this, response, sender);
        return;
    }

    logger.debug("{} added {} from {}, starting election", this, response, sender);
    startElection.run();
}

 

持续补充


https://www.xamrdz.com/database/6an1963409.html

相关文章: