Ceph Monitor Startup Flow Analysis

Ceph Pacific v16.2.13

Monitor Startup

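Monitor startup happens in two phases. The first phase is mkfs, after which the process exits; the monitor is then started again. The monitor's data directory is laid out as follows: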

~/go/src/ceph/build (test ?) MDS=0 RGW=1 ../src/vstart.sh -d -l -n --bluestore
~/go/src/ceph/build (test ?) tree dev/mon.a
dev/mon.a
├── keyring
├── kv_backend
├── min_mon_release
└── store.db
    ├── 000017.sst
    ├── 000019.sst
    ├── 000207.log
    ├── CURRENT
    ├── IDENTITY
    ├── LOCK
    ├── MANIFEST-000206
    └── OPTIONS-000209

mkfs

    if (mkfs) {
      ...
      
      MonitorDBStore store(g_conf()->mon_data); // create the store that backs the monitor; rocksdb by default
      ostringstream oss;                                                                                  
      int r = store.create_and_open(oss);                                                           
      if (oss.tellp())                                                                                    
        derr << oss.str() << dendl;                                                                       
      if (r < 0) {                                                                                        
        derr << argv[0] << ": error opening mon data directory at '"                                      
             << g_conf()->mon_data << "': " << cpp_strerror(r) << dendl;                          
        exit(1);
      }                                                                                                   
      ceph_assert(r == 0);                                                                                
                                                                                                          
      Monitor mon(g_ceph_context, g_conf()->name.get_id(), &store, 0, 0, &monmap); // src/mon/Monitor.cc 
      r = mon.mkfs(osdmapbl);
      if (r < 0) {                                                                                        
        derr << argv[0] << ": error creating monfs: " << cpp_strerror(r) << dendl;                  
        exit(1);                                                                             
      }                                                                                                   
      store.close();                                                                                      
      dout(0) << argv[0] << ": created monfs at " << g_conf()->mon_data                                   
          << " for " << g_conf()->name << dendl;                                                          
      return 0; // the process returns as soon as mkfs finishes
    }
  • mkfs first records CEPH_MON_ONDISK_MAGIC in the store. It can be inspected with ceph-kvstore-tool, as shown below:
~/go/src/ceph/build (test ?) ceph-kvstore-tool rocksdb dev/mon.b/store.db/ get monitor magic              
(monitor, magic)
00000000  63 65 70 68 20 6d 6f 6e  20 76 6f 6c 75 6d 65 20  |ceph mon volume |
00000010  76 30 31 32 0a                                    |v012.|
00000015
  • It then records the feature set it supports:
~/go/src/ceph/build (test ?) ceph-kvstore-tool rocksdb dev/mon.b/store.db/ get monitor feature_set     
(monitor, feature_set)
00000000  00 00 00 00 00 00 00 00  00 00 00 00 00 00 00 00  |................|
00000010  00 00 00 00 00 00 00 00  fa 3f 00 00 00 00 00 00  |.........?......|
00000020  0c 00 00 00 01 00 00 00  00 00 00 00 1b 00 00 00  |................|
00000030  69 6e 69 74 69 61 6c 20  66 65 61 74 75 72 65 20  |initial feature |
00000040  73 65 74 20 28 7e 76 2e  31 38 29 03 00 00 00 00  |set (~v.18).....|
00000050  00 00 00 22 00 00 00 73  69 6e 67 6c 65 20 70 61  |..."...single pa|
00000060  78 6f 73 20 77 69 74 68  20 6b 2f 76 20 73 74 6f  |xos with k/v sto|
00000070  72 65 20 28 76 30 2e 3f  29 04 00 00 00 00 00 00  |re (v0.?).......|
00000080  00 1a 00 00 00 73 75 70  70 6f 72 74 20 65 72 61  |.....support era|
00000090  73 75 72 65 20 63 6f 64  65 20 70 6f 6f 6c 73 05  |sure code pools.|
000000a0  00 00 00 00 00 00 00 19  00 00 00 6e 65 77 2d 73  |...........new-s|
000000b0  74 79 6c 65 20 6f 73 64  6d 61 70 20 65 6e 63 6f  |tyle osdmap enco|
000000c0  64 69 6e 67 06 00 00 00  00 00 00 00 1c 00 00 00  |ding............|
000000d0  73 75 70 70 6f 72 74 20  69 73 61 2f 6c 72 63 20  |support isa/lrc |
000000e0  65 72 61 73 75 72 65 20  63 6f 64 65 07 00 00 00  |erasure code....|
000000f0  00 00 00 00 19 00 00 00  73 75 70 70 6f 72 74 20  |........support |
00000100  73 68 65 63 20 65 72 61  73 75 72 65 20 63 6f 64  |shec erasure cod|
00000110  65 08 00 00 00 00 00 00  00 17 00 00 00 73 75 70  |e............sup|
00000120  70 6f 72 74 20 6d 6f 6e  6d 61 70 20 66 65 61 74  |port monmap feat|
00000130  75 72 65 73 09 00 00 00  00 00 00 00 16 00 00 00  |ures............|
00000140  6c 75 6d 69 6e 6f 75 73  20 6f 6e 64 69 73 6b 20  |luminous ondisk |
00000150  6c 61 79 6f 75 74 0a 00  00 00 00 00 00 00 13 00  |layout..........|
00000160  00 00 6d 69 6d 69 63 20  6f 6e 64 69 73 6b 20 6c  |..mimic ondisk l|
00000170  61 79 6f 75 74 0b 00 00  00 00 00 00 00 16 00 00  |ayout...........|
00000180  00 6e 61 75 74 69 6c 75  73 20 6f 6e 64 69 73 6b  |.nautilus ondisk|
00000190  20 6c 61 79 6f 75 74 0c  00 00 00 00 00 00 00 15  | layout.........|
000001a0  00 00 00 6f 63 74 6f 70  75 73 20 6f 6e 64 69 73  |...octopus ondis|
000001b0  6b 20 6c 61 79 6f 75 74  0d 00 00 00 00 00 00 00  |k layout........|
000001c0  15 00 00 00 70 61 63 69  66 69 63 20 6f 6e 64 69  |....pacific ondi|
000001d0  73 6b 20 6c 61 79 6f 75  74                       |sk layout|
000001d9
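  • Next it records the monmap, which can likewise be inspected with get mkfs monmap. Note that the monitor deletes this key after it starts; it exists only during the mkfs phase.
  • If the --osdmap option was given at startup, the osdmap is read from it and saved in the store; it can be inspected with get mkfs osdmap.
  • It records the keyring, which can be inspected with get mkfs keyring.
  • It records the fsid, which can be inspected with get monitor cluster_uuid.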
  /*                                                                                                  
   * this is the closest thing to a traditional 'mkfs' for ceph.                                      
   * initialize the monitor state machines to their initial values.                                   
   */                                                                                                 
  int Monitor::mkfs(bufferlist& osdmapbl)                                                             
  {                                                                                                   
    auto t(std::make_shared<MonitorDBStore::Transaction>());                                          
                                                                                                      
    // verify cluster fsid                                                                            
    int r = check_fsid();                                                                             
    if (r < 0 && r != -ENOENT)                                                                        
      return r;                                                                                       
                                                                                                      
    bufferlist magicbl;                                                                               
    magicbl.append(CEPH_MON_ONDISK_MAGIC);                                                     
    magicbl.append("\n");                                                                         
    t->put(MONITOR_NAME, "magic", magicbl); // record the monitor magic string
                                                                                                      
                                                                                                      
    features = get_initial_supported_features();                                                      
    write_features(t);                                                                                
                                                                                                      
    // save monmap, osdmap, keyring.                                                                  
    bufferlist monmapbl;                                                                              
    monmap->encode(monmapbl, CEPH_FEATURES_ALL);
    monmap->set_epoch(0);     // must be 0 to avoid confusing first MonmapMonitor::update_from_paxos()                                 
    t->put("mkfs", "monmap", monmapbl); // record the mkfs monmap
                                                                                                      
    if (osdmapbl.length()) {                                                                          
      // make sure it's a valid osdmap                                                                
      try {                                                                                           
        OSDMap om;                                                                                    
        om.decode(osdmapbl);                                                                     
      }
      catch (ceph::buffer::error& e) {                                                                
        derr << "error decoding provided osdmap: " << e.what() << dendl;                              
        return -EINVAL;                                                                               
      }                                                                                               
      t->put("mkfs", "osdmap", osdmapbl); // record the mkfs osdmap
    }                                                                                                 
                                                                                                      
    if (is_keyring_required()) {                                                                      
      KeyRing keyring;                                                                                                                       
      string keyring_filename;                                                                        
                                                                                                      
      ...                                                                                               
                                                                                                      
      // put mon. key in external keyring; seed with everything else.                                 
      extract_save_mon_key(keyring);                                                            
                                                                                                      
      bufferlist keyringbl;                                                                           
      keyring.encode_plaintext(keyringbl);                                                     
      t->put("mkfs", "keyring", keyringbl); // record the mkfs keyring
    }                                                                                                 
    write_fsid(t); // write the fsid
    store->apply_transaction(t);                                                                      
                                                                                                      
    return 0;                                                                                         
  }
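
The order of writes in Monitor::mkfs() can be sketched with a toy in-memory store. This is only an illustration under stated assumptions: ToyTransaction, ToyStore and toy_mkfs are hypothetical names, not Ceph APIs, and every value except the magic string (taken from the hexdump above) is a placeholder.

```cpp
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for MonitorDBStore: entries are keyed by (prefix, key).
using KvKey = std::pair<std::string, std::string>;

struct ToyTransaction {
  std::vector<std::pair<KvKey, std::string>> ops;  // queued writes, in order
  void put(const std::string& prefix, const std::string& key,
           const std::string& value) {
    ops.emplace_back(KvKey(prefix, key), value);
  }
};

struct ToyStore {
  std::map<KvKey, std::string> data;
  // Apply all queued writes; nothing is visible in the store before this call.
  void apply_transaction(const ToyTransaction& t) {
    for (const auto& op : t.ops) data[op.first] = op.second;
  }
  bool exists(const std::string& prefix, const std::string& key) const {
    return data.count(KvKey(prefix, key)) > 0;
  }
  const std::string& get(const std::string& prefix,
                         const std::string& key) const {
    return data.at(KvKey(prefix, key));
  }
};

// Follows the order of writes in Monitor::mkfs(). Only the magic value is
// real; the other values are placeholders for the encoded structures.
inline ToyStore toy_mkfs(bool with_osdmap) {
  ToyTransaction t;
  t.put("monitor", "magic", std::string("ceph mon volume v012") + "\n");
  t.put("monitor", "feature_set", "<encoded feature set>");
  t.put("mkfs", "monmap", "<encoded monmap>");
  if (with_osdmap) t.put("mkfs", "osdmap", "<encoded osdmap>");
  t.put("mkfs", "keyring", "<plaintext keyring>");
  t.put("monitor", "cluster_uuid", "<fsid>");
  ToyStore store;
  store.apply_transaction(t);
  return store;
}
```

Calling toy_mkfs(false) leaves no mkfs/osdmap key, matching the case where --osdmap is not given. The magic value is 21 bytes long, which agrees with the final offset 0x15 in the hexdump.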

startup

    mon = new Monitor(g_ceph_context, g_conf()->name.get_id(), store,                 
              msgr, mgr_msgr, &monmap);  // instantiate the Monitor
                                                                                                          
    mon->orig_argc = argc;                                                                                    
    mon->orig_argv = argv;                                                                                
                                                                                                          
    if (force_sync) {                                                                                     
      derr << "flagging a forced sync ..." << dendl;                                                      
      ostringstream oss;                                                                                  
      JSONFormatter jf(true);                                                                         
      mon->sync_force(&jf);                                                                          
      derr << "out:\n";                                                                                   
      jf.flush(*_dout);                                                                         
      *_dout << dendl;                                                                                    
    }                                                                                                     
                                                                                                          
    err = mon->preinit(); // initialize the monitor
    if (err < 0) {                                                                                        
      derr << "failed to initialize" << dendl;                                                            
      prefork.exit(1);
    }                                                                                                     
                                                                                                          
    if (compact || g_conf()->mon_compact_on_start) {                                                      
      derr << "compacting monitor store ..." << dendl;                                                    
      mon->store->compact();                                                                              
      derr << "done compacting" << dendl;                                                                 
    }   
    
    ...
    
    msgr->start();
    mgr_msgr->start();
    
    mon->set_mon_crush_location(crush_loc);                                                 
    mon->init(); // continue monitor initialization
    ...

Instantiating the Monitor

  Monitor::Monitor(CephContext* cct_, string nm, MonitorDBStore *s,                   
           Messenger *m, Messenger *mgr_m, MonMap *map) :                                             
    Dispatcher(cct_),                                                                                 
    AuthServer(cct_),                                                                         
    name(nm),                                                                                         
    rank(-1),  // initialized to -1
    messenger(m),                                                                                     
    con_self(m ? m->get_loopback_connection() : NULL),
    timer(cct_, lock),                                                                                
    finisher(cct_, "mon_finisher", "fin"),                                                  
    cpu_tp(cct, "Monitor::cpu_tp", "cpu_tp", g_conf()->mon_cpu_threads),           
    has_ever_joined(false),                                                                           
    logger(NULL), cluster_logger(NULL), cluster_logger_registered(false),                             
    monmap(map),                                                                                      
    log_client(cct_, messenger, monmap, LogClient::FLAG_MON),                      
    key_server(cct, &keyring),
    auth_cluster_required(cct,
              cct->_conf->auth_supported.empty() ?
              cct->_conf->auth_cluster_required : cct->_conf->auth_supported),
    auth_service_required(cct,
              cct->_conf->auth_supported.empty() ?
              cct->_conf->auth_service_required : cct->_conf->auth_supported),
    mgr_messenger(mgr_m),                                                                                     
    mgr_client(cct_, mgr_m, monmap),                                                        
    gss_ktfile_client(cct->_conf.get_val<std::string>("gss_ktab_client_file")),                  
    store(s),                                                                                         
                                                                                                      
    elector(this, map->strategy), // initialize the elector, which also sets the strategy
    required_features(0),                                                                             
    leader(0),                                                                                        
    quorum_con_features(0),                                                                           
    // scrub                                                                                                                                 
    scrub_version(0),                                                                                 
    scrub_event(NULL),
    scrub_timeout_event(NULL),                                                                        
                                                                                                      
    // sync state                                                                                     
    sync_provider_count(0),                                                                           
    sync_cookie(0),                                                                                   
    sync_full(false),                                                                                 
    sync_start_version(0),                                                                            
    sync_timeout_event(NULL),                                                                         
    sync_last_committed_floor(0),                                                                     
                                                                                                      
    timecheck_round(0),                                                                               
    timecheck_acks(0),                                                                                
    timecheck_rounds_since_clean(0),                                                                  
    timecheck_event(NULL),                                                                            
                                                                                                      
    admin_hook(NULL),                                                                                 
    routed_request_tid(0),                                                                            
    op_tracker(cct, g_conf().get_val<bool>("mon_enable_op_tracker"), 1) 
  {
    ....
    paxos = std::make_unique<Paxos>(*this, "paxos");                                                  
    // all of the paxos services
    paxos_service[PAXOS_MDSMAP].reset(new MDSMonitor(*this, *paxos, "mdsmap"));
    paxos_service[PAXOS_MONMAP].reset(new MonmapMonitor(*this, *paxos, "monmap")); 
    paxos_service[PAXOS_OSDMAP].reset(new OSDMonitor(cct, *this, *paxos, "osdmap")); 
    paxos_service[PAXOS_LOG].reset(new LogMonitor(*this, *paxos, "logm"));  
    paxos_service[PAXOS_AUTH].reset(new AuthMonitor(*this, *paxos, "auth")); 
    paxos_service[PAXOS_MGR].reset(new MgrMonitor(*this, *paxos, "mgr"));  
    paxos_service[PAXOS_MGRSTAT].reset(new MgrStatMonitor(*this, *paxos, "mgrstat")); 
    paxos_service[PAXOS_HEALTH].reset(new HealthMonitor(*this, *paxos, "health")); 
    paxos_service[PAXOS_CONFIG].reset(new ConfigMonitor(*this, *paxos, "config"));
    paxos_service[PAXOS_KV].reset(new KVMonitor(*this, *paxos, "kv"));

Monitor Preinit

-> Monitor::preinit()
    -> Monitor::read_features()
    -> Monitor::init_paxos() // initialize paxos
        -> Paxos::init() // read paxos last_pn, accepted_pn, last_committed and first_committed from the store
        -> iterate over the paxos services and initialize each one
        -> Monitor::refresh_from_paxos(NULL)
            -> get the fingerprint
            -> iterate over the paxos services and call refresh
            -> iterate over the paxos services and call post_refresh
            -> Monitor::load_metadata()

Monitor Init

-> Monitor::init()
    -> Finisher::start()
    -> timer.init() // start the timer
    -> Monitor::new_tick()
        -> add timer callback: Monitor::tick()
    -> messenger->add_dispatcher_tail(this)
    -> set the monitor's initial state to probing
    -> Monitor::bootstrap() // analyzed in detail below

monitor tick

-> Monitor::init()
        -> finisher.start()
        -> timer.init()
        -> Monitor::new_tick()
                -> add timer callback: Monitor::tick()
                        -> if this node is the leader
                                -> health check
                        -> iterate over all paxos services
                                -> MonmapMonitor::tick()
                                -> ConfigMonitor::tick()
                                -> MDSMonitor::tick()
                                -> AuthMonitor::tick()
                                -> HealthMonitor::tick()
                                -> MgrMonitor::tick()
                                -> PaxosService::maybe_trim()
                        -> MgrClient::update_daemon_health()
                        -> Monitor::new_tick()
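
The trace above shows tick() re-arming itself through new_tick(). A minimal sketch of that self-rescheduling pattern follows, assuming a single-threaded timer with virtual time; ToyTimer, ToyMonitor, add_event_after and run_until are hypothetical names for illustration, not the Ceph SafeTimer API.

```cpp
#include <cstdint>
#include <functional>
#include <map>
#include <utility>

// Hypothetical single-threaded timer: callbacks are ordered by due time.
class ToyTimer {
 public:
  void add_event_after(std::uint64_t delay, std::function<void()> cb) {
    events_.emplace(now_ + delay, std::move(cb));
  }
  // Advance virtual time to t, firing every callback that becomes due.
  void run_until(std::uint64_t t) {
    while (!events_.empty() && events_.begin()->first <= t) {
      auto it = events_.begin();
      now_ = it->first;
      auto cb = std::move(it->second);
      events_.erase(it);
      cb();  // the callback may schedule further events
    }
    now_ = t;
  }

 private:
  std::uint64_t now_ = 0;
  std::multimap<std::uint64_t, std::function<void()>> events_;
};

struct ToyMonitor {
  ToyTimer& timer;
  std::uint64_t interval;
  int ticks = 0;

  // Schedule the next tick, as Monitor::new_tick() does in the trace.
  void new_tick() {
    timer.add_event_after(interval, [this] { tick(); });
  }
  void tick() {
    ++ticks;     // the per-service tick() work would run here
    new_tick();  // re-arm, so ticks keep coming
  }
};
```

With an interval of 5 and one initial new_tick() call, running the timer to time 20 fires tick() at 5, 10, 15 and 20.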


-> Paxos::trigger_propose()


# leader
-> Paxos::begin()

Monitor bootstrap

-> Monitor::bootstrap()
        -> Monitor::wait_for_paxos_write()
        -> MonMap::calc_legacy_ranks() // when the epoch is 0
        -> set the rank, set state to probing, and reset the monitor
        -> Monitor::reset_probe_timeout()
        -> Monitor::send_mon_message(message, rank) // send a probe to the other monitors; op is MMonProbe::OP_PROBE, message type is MSG_MON_PROBE


-> Monitor::_ms_dispatch()
        -> Monitor::dispatch_op()
                -> Monitor::handle_probe()  // MSG_MON_PROBE
                        -> Monitor::handle_probe_probe() // MMonProbe::OP_PROBE
                                -> send_message(MMonProbe(MMonProbe::OP_REPLY)) // op is MMonProbe::OP_REPLY, message type is MSG_MON_PROBE


-> Monitor::_ms_dispatch()
        -> Monitor::dispatch_op()
                -> Monitor::handle_probe()  // MSG_MON_PROBE
                        -> Monitor::handle_probe_reply() // MMonProbe::OP_REPLY
                                -> Monitor::start_election()
                                        -> Monitor::wait_for_paxos_write()
                                        -> Monitor::_reset()
                                        -> call_election()
                                                -> ElectionLogic::start()
                                                        -> ElectionLogic::init()
                                                        -> ElectionLogic::bump_epoch(epoch+1) // checks whether the epoch is even; if it is, it is bumped by 1 first
                                                                -> Elector::notify_bump_epoch()
                                                                        -> Monitor::join_election()
                                                        -> Elector::propose_to_peers(epoch)
                                                                -> Monitor::send_mon_message(message, rank)  // op is MMonElection::OP_PROPOSE, message type is MSG_MON_ELECTION
                                                        -> Elector::_start()


-> Monitor::_ms_dispatch()
        -> Monitor::dispatch_op()
                -> Elector::dispatch(op)  # MSG_MON_ELECTION
                        -> Elector::begin_peer_ping(int peer)
                        -> Elector::handle_propose(op) # MMonElection::OP_PROPOSE
                                -> ElectionLogic::receive_propose(from, epoch)
                                        -> ElectionLogic::propose_classic_handler(from, epoch) # CLASSIC strategy
                                                -> ElectionLogic::propose_classic_prefix(from, epoch)
                                                        -> ElectionLogic::defer(from)                  // vote for from
                                                                -> Elector::_defer_to(from)
                                                                        -> Monitor::send_mon_message(message, from) // op is MMonElection::OP_ACK, message type is MSG_MON_ELECTION
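
The decision that leads to defer(from) under the CLASSIC strategy can be sketched as below. This is a simplified illustration that assumes the lowest rank wins; ProposeAction and on_propose_classic are hypothetical names, and details such as whether this monitor is already running its own candidacy are left out.

```cpp
// Possible reactions to an incoming PROPOSE, simplified.
enum class ProposeAction { Defer, Ignore, StartOwnElection };

// my_rank:      rank of the monitor receiving the proposal
// from:         rank of the proposer
// leader_acked: rank already deferred to in this election, or -1 if none
inline ProposeAction on_propose_classic(int my_rank, int from,
                                        int leader_acked) {
  if (my_rank < from) {
    // We would beat the proposer. If we already deferred to someone better,
    // stay put; otherwise stand for election ourselves.
    return leader_acked >= 0 ? ProposeAction::Ignore
                             : ProposeAction::StartOwnElection;
  }
  // The proposer would beat us. Defer unless we have already deferred to a
  // monitor with an even lower rank.
  if (leader_acked < 0 || from <= leader_acked) {
    return ProposeAction::Defer;
  }
  return ProposeAction::Ignore;
}
```

For example, a monitor with rank 2 that has not voted yet defers to a proposal from rank 0, while a monitor with rank 0 that receives a proposal from rank 1 starts its own election instead.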


-> Monitor::_ms_dispatch()
        -> Monitor::dispatch_op()
                -> Elector::dispatch(op)  // MSG_MON_ELECTION
                        -> Elector::handle_ack(op) // MMonElection::OP_ACK
                                -> ElectionLogic::receive_ack(from, m->epoch)
                                        -> ElectionLogic::declare_victory() // declare that the election has been won
                                                -> ElectionLogic::bump_epoch(epoch+1) // checks whether the epoch is odd; if it is, it is bumped by 1 first, which marks the end of the election
                                                        -> Elector::notify_bump_epoch()
                                                                -> Monitor::join_election()
                                                -> Elector::message_victory(new_quorum)
                                                        -> Elector::cancel_timer()
                                                        -> Monitor::send_mon_message(message, rank) // send OP_VICTORY to every peer in the quorum; op is MMonElection::OP_VICTORY, message type is MSG_MON_ELECTION
                                                        -> Monitor::win_election(epoch, quorum, ...)
                                                                -> update state, leader, quorum, and so on
                                                                -> Paxos::leader_init()
                                                                        -> Paxos::collect(0) // see details below
                                                                -> PaxosService::election_finished()
                                                                        -> PaxosService::_active()
                                                                -> Monitor::_finish_svc_election()
                                                                        -> PaxosService::election_finished() // called on each paxos service in turn
                                                                                -> PaxosService::_active() 
                                                                -> Monitor::finish_election()
                                                                -> Monitor::scrub_event_start()


-> Monitor::_ms_dispatch()
        -> Monitor::dispatch_op()
                -> Elector::dispatch(op)  # MSG_MON_ELECTION
                        -> Elector::handle_victory(op) // a peer receives the MMonElection::OP_VICTORY message sent by the leader
                                -> ElectionLogic::receive_victory_claim(from, epoch)
                                        -> ElectionLogic::victory_makes_sense(from)
                                        -> ElectionLogic::bump_epoch(epoch)
                                -> Monitor::lose_election(epoch, quorum, ...)
                                        -> update state, leader, quorum, and so on
                                        -> Paxos::peon_init()
                                        -> Monitor::_finish_svc_election()
                                                -> iterate over the paxos services, calling election_finished on each
                                        -> Monitor::finish_election()

# leader node
-> Paxos::collect(oldpn)
        -> Paxos::get_new_proposal_number(gt) // obtain a new proposal number
        -> Monitor::send_mon_message(message, rank) // send OP_COLLECT to every peer in the quorum; op is MMonPaxos::OP_COLLECT, message type is MSG_MON_PAXOS
        -> add timer callback: collect_timeout()
                -> Monitor::bootstrap()
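
How get_new_proposal_number(gt) could produce a number that is both larger than anything seen so far and unique per monitor can be sketched as follows. This reflects my reading of the Pacific source and should be checked against it; in particular the step of 100 and the use of the rank as the low digits are assumptions here, and next_proposal_number is a hypothetical free function.

```cpp
#include <cstdint>

using version_t = std::uint64_t;

// last_pn: highest proposal number this monitor has used or seen
// gt:      the new number must be greater than this value
// rank:    this monitor's rank, assumed to be below 100
inline version_t next_proposal_number(version_t last_pn, version_t gt,
                                      unsigned rank) {
  if (last_pn < gt) last_pn = gt;  // never go backwards
  last_pn /= 100;                  // drop the rank digits
  ++last_pn;                       // move to the next round
  last_pn *= 100;
  last_pn += rank;                 // make the number unique to this monitor
  return last_pn;
}
```

Under these assumptions two monitors can never pick the same proposal number, because the low two digits always carry the rank of the proposer.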


# peon node
-> Paxos::dispatch(op)
        -> Paxos::handle_collect(op) // MMonPaxos::OP_COLLECT
                -> Paxos::reset_lease_timeout()
                        -> mon.timer.cancel_event(lease_timeout_event)  // reset the peon's lease timeout event callback
                        -> add timer callback: lease_timeout() // i.e. start a new election when the lease times out
                                -> Monitor::bootstrap()
                -> persist the new proposal number to paxos accepted_pn in the store
                -> send_message(MMonPaxos(MMonPaxos::OP_LAST)) // reply to the sender with OP_LAST; op is MMonPaxos::OP_LAST, message type is MSG_MON_PAXOS

# leader node
-> Paxos::dispatch(op)
        -> Paxos::handle_last(op) // MMonPaxos::OP_LAST
                -> record the peer's first committed and last committed
                -> Paxos::store_state(MMonPaxos *m)
                -> Monitor::send_mon_message(message, rank) // if the peer's last_committed is lower than the leader's, send MMonPaxos::OP_COMMIT to that monitor; op is MMonPaxos::OP_COMMIT, message type is MSG_MON_PAXOS
                -> count the peers that accepted the proposal number; once the quorum is reached:
                        -> mon.timer.cancel_event(collect_timeout_event)
                        -> Paxos::extend_lease()
                                -> Monitor::send_mon_message(message, rank) // broadcast MMonPaxos::OP_LEASE to all peer monitors; op is MMonPaxos::OP_LEASE, message type is MSG_MON_PAXOS
                                -> add timer callback: lease_ack_timeout()
                                        -> Monitor::bootstrap()
                                -> add timer callback: lease_renew_timeout()
                                        -> Paxos::extend_lease() // extend the lease periodically

# peon node
-> Paxos::dispatch(op)
        -> Paxos::handle_lease(op) // MMonPaxos::OP_LEASE
                -> update lease_expire
                -> set the paxos state to active
                -> send_message(MMonPaxos(MMonPaxos::OP_LEASE_ACK)) // reply to the sender with OP_LEASE_ACK; op is MMonPaxos::OP_LEASE_ACK, message type is MSG_MON_PAXOS
                -> Paxos::reset_lease_timeout()
                        -> mon.timer.cancel_event(lease_timeout_event)  // reset the peon's lease timeout event callback
                        -> add timer callback: lease_timeout() // i.e. start a new election when the lease times out
                                -> Monitor::bootstrap()

# leader node
-> Paxos::dispatch(op)
        -> Paxos::handle_lease_ack(op) // MMonPaxos::OP_LEASE_ACK
                -> check whether acks have arrived from the whole quorum; if all have, reset the lease ack timeout event callback
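
The leader's bookkeeping for lease acks can be sketched as a set comparison: each OP_LEASE_ACK adds the sender to a set, and once that set equals the quorum the pending lease ack timeout is cleared. LeaseAckTracker and its members are hypothetical names for illustration; counting the leader itself as already acked is an assumption of this sketch.

```cpp
#include <set>

struct LeaseAckTracker {
  std::set<int> quorum;        // ranks in the quorum, including the leader
  std::set<int> acked;         // ranks that have acked the current lease
  bool timeout_armed = false;  // stands in for the lease ack timeout event

  // Called when the leader sends out a new lease.
  void extend_lease(int leader_rank) {
    acked.clear();
    acked.insert(leader_rank);  // assumption: the leader counts itself
    timeout_armed = true;
  }

  // Returns true when this ack completes the quorum.
  bool handle_lease_ack(int from) {
    if (quorum.count(from) == 0) return false;  // ignore non-members
    acked.insert(from);
    if (acked == quorum) {
      timeout_armed = false;  // every member answered; clear the timeout
      return true;
    }
    return false;
  }
};
```

If the timeout stays armed until it fires, the trace above shows the consequence: lease_ack_timeout() calls Monitor::bootstrap() and a new election begins.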
