当前位置: 首页>后端>正文

Zookeeper Leader选举源码剖析

1.寻找启动类

首先从启动脚本找启动主类:


Zookeeper Leader选举源码剖析,第1张

ZOOMAIN是啥?

if [ "x$JMXDISABLE" = "x" ] || [ "$JMXDISABLE" = 'false' ]
then
  echo "ZooKeeper JMX enabled by default" >&2
  if [ "x$JMXPORT" = "x" ]
  then
    # for some reason these two options are necessary on jdk6 on Ubuntu
    #   accord to the docs they are not necessary, but otw jconsole cannot
    #   do a local attach
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.local.only=$JMXLOCALONLY org.apache.zookeeper.server.quorum.QuorumPeerMain"
  else
    if [ "x$JMXAUTH" = "x" ]
    then
      JMXAUTH=false
    fi
    if [ "x$JMXSSL" = "x" ]
    then
      JMXSSL=false
    fi
    if [ "x$JMXLOG4J" = "x" ]
    then
      JMXLOG4J=true
    fi
    echo "ZooKeeper remote JMX Port set to $JMXPORT" >&2
    echo "ZooKeeper remote JMX authenticate set to $JMXAUTH" >&2
    echo "ZooKeeper remote JMX ssl set to $JMXSSL" >&2
    echo "ZooKeeper remote JMX log4j set to $JMXLOG4J" >&2
    ZOOMAIN="-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=$JMXPORT -Dcom.sun.management.jmxremote.authenticate=$JMXAUTH -Dcom.sun.management.jmxremote.ssl=$JMXSSL -Dzookeeper.jmx.log4j.disable=$JMXLOG4J org.apache.zookeeper.server.quorum.QuorumPeerMain"
  fi
else
    echo "JMX disabled by user request" >&2
    ZOOMAIN="org.apache.zookeeper.server.quorum.QuorumPeerMain"
fi

核心是org.apache.zookeeper.server.quorum.QuorumPeerMain类。

QuorumPeerMain#main

  • 1)解析配置文件config.parse(args[0]);
  • 2)启动定时清理任务:DatadirCleanupManager purgeMgr,并purgeMgr.start();
  • 3)分布式则启动runFromConfig(config);
  • 4)单机启动ZooKeeperServerMain.main(args);
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();
        try {
            main.initializeAndRun(args);
    protected void initializeAndRun(String[] args)
        throws ConfigException, IOException, AdminServerException
    {
        QuorumPeerConfig config = new QuorumPeerConfig();
        if (args.length == 1) {
            config.parse(args[0]);
        }

        // Start and schedule the the purge task
        DatadirCleanupManager purgeMgr = new DatadirCleanupManager(config
                .getDataDir(), config.getDataLogDir(), config
                .getSnapRetainCount(), config.getPurgeInterval());
        purgeMgr.start();

        if (args.length == 1 && config.isDistributed()) {
            runFromConfig(config);
        } else {
            LOG.warn("Either no config or no quorum defined in config, running "
                    + " in standalone mode");
            // there is only server in the quorum -- run as standalone
            ZooKeeperServerMain.main(args);
        }
    }

QuorumPeerMain#runFromConfig

  • 1)cnxnFactory = ServerCnxnFactory.createFactory();默认是NIOServerCnxnFactory,官方推荐Netty,需要配置以下NettyServerCnxnFactory。
  • 2)配置客户端端口号2181, cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), false);
  • 3)创建QuorumPeer,并设置各种参数,比如设置内存数据库ZKDatabase
  • 4)quorumPeer.initialize(); 权限相关authServer和authLearner;
  • 5)quorumPeer.start();
    5-1)loadDataBase(); 从snapLog加载数据到内存数据库,从currentEpoch文件读取currentEpoch,从acceptedEpoch文件读取acceptedEpoch;
    5-2)startServerCnxnFactory();调用NIOServerCnxnFactory#start或者NettyServerCnxnFactory#start;
    5-3)adminServer.start();adminServer启动Jetty服务器在8080端口,包括zk服务端的一些信息;
    5-4)startLeaderElection();
    ?A)LOOKING状态,创建选票 currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
    ?B)this.electionAlg = createElectionAlgorithm(electionType);这里算法类型传入的是3。
    ??B-1)创建QuorumCnxManager;
    ??B-2)QuorumCnxManager.Listener#start,创建BIO通道,接收选票。创建ServerSocket ss,等待连接client = ss.accept();,然后receiveConnection(client);
    ??a)QuorumCnxManager#handleConnection
    ???a-1)获取客户端的sid,如果sid < self.getId(),则关闭连接,小的sid不能连接大的。
    ???a-2)sid == self.getId(),错误
    ???a-3)sid > self.getId(),正常情况,创建SendWorker和RecvWorker并启动,将sid和sw对应起来放到senderWorkerMap,将sid和ArrayBlockingQueue对应起来放到queueSendMap。
    ???一)QuorumCnxManager.SendWorker#run,从queueSendMap获取sid对应的阻塞队列bq,从bq中获取数据b,lastMessageSent.put(sid, b),然后send(b)。
    ???二)QuorumCnxManager.RecvWorker#run,从DataInputStream读取数据Message,放到recvQueue.add(msg)中,ArrayBlockingQueue。
    ??B-3)创建FastLeaderElection,创建了sendqueue和recvqueue,都是LinkedBlockingQueue,创建了Messenger messenger;
    ??B-4)FastLeaderElection#start,启动wsThread和wrThread(WorkerSender and WorkerReceiver)。收发选票的两个线程。
    ??a)FastLeaderElection.Messenger.WorkerSender#run,从sendqueue.poll()获取数据m,然后进行处理process(m);
    ???a-1)this.mySid == sid,自己发给自己的消息,recvQueue.add(msg);放到recvQueue即可。
    ???a-2)发给其他server的,则从queueSendMap根据sid获取对应的阻塞队列oldq,addToSendQueue(oldq, b);将消息放到oldq中。
    ??b)FastLeaderElection.Messenger.WorkerReceiver#run,从recvQueue.poll()获取响应response,然后进行处理。
    5-5)super.start();调用QuorumPeer#run
    ?A)LOOKING状态
    ?A-1)FastLeaderElection#lookForLeader
    ??a)logicalclock.incrementAndGet();
    ??b)updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
    ??c)sendNotifications();发送选票sendqueue.offer(notmsg);
    ??d)recvqueue.poll()获取消息n
    ??e)如果n为空,manager.connectAll();调用QuorumCnxManager#initiateConnectionAsync,执行QuorumCnxManager.QuorumConnectionReqThread#run,在这里连接sock.connect(electionAddr, cnxTO);然后startConnection(sock, sid);这里会判断,sid小的不能主动连接sid大的。
    ??f)如果消息n不为空,且n中sid和leader有效
    ???f-1)n为LOOKING状态
    ???情况1)n.electionEpoch > logicalclock.get()
    ???情况2)除1)外,n.electionEpoch < logicalclock.get()
    ???情况3)除1)2)外,totalOrderPredicate()对比收到的选票n和自己选票,依次比较epoch、zxid、myid,有一个n比自己大,则将自己选票更新为收到的选票。并将更新后的选票发送给其他服务器。再进行大多数判定,如果收到选票中有某个投票超过了一半,则就可以终止选举,更新状态为LEADING或者FOLLOWING。然后将该选票返回,下面会设置到currentVote中。
    ???f-2)OBSERVING
    ???f-3)FOLLOWING和LEADING
    ?A-2)将A-1)返回的Vote设置到currentVote中。
    ?B)OBSERVING
    ?C)FOLLOWING
    ?D)LEADING

adminServer启动Jetty服务器在8080端口,包括zk服务端的一些信息:


Zookeeper Leader选举源码剖析,第2张
    public void runFromConfig(QuorumPeerConfig config)
            throws IOException, AdminServerException
    {
      try {
          ManagedUtil.registerLog4jMBeans();
      } catch (JMException e) {
          LOG.warn("Unable to register log4j JMX control", e);
      }

      LOG.info("Starting quorum peer, myid=" + config.getServerId());
      try {
          ServerCnxnFactory cnxnFactory = null;
          ServerCnxnFactory secureCnxnFactory = null;

          if (config.getClientPortAddress() != null) {
              cnxnFactory = ServerCnxnFactory.createFactory();
              cnxnFactory.configure(config.getClientPortAddress(),
                      config.getMaxClientCnxns(),
                      false);
          }

          if (config.getSecureClientPortAddress() != null) {
              secureCnxnFactory = ServerCnxnFactory.createFactory();
              secureCnxnFactory.configure(config.getSecureClientPortAddress(),
                      config.getMaxClientCnxns(),
                      true);
          }

          quorumPeer = getQuorumPeer();
          quorumPeer.setTxnFactory(new FileTxnSnapLog(
                      config.getDataLogDir(),
                      config.getDataDir()));
          quorumPeer.enableLocalSessions(config.areLocalSessionsEnabled());
          quorumPeer.enableLocalSessionsUpgrading(
              config.isLocalSessionsUpgradingEnabled());
          //quorumPeer.setQuorumPeers(config.getAllMembers());
          quorumPeer.setElectionType(config.getElectionAlg());
          quorumPeer.setMyid(config.getServerId());
          quorumPeer.setTickTime(config.getTickTime());
          quorumPeer.setMinSessionTimeout(config.getMinSessionTimeout());
          quorumPeer.setMaxSessionTimeout(config.getMaxSessionTimeout());
          quorumPeer.setInitLimit(config.getInitLimit());
          quorumPeer.setSyncLimit(config.getSyncLimit());
          quorumPeer.setConfigFileName(config.getConfigFilename());
          quorumPeer.setZKDatabase(new ZKDatabase(quorumPeer.getTxnFactory()));
          quorumPeer.setQuorumVerifier(config.getQuorumVerifier(), false);
          if (config.getLastSeenQuorumVerifier()!=null) {
              quorumPeer.setLastSeenQuorumVerifier(config.getLastSeenQuorumVerifier(), false);
          }
          quorumPeer.initConfigInZKDatabase();
          quorumPeer.setCnxnFactory(cnxnFactory);
          quorumPeer.setSecureCnxnFactory(secureCnxnFactory);
          quorumPeer.setSslQuorum(config.isSslQuorum());
          quorumPeer.setUsePortUnification(config.shouldUsePortUnification());
          quorumPeer.setLearnerType(config.getPeerType());
          quorumPeer.setSyncEnabled(config.getSyncEnabled());
          quorumPeer.setQuorumListenOnAllIPs(config.getQuorumListenOnAllIPs());
          if (config.sslQuorumReloadCertFiles) {
              quorumPeer.getX509Util().enableCertFileReloading();
          }

          // sets quorum sasl authentication configurations
          quorumPeer.setQuorumSaslEnabled(config.quorumEnableSasl);
          if(quorumPeer.isQuorumSaslAuthEnabled()){
              quorumPeer.setQuorumServerSaslRequired(config.quorumServerRequireSasl);
              quorumPeer.setQuorumLearnerSaslRequired(config.quorumLearnerRequireSasl);
              quorumPeer.setQuorumServicePrincipal(config.quorumServicePrincipal);
              quorumPeer.setQuorumServerLoginContext(config.quorumServerLoginContext);
              quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
          }
          quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
          quorumPeer.initialize();

          quorumPeer.start();
          quorumPeer.join();
      } catch (InterruptedException e) {
          // warn, but generally this is ok
          LOG.warn("Quorum Peer interrupted", e);
      }
    }

2.网络通信架构

网络通信架构包括几部分:

  • 1)客户端与ZK服务端通信,NIOServerCnxnFactory或者NettyServerCnxnFactory,端口号默认是2181;
  • 2)服务端之间Leader选举通信通道,BIO连接;
  • 3)Leader和Follower之间数据同步以及发送心跳,BIO连接。

QuorumPeer.AddressTuple

    public static final class AddressTuple {
        public final InetSocketAddress quorumAddr;
        public final InetSocketAddress electionAddr;
        public final InetSocketAddress clientAddr;

3.选举PK

整个zookeeper选举底层可以分为选举应用层和消息传输层,应用层有自己的队列统一接收和发送选票,传输层也设计了自己的队列,但是按发送的机器分了队列,避免给每台机器发送消息时相互影响,比如某台机器如果出问题发送不成功则不会影响对正常机器的消息发送。

3.1 Leader选举多层队列架构

Zookeeper Leader选举源码剖析,第3张
Zookeeper Leader选举源码剖析,第4张

为什么搞一个线程一个队列?万一某个机器坏掉阻塞,会影响其他机器收发选票,所以针对每一个机器都搞一个线程和队列,相互之间不影响

3.2 选举过程

Zookeeper Leader选举源码剖析,第5张

关于选举的logicalclock,参考 https://blog.csdn.net/qq_41688840/article/details/109840026

选举轮次,也就是逻辑时钟,即logicalclock。这个值,不会频繁变化,一次选举,自增一次。一次选举过程中,可能包括多次投票,投票不涉及逻辑时钟的自增。

Zookeeper Leader选举源码剖析,第6张

4.选举完成后,后面加入服务器

流程(机器1、机器2已经选举出2位Leader,机器3启动发起投票选举):

  • 1)机器3向机器1和机器2发送投票,比如机器2收到选票,WorkerReceriver#run从队列recvQueue取出选票进行处理,发现自己不是LOOKING状态,表明已经选出Leader,取出currentVote(也就是Leader的选票),如果发送方式LOOKING状态,则将该选票发送给机器3,放到sendqueue中。
  • 2)机器3收到选票后,发现选票状态是FOLLOWING或者LEADING,则说明已经选出了Leader。则会设置本机状态为FOLLOWING。

5.Leader、Follower连接、同步

QuorumPeer#run

    public void run() {
        updateThreadName();

        LOG.debug("Starting quorum peer");
        try {
            jmxQuorumBean = new QuorumBean(this);
            MBeanRegistry.getInstance().register(jmxQuorumBean, null);
            for(QuorumServer s: getView().values()){
                ZKMBeanInfo p;
                if (getId() == s.id) {
                    p = jmxLocalPeerBean = new LocalPeerBean(this);
                    try {
                        MBeanRegistry.getInstance().register(p, jmxQuorumBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                        jmxLocalPeerBean = null;
                    }
                } else {
                    RemotePeerBean rBean = new RemotePeerBean(this, s);
                    try {
                        MBeanRegistry.getInstance().register(rBean, jmxQuorumBean);
                        jmxRemotePeerBean.put(s.id, rBean);
                    } catch (Exception e) {
                        LOG.warn("Failed to register with JMX", e);
                    }
                }
            }
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            jmxQuorumBean = null;
        }

        try {
            /*
             * Main loop
             */
            while (running) {
                switch (getPeerState()) {
                case LOOKING:
                    LOG.info("LOOKING");

                    if (Boolean.getBoolean("readonlymode.enabled")) {
                        LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                        // Create read-only server but don't start it immediately
                        final ReadOnlyZooKeeperServer roZk =
                            new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                        // Instead of starting roZk immediately, wait some grace
                        // period before we decide we're partitioned.
                        //
                        // Thread is used here because otherwise it would require
                        // changes in each of election strategy classes which is
                        // unnecessary code coupling.
                        Thread roZkMgr = new Thread() {
                            public void run() {
                                try {
                                    // lower-bound grace period to 2 secs
                                    sleep(Math.max(2000, tickTime));
                                    if (ServerState.LOOKING.equals(getPeerState())) {
                                        roZk.startup();
                                    }
                                } catch (InterruptedException e) {
                                    LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                                } catch (Exception e) {
                                    LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                                }
                            }
                        };
                        try {
                            roZkMgr.start();
                            reconfigFlagClear();
                            if (shuttingDownLE) {
                                shuttingDownLE = false;
                                startLeaderElection();
                            }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        } finally {
                            // If the thread is in the the grace period, interrupt
                            // to come out of waiting.
                            roZkMgr.interrupt();
                            roZk.shutdown();
                        }
                    } else {
                        try {
                           reconfigFlagClear();
                            if (shuttingDownLE) {
                               shuttingDownLE = false;
                               startLeaderElection();
                               }
                            setCurrentVote(makeLEStrategy().lookForLeader());
                        } catch (Exception e) {
                            LOG.warn("Unexpected exception", e);
                            setPeerState(ServerState.LOOKING);
                        }
                    }
                    break;
                case OBSERVING:
                    try {
                        LOG.info("OBSERVING");
                        setObserver(makeObserver(logFactory));
                        observer.observeLeader();
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e );
                    } finally {
                        observer.shutdown();
                        setObserver(null);
                       updateServerState();
                    }
                    break;
                case FOLLOWING:
                    try {
                       LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }
                    break;
                case LEADING:
                    LOG.info("LEADING");
                    try {
                        setLeader(makeLeader(logFactory));
                        leader.lead();
                        setLeader(null);
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception",e);
                    } finally {
                        if (leader != null) {
                            leader.shutdown("Forcing shutdown");
                            setLeader(null);
                        }
                        updateServerState();
                    }
                    break;
                }
                start_fle = Time.currentElapsedTime();
            }
        } finally {
            LOG.warn("QuorumPeer main thread exited");
            MBeanRegistry instance = MBeanRegistry.getInstance();
            instance.unregister(jmxQuorumBean);
            instance.unregister(jmxLocalPeerBean);

            for (RemotePeerBean remotePeerBean : jmxRemotePeerBean.values()) {
                instance.unregister(remotePeerBean);
            }

            jmxQuorumBean = null;
            jmxLocalPeerBean = null;
            jmxRemotePeerBean = null;
        }
    }

核心:

  • 1)Leader
    1-1)会创建ServerSocket;
    1-2)leader.lead()会加载数据并且启动线程LearnerCnxAcceptor,这里会等待Follower连接,然后接收Follower数据并开启线程LearnerHandler进行处理;
    1-3)startZkServer()构建Leader请求处理责任链;
    1-4)Leader向所有Follower发送PING请求保持长连接。
  • 2)Follower
    2-1)主动向leader发起socket连接;
    2-2)注册自己到Leader;
    2-3)从Leader同步数据:syncWithLeader();
    2-4)while死循环接收Leader同步过来的数据并进行处理。

6.Leader崩溃选举

如果Leader崩溃,上面Follower会在QuorumPeer#run中检测到,此时Follower会跳出while死循环,抛出异常,然后执行到finally逻辑:

                case FOLLOWING:
                    try {
                       LOG.info("FOLLOWING");
                        setFollower(makeFollower(logFactory));
                        follower.followLeader();
                    } catch (Exception e) {
                       LOG.warn("Unexpected exception",e);
                    } finally {
                       follower.shutdown();
                       setFollower(null);
                       updateServerState();
                    }

此时updateServerState()会更新状态为LOOKING,也就会重新发起选举。


https://www.xamrdz.com/backend/3a31941324.html

相关文章: