当前位置: 首页>移动开发>正文

zookeeper 客户端 命令 详细 zookeeper c客户端

  由于我们公司主要使用的C语言客户端,并且由于业务需要和稳定性需要,对于zk服务增加了基于taas(内部认证系统)的认证和quota管理,所以代码修改了握手时候的协议,不过大体还是和原先相同的。

主要数据结构

有zhandle,adaptor_thread和completion_list_t,分别代表zk的一些全局共享信息,线程控制信息和回调watcher信息。各个struct的主要内容如下:




struct zhandle { 
     int fd;             // socket fd
     sockaddr_storage *addrs; // zk server ip地址列表
     watcher_fn watcher; // Global watcher
     buffer_list_t *input_buffer;// 接收缓存
     buffer_head_t to_process; // IO接收数据队列
     buffer_head_t to_send; // IO发送队列 
     completion_head_t sent_request; // 未处理请求队列,已发送而为返回的request队列,包含了回调和watcher。
     completion_head_t completions_to_process; // 正在处理的watcher和回调
     clientid_t;            // session id和zk server返回的passwd,用于恢复session
     buffer_list primer_buffer; // 和服务器的握手包缓存
     buffer_list primer_storage;// 服务器握手包接收缓存
     void* context;         // watcher的参数

     zh_hashtable* active_node_watchers; // 激活的watcher列表
     zh_hashtable* active_exist_watchers;
     zh_hashtable* active_child_watchers;

     common::CredentialGenerator* generator;// 用于认证客户端身份
 }

struct adaptor_threads {
      pthread_t io; // IO线程
      pthread_t completion;    // event现场
      int threadsToWait; // 线程等待barrier
      pthread_cond_t cond; // 条件变量
      pthread_mutex_t lock; // ... and a lock
      pthread_mutex_t zh_lock; // critical section lock
      int self_pipe[2];
 }

typedef struct _completion_list {
     int xid; // 网络序列号
     int completion_type; // 回调类型
     union {        // 回调的函数指针
         void_completion_t void_result;
         stat_completion_t stat_result;
         data_completion_t data_result;
         strings_completion_t strings_result;
         strings_stat_completion_t strings_stat_result;
         acl_completion_t acl_result;
         string_completion_t string_result;
         struct watcher_object_list*watcher_result;
     } c;
     const void *data; // 回调函数的参数
     buffer_list_t *buffer; // watcherEvent的序列化结果,用于event线程执行watcher函数。
     struct _completion_list *next;
     watcher_registration_t* watcher; // wacher函数相关
 } completion_list_t;


线程模型:






zookeeper 客户端 命令 详细 zookeeper c客户端,zookeeper 客户端 命令 详细 zookeeper c客户端_客户端,第1张

zk c客户端每一个zhandle创建两个线程,IO线程(do_io)负责连接zk server以及收发包,competion线程(do_competion)处理异步回调函数和watcher调用。

IO线程发送to_send队列数据,接收服务器返回数据至to_process,并根据收到的回复把sent_request队列中的回调和watcher挪到completion_to_process队列。

用户线程通过客户端接口把请求Request插入to_send队列,把回调和watcher写入sent_request

competion线程调用completion_to_process队列中的回调和watcher。

状态转换:

zookeeper 客户端 命令 详细 zookeeper c客户端,zookeeper 客户端 命令 详细 zookeeper c客户端_zookeeper 客户端 命令 详细_02,第2张

①若从连接状态发送to_send失败返回connect loss错误。若2/3个timeout没有收到任何包,则返回connect timeout。

这两种情况都会执行sent_request(pengding request)的同步和异步调用,并设置error code为相应值。

主要流程:

用户在操作zk znode之前,需要调用zookeeper_init初始化一个zhandle,之后在每个方法调用中传入这个handle。

以get为例说明主要流程:

1.调用zoo_get方法把回调和watcher写入send_request队列。如果是同步则写入一个标记SYNCHRONOUS_MARKER。(add_data_completion)

2.序列化request,加入到to_send队列。

3.通过往pipe的fd写操作唤醒IO线程发送数据。

4.如果是异步则结束,如果是同步等待处理结束。

IO线程步骤:

1.挑选一个ip连接zk server并发送ConnectRequest(握手包,用于创建session),如果已经连接,则在1/3超时时发送ping。(zookeeper_interest)

2.发送to_send队列,接收服务器消息到to_process队列,更新最后收发包时间。(check_events)

3.如果是异步操作和watcher,则把sent_request队列中的回调和watcher移到completion_to_process队列。结束。(zookeeper_process)

4.如果是同步操作根据to_process队列操作唤醒用户线程。

Completion线程

1.若completion_to_process队列为空则等待。

2.调用队列中的回调函数和watcher。

 

zookeeper c客户端bug修复

1.fd泄露的bug

zookeeper.cc的zookeeper_process方法在收到乱序回应时,断掉连接重连时,没有减少zhandle的引用计数,这时zookeeper_close只会停止线程而不会释放zhandle的资源。

只有当引用计数为0时,才回destroy整个handle,造成了fd泄露的问题。




if (cptr->xid!= hdr.xid) { 
     LOG_DEBUG(("Processing unexpected or out-of-order response!"));

     // received unexpected (or out-of-order) response
     close_buffer_iarchive(&ia);
     free_buffer(bptr);
     // put the completion back on the queue (so it gets properly
     // signaled and deallocated) and disconnect from the server
     queue_completion(&zh->sent_requests,cptr,1);
     return api_epilog(zh, handle_socket_error_msg(zh, __LINE__,ZRUNTIMEINCONSISTENCY,
               "unexpected server response: expected %#x, but received %#x",
                hdr.xid,cptr->xid));
  }


2.watcher_object的内存泄露 (zookeeper-c-client-3.4.6 中已修复)

对于同一路径上注册watcher超过两个时,后面的watcher全部内存泄露。

因为activateWatcher时在堆上创建了wacher,而后面add_to_list调用clone了该watcher object,在watcher调用完成时释放的是clone的,所以原始的就泄露了。

3.Log_evn的getpwuid_r方法返回0不一定代表了pw结构体被找到,pw还可能是NULL,core在这里

 

主要缺点:

1.每初始化一个zhandle都会启动两个线程,线程不能共享。

2.c代码难以维护

3.回调串行处理,某个回调时间长容易堵住线程,可以采用线程池。

4.session expire必须由客户端重连到服务端才能发现。

5.重连时不sleep,因此实际应用中发生过DDOS


https://www.xamrdz.com/mobile/4fs1944537.html

相关文章: