References
<<从PAXOS到ZOOKEEPER分布式一致性原理与实践>>
zookeeper-3.0.0
Overview of the leader/follower relationship
Having covered how the leader and follower roles are initialized, this article walks through the logical relationships on the ZooKeeper server side, including how elections play out and how client data is exchanged with the servers.
How ZooKeeper's leader and followers run
The main leader/follower relationships at runtime are as follows.
- Whether a server is the leader or a follower, once started it runs a thread that listens on the election port and receives election data.
- Any client request that updates or modifies data is forwarded to the leader for processing.
- After every transactional request completes, the leader synchronizes the change out to the followers.
Overview of leader election
For the recommended cluster size we assume the default of five servers, an odd number.
Election during server startup
When the servers all start together, the rough flow is as follows:
- Each server casts a vote. In the initial state every server nominates itself as leader, so each first votes for itself; each vote carries the server's own id (myid) and its transaction id (zxid).
- Each server receives the other servers' votes. Because every server starts listening for votes as soon as it boots, it checks each incoming vote for validity and records it.
- Each server tallies the votes received so far. The rule is to compare zxids first: the server with the largest zxid becomes leader; if the zxids are equal, the server with the largest myid wins. After tallying, the server checks whether the winner holds more than half of the votes, and if so it updates its own state.
- Each server updates its state. Every node tallies votes the same way, so if all zxids are 0 the server with the largest myid must win, and every server reaches the same conclusion. Each server then checks whether the winning myid is its own: if not, it switches to follower; if so, it switches to leader.
That is the flow of the initial election.
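To make the tally rule concrete, here is a minimal, self-contained sketch of the comparison just described: zxid first, myid as the tie-breaker. This is a hypothetical helper for illustration, not ZooKeeper's actual election code.
// Sketch of the vote-comparison rule: higher zxid wins, then higher myid.
final class VoteOrder {
    // Returns true if the candidate vote (zxid, myid) should replace the current one.
    static boolean supersedes(long candidateZxid, long candidateId,
                              long currentZxid, long currentId) {
        return candidateZxid > currentZxid
                || (candidateZxid == currentZxid && candidateId > currentId);
    }

    public static void main(String[] args) {
        // At first startup all zxids are 0, so the largest myid wins.
        System.out.println(supersedes(0, 5, 0, 3)); // true: myid 5 beats myid 3
        // During normal operation a larger zxid always wins, regardless of myid.
        System.out.println(supersedes(2, 1, 1, 5)); // true: zxid 2 beats zxid 1
    }
}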
Election during normal operation
If the leader dies while the cluster is running, each follower's connection to it fails, the follower drops out of its data-receiving loop, and its state changes to LOOKING.
- Change state. With the leader gone, the state is reset to LOOKING, which starts the election flow.
- Each server casts a vote. Because the cluster has been running, zxids may now differ, but the voting works the same way: each server votes for itself first and then sends its vote to the other machines.
- Receive and process votes. The logic is the same as during the startup election, except that this time the votes converge on the largest zxid.
- Update state.
That is the basic flow. If a new machine joins while the cluster is running, it likewise learns the current leader's myid and zxid when it sends out its election request.
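The "more than half" test used when tallying is simple; here is a sketch, assuming a hypothetical tally map from myid to vote count (illustrative only, not the real QuorumPeer code).
import java.util.HashMap;
import java.util.Map;

// Sketch of the majority check applied to the vote tally.
public class QuorumCheck {
    static boolean hasQuorum(Map<Long, Integer> tally, long candidate, int ensembleSize) {
        Integer votes = tally.get(candidate);
        return votes != null && votes > ensembleSize / 2; // strictly more than half
    }

    public static void main(String[] args) {
        Map<Long, Integer> tally = new HashMap<Long, Integer>();
        tally.put(3L, 3); // the candidate with myid 3 holds 3 of 5 votes
        System.out.println(hasQuorum(tally, 3L, 5)); // true: 3 > 5/2
    }
}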
Client/server data interaction
Once the roles are settled, client connections fall into two processing paths: connections to the leader and connections to a follower.
An earlier article briefly described how the server listens for client connections, namely the run method of the NIOServerCnxn factory.
public void run() {
while (!ss.socket().isClosed()) { // loop until the server socket is closed
try {
selector.select(1000); // I/O multiplexing with a 1s timeout
Set<SelectionKey> selected;
synchronized (this) {
selected = selector.selectedKeys(); // take the lock and grab the triggered keys
}
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(
selected);
Collections.shuffle(selectedList);
for (SelectionKey k : selectedList) { // iterate over the triggered keys
if ((k.readyOps() & SelectionKey.OP_ACCEPT) != 0) { // a new connection request came in
SocketChannel sc = ((ServerSocketChannel) k
.channel()).accept(); // accept the new connection
sc.configureBlocking(false); // set non-blocking mode
SelectionKey sk = sc.register(selector,
SelectionKey.OP_READ); // register for read events
NIOServerCnxn cnxn = createConnection(sc, sk); // create a NIOServerCnxn for this client
sk.attach(cnxn); // attach the cnxn to its selection key
addCnxn(cnxn);
} else if ((k.readyOps() & (SelectionKey.OP_READ | SelectionKey.OP_WRITE)) != 0) { // a read or write event fired
NIOServerCnxn c = (NIOServerCnxn) k.attachment();
c.doIO(k); // dispatch to doIO to handle the event
}
}
selected.clear(); // clear the selected set
} catch (Exception e) {
LOG.error("FIXMSG",e); // log the error and keep looping
}
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"NIOServerCnxn factory exitedloop.");
clear();
LOG.error("=====> Goodbye cruel world <======");
// System.exit(0);
}
Each incoming client connection gets its own NIOServerCnxn, which is stored for later use. When one of its registered events fires, doIO is called to handle it.
void doIO(SelectionKey k) throws InterruptedException {
try {
if (sock == null) { // nothing to do if the socket is gone
return;
}
if (k.isReadable()) { // a readable event
int rc = sock.read(incomingBuffer); // read data from the socket
if (rc < 0) {
throw new IOException("Read error"); // a negative read length means the peer closed
}
if (incomingBuffer.remaining() == 0) { // the buffer is full: a complete chunk has arrived
incomingBuffer.flip();
if (incomingBuffer == lenBuffer) {
readLength(k); // parse the 4-byte length prefix
} else if (!initialized) { // connection not yet initialized
stats.packetsReceived++;
ServerStats.getInstance().incrementPacketsReceived();
readConnectRequest(); // the first packet must be the connect request
lenBuffer.clear();
incomingBuffer = lenBuffer;
} else {
stats.packetsReceived++;
ServerStats.getInstance().incrementPacketsReceived();
readRequest(); // connection initialized: process a normal request
lenBuffer.clear();
incomingBuffer = lenBuffer;
}
}
}
if (k.isWritable()) { // a writable event
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK
// "outgoingBuffers.size() = " +
// outgoingBuffers.size());
if (outgoingBuffers.size() > 0) { // check whether there is data queued to send
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK,
// "sk " + k + " is valid: " +
// k.isValid());
/*
* This is going to reset the buffer position to 0 and the
* limit to the size of the buffer, so that we can fill it
* with data from the non-direct buffers that we need to
* send.
*/
ByteBuffer directBuffer = factory.directBuffer;
directBuffer.clear();
for (ByteBuffer b : outgoingBuffers) {
if (directBuffer.remaining() < b.remaining()) {
/*
* When we call put later, if the directBuffer is to
* small to hold everything, nothing will be copied,
* so we've got to slice the buffer if it's too big.
*/
b = (ByteBuffer) b.slice().limit(
directBuffer.remaining());
}
/*
* put() is going to modify the positions of both
* buffers, put we don't want to change the position of
* the source buffers (we'll do that after the send, if
* needed), so we save and reset the position after the
* copy
*/
int p = b.position();
directBuffer.put(b);
b.position(p);
if (directBuffer.remaining() == 0) {
break;
}
}
/*
* Do the flip: limit becomes position, position gets set to
* 0. This sets us up for the write.
*/
directBuffer.flip();
int sent = sock.write(directBuffer); // write the data to the socket
ByteBuffer bb;
// Remove the buffers that we have sent
while (outgoingBuffers.size() > 0) {
bb = outgoingBuffers.peek();
if (bb == closeConn) {
throw new IOException("closing");
}
int left = bb.remaining() - sent;
if (left > 0) {
/*
* We only partially sent this buffer, so we update
* the position and exit the loop.
*/
bb.position(bb.position() + sent);
break;
}
stats.packetsSent++;
/* We've sent the whole buffer, so drop the buffer */
sent -= bb.remaining();
ServerStats.getInstance().incrementPacketsSent();
outgoingBuffers.remove();
}
// ZooLog.logTraceMessage(LOG,
// ZooLog.CLIENT_DATA_PACKET_TRACE_MASK, "after send,
// outgoingBuffers.size() = " + outgoingBuffers.size());
}
synchronized (this) {
if (outgoingBuffers.size() == 0) {
if (!initialized
&& (sk.interestOps() & SelectionKey.OP_READ) == 0) {
throw new IOException("Responded to info probe");
}
sk.interestOps(sk.interestOps()
& (~SelectionKey.OP_WRITE)); // everything sent: drop the write interest
} else {
sk.interestOps(sk.interestOps()
| SelectionKey.OP_WRITE); // data still pending: keep the write interest registered
}
}
}
} catch (CancelledKeyException e) {
close();
} catch (IOException e) {
// LOG.error("FIXMSG",e);
close();
}
}
The core logic of this method is the readRequest call that handles incoming requests, plus the code that sends queued data out; after a send completes, the interest set is re-registered appropriately, whether for read or write events.
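Before moving on to readRequest: the lenBuffer/incomingBuffer pairing in doIO implements a simple 4-byte length-prefix framing, where the first read fills a 4-byte buffer with the payload length and the next reads fill a buffer of exactly that size. A standalone sketch of the idea with made-up values (not the ZooKeeper code itself):
import java.nio.ByteBuffer;

// Illustration of 4-byte length-prefix framing.
public class FramingDemo {
    public static void main(String[] args) {
        ByteBuffer lenBuffer = ByteBuffer.allocate(4);
        lenBuffer.putInt(128);                 // pretend 4 length bytes arrived off the wire
        lenBuffer.flip();
        int len = lenBuffer.getInt();          // readLength() does essentially this
        ByteBuffer incomingBuffer = ByteBuffer.allocate(len); // subsequent reads fill the payload
        System.out.println("expecting " + incomingBuffer.remaining() + " payload bytes");
    }
}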
private void readRequest() throws IOException {
// We have the request, now process and setup for next
InputStream bais = new ByteBufferInputStream(incomingBuffer); // wrap the incoming buffer in a stream
BinaryInputArchive bia = BinaryInputArchive.getArchive(bais);
RequestHeader h = new RequestHeader();
h.deserialize(bia, "header"); // 解析头部数据
// Through the magic of byte buffers, txn will not be
// pointing
// to the start of the txn
incomingBuffer = incomingBuffer.slice();
if (h.getType() == OpCode.auth) { // an auth packet goes through authentication handling
AuthPacket authPacket = new AuthPacket();
ZooKeeperServer.byteBuffer2Record(incomingBuffer, authPacket);
String scheme = authPacket.getScheme();
AuthenticationProvider ap = ProviderRegistry.getProvider(scheme);
if (ap == null
|| ap.handleAuthentication(this, authPacket.getAuth()) != KeeperException.Code.Ok) {
if (ap == null)
LOG.error("No authentication provider for scheme: "
+ scheme + " has " + ProviderRegistry.listProviders());
else
LOG.debug("Authentication failed for scheme: "
+ scheme);
// send a response...
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.AuthFailed);
sendResponse(rh, null, null);
// ... and close connection
sendBuffer(NIOServerCnxn.closeConn);
disableRecv();
} else {
LOG.debug("Authentication succeeded for scheme: "
+ scheme);
ReplyHeader rh = new ReplyHeader(h.getXid(), 0,
KeeperException.Code.Ok);
sendResponse(rh, null, null);
}
return;
} else {
zk.submitRequest(this, sessionId, h.getType(), h.getXid(),
incomingBuffer, authInfo); // anything other than auth is submitted as a request
}
if (h.getXid() >= 0) { // a non-negative xid marks a normal client request
synchronized (this) {
outstandingRequests++;
// check throttling
if (zk.getInProcess() > factory.outstandingLimit) { // throttle: stop receiving beyond the limit
LOG.warn("Throttling recv " + zk.getInProcess());
disableRecv();
// following lines should not be needed since we are already
// reading
// } else {
// enableRecv();
}
}
}
}
The key call here is zk.submitRequest(this, sessionId, h.getType(), h.getXid(), incomingBuffer, authInfo). At this point zk is either a LeaderZooKeeperServer (the client is connected to the leader) or a FollowerZooKeeperServer (the client is connected to a follower); both inherit from ZooKeeperServer and both use its submitRequest method.
public void submitRequest(ServerCnxn cnxn, long sessionId, int type,
int xid, ByteBuffer bb, List<Id> authInfo) {
Request si = new Request(cnxn, sessionId, xid, type, bb, authInfo); // build a Request instance
submitRequest(si); // and process it
}
public void submitRequest(Request si) {
if (firstProcessor == null) {
synchronized (this) {
try {
while (!running) { // wait until the server is running
wait(1000);
}
} catch (InterruptedException e) {
LOG.error("FIXMSG",e);
}
if (firstProcessor == null) {
throw new RuntimeException("Not started");
}
}
}
try {
touch(si.cnxn); // refresh the session so it does not expire; subclasses override this
boolean validpacket = Request.isValid(si.type); // check that the request type is valid
if (validpacket) {
firstProcessor.processRequest(si); // hand the request to the registered processor chain
if (si.cnxn != null) {
incInProcess();
}
} else {
LOG.warn("Dropping packet at server of type " + si.type);
// if unvalid packet drop the packet.
}
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}
So the request ends up in the firstProcessor chain inside zk. Next we analyze the two cases separately.
Client connected directly to the leader
For LeaderZooKeeperServer, the processor chain is wired up in setupRequestProcessors as follows.
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this); // replies to the client with the result
RequestProcessor toBeAppliedProcessor = new Leader.ToBeAppliedRequestProcessor(
finalProcessor, getLeader().toBeApplied); // tracks proposals committed but not yet applied
commitProcessor = new CommitProcessor(toBeAppliedProcessor,
Integer.toString(getClientPort()), false); // waits for quorum commit
RequestProcessor proposalProcessor = new ProposalRequestProcessor(this,
commitProcessor); // proposes transactions to the followers
firstProcessor = new PrepRequestProcessor(this, proposalProcessor); // first stage of the chain
}
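The wiring above is a textbook chain of responsibility, built back to front: each stage keeps a reference to the next and forwards the request when its own work is done. A stripped-down sketch of the pattern with hypothetical stage types (the real chain passes Request objects through the RequestProcessor interface, not strings):
// Minimal chain-of-responsibility sketch mirroring setupRequestProcessors.
public class ChainDemo {
    interface Processor {
        void process(String request);
    }

    static final class Stage implements Processor {
        private final String name;
        private final Processor next;
        Stage(String name, Processor next) { this.name = name; this.next = next; }
        public void process(String request) {
            System.out.println(name + " handled " + request);
            if (next != null) next.process(request); // forward down the chain
        }
    }

    public static void main(String[] args) {
        Processor finalP = new Stage("FinalRequestProcessor", null);
        Processor toBeApplied = new Stage("ToBeAppliedRequestProcessor", finalP);
        Processor commit = new Stage("CommitProcessor", toBeApplied);
        Processor proposal = new Stage("ProposalRequestProcessor", commit);
        Processor first = new Stage("PrepRequestProcessor", proposal);
        first.process("create /foo"); // walks all five stages in order
    }
}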
The PrepRequestProcessor class
A request on the leader therefore flows through PrepRequestProcessor -> ProposalRequestProcessor -> CommitProcessor -> ToBeAppliedRequestProcessor -> FinalRequestProcessor. First, look at how PrepRequestProcessor is initialized.
public PrepRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("ProcessThread:" + zks.getClientPort());
this.nextProcessor = nextProcessor; // set the next processor in the chain
this.zks = zks; // keep a reference to the server
start(); // start the thread, whose run() method does the work
}
Requests enter the class through its processRequest method:
public void processRequest(Request request) {
// request.addRQRec(">prep="+zks.outstandingChanges.size());
submittedRequests.add(request); // enqueue the request for the processor thread
}
processRequest merely appends the request to the submittedRequests queue; the class's own thread then takes it from the queue and processes it inside run.
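Before looking at the real run method, here is a self-contained sketch of this hand-off pattern: callers enqueue, the stage's own thread dequeues with a blocking take(). Illustrative only, with strings standing in for Request:
import java.util.concurrent.LinkedBlockingQueue;

// Producer/consumer hand-off in the style of PrepRequestProcessor.
public class PipelineStage extends Thread {
    private final LinkedBlockingQueue<String> submittedRequests =
            new LinkedBlockingQueue<String>();

    public void processRequest(String request) {
        submittedRequests.add(request); // producer side: just enqueue and return
    }

    @Override
    public void run() {
        try {
            while (true) {
                String request = submittedRequests.take(); // blocks until work arrives
                System.out.println("processing " + request);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PipelineStage stage = new PipelineStage();
        stage.start();
        stage.processRequest("create /foo");
        Thread.sleep(100); // give the consumer thread a moment before exiting
        System.exit(0);
    }
}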
@Override
public void run() {
try {
while (true) {
Request request = submittedRequests.take(); // take the next request from the queue
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.CLIENT_PING_TRACE_MASK;
}
ZooTrace.logRequest(LOG, traceMask, 'P', request, ""); // 记录到日志
if (Request.requestOfDeath == request) { // 如果终止的请求是这个则停止
break;
}
pRequest(request); // process the request in pRequest
}
} catch (InterruptedException e) {
LOG.error("FIXMSG",e);
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"PrepRequestProcessor exited loop!");
}
@SuppressWarnings("unchecked")
protected void pRequest(Request request) {
// LOG.info("Prep>>> cxid = " + request.cxid + " type = " +
// request.type + " id = 0x" + Long.toHexString(request.sessionId));
TxnHeader txnHeader = null;
Record txn = null;
try {
switch (request.type) {
case OpCode.create: // create a node
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
.getNextZxid(), zks.getTime(), OpCode.create);
zks.sessionTracker.checkSession(request.sessionId);
CreateRequest createRequest = new CreateRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
createRequest);
String path = createRequest.getPath();
int lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
if (!fixupACL(request.authInfo, createRequest.getAcl())) {
throw new KeeperException.InvalidACLException();
}
String parentPath = path.substring(0, lastSlash);
ChangeRecord parentRecord = getRecordForPath(parentPath);
checkACL(zks, parentRecord.acl, ZooDefs.Perms.CREATE,
request.authInfo);
int parentCVersion = parentRecord.stat.getCversion();
CreateMode createMode = CreateMode.fromFlag(createRequest.getFlags());
if (createMode.isSequential()) {
path = path + String.format("%010d", parentCVersion);
}
try {
if (getRecordForPath(path) != null) {
throw new KeeperException.NodeExistsException();
}
} catch (KeeperException.NoNodeException e) {
// ignore this one
}
boolean ephemeralParent = parentRecord.stat.getEphemeralOwner() != 0;
if (ephemeralParent) {
throw new KeeperException.NoChildrenForEphemeralsException();
}
txn = new CreateTxn(path, createRequest.getData(),
createRequest.getAcl(),
createMode.isEphemeral());
StatPersisted s = new StatPersisted();
if (createMode.isEphemeral()) {
s.setEphemeralOwner(request.sessionId);
}
parentRecord = parentRecord.duplicate(txnHeader.getZxid());
parentRecord.childCount++;
parentRecord.stat
.setCversion(parentRecord.stat.getCversion() + 1);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path, s,
0, createRequest.getAcl()));
break;
case OpCode.delete: // delete a node
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
.getNextZxid(), zks.getTime(), OpCode.delete);
zks.sessionTracker.checkSession(request.sessionId);
DeleteRequest deleteRequest = new DeleteRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
deleteRequest);
path = deleteRequest.getPath();
lastSlash = path.lastIndexOf('/');
if (lastSlash == -1 || path.indexOf('\0') != -1
|| path.equals("/")) {
throw new KeeperException.BadArgumentsException();
}
parentPath = path.substring(0, lastSlash);
parentRecord = getRecordForPath(parentPath);
ChangeRecord nodeRecord = getRecordForPath(path);
checkACL(zks, parentRecord.acl, ZooDefs.Perms.DELETE,
request.authInfo);
int version = deleteRequest.getVersion();
if (version != -1 && nodeRecord.stat.getVersion() != version) {
throw new KeeperException.BadVersionException();
}
if (nodeRecord.childCount > 0) {
throw new KeeperException.NotEmptyException();
}
txn = new DeleteTxn(path);
parentRecord = parentRecord.duplicate(txnHeader.getZxid());
parentRecord.childCount--;
parentRecord.stat
.setCversion(parentRecord.stat.getCversion() + 1);
addChangeRecord(parentRecord);
addChangeRecord(new ChangeRecord(txnHeader.getZxid(), path,
null, -1, null));
break;
case OpCode.setData: // set a node's data
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
.getNextZxid(), zks.getTime(), OpCode.setData);
zks.sessionTracker.checkSession(request.sessionId);
SetDataRequest setDataRequest = new SetDataRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
setDataRequest);
path = setDataRequest.getPath();
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.WRITE,
request.authInfo);
version = setDataRequest.getVersion();
int currentVersion = nodeRecord.stat.getVersion();
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException();
}
version = currentVersion + 1;
txn = new SetDataTxn(path, setDataRequest.getData(), version);
nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
nodeRecord.stat.setVersion(version);
addChangeRecord(nodeRecord);
break;
case OpCode.setACL: // set a node's ACL
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
.getNextZxid(), zks.getTime(), OpCode.setACL);
zks.sessionTracker.checkSession(request.sessionId);
SetACLRequest setAclRequest = new SetACLRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
setAclRequest);
if (!fixupACL(request.authInfo, setAclRequest.getAcl())) {
throw new KeeperException.InvalidACLException();
}
path = setAclRequest.getPath();
nodeRecord = getRecordForPath(path);
checkACL(zks, nodeRecord.acl, ZooDefs.Perms.ADMIN,
request.authInfo);
version = setAclRequest.getVersion();
currentVersion = nodeRecord.stat.getAversion();
if (version != -1 && version != currentVersion) {
throw new KeeperException.BadVersionException();
}
version = currentVersion + 1;
txn = new SetACLTxn(path, setAclRequest.getAcl(), version);
nodeRecord = nodeRecord.duplicate(txnHeader.getZxid());
nodeRecord.stat.setAversion(version);
addChangeRecord(nodeRecord);
break;
case OpCode.createSession: // create a session
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
.getNextZxid(), zks.getTime(), OpCode.createSession);
request.request.rewind();
int to = request.request.getInt();
txn = new CreateSessionTxn(to);
request.request.rewind();
zks.sessionTracker.addSession(request.sessionId, to);
break;
case OpCode.closeSession: // close a session
txnHeader = new TxnHeader(request.sessionId, request.cxid, zks
.getNextZxid(), zks.getTime(), OpCode.closeSession);
HashSet<String> es = zks.dataTree
.getEphemerals(request.sessionId);
synchronized (zks.outstandingChanges) {
for (ChangeRecord c : zks.outstandingChanges) {
if (c.stat == null) {
// Doing a delete
es.remove(c.path);
} else if (c.stat.getEphemeralOwner() == request.sessionId) {
es.add(c.path);
}
}
for (String path2Delete : es) {
addChangeRecord(new ChangeRecord(txnHeader.getZxid(),
path2Delete, null, 0, null));
}
}
LOG.info("Processed session termination request for id: 0x"
+ Long.toHexString(request.sessionId));
break;
case OpCode.sync:
case OpCode.exists:
case OpCode.getData:
case OpCode.getACL:
case OpCode.getChildren:
case OpCode.ping:
case OpCode.setWatches:
break;
}
} catch (KeeperException e) {
if (txnHeader != null) {
txnHeader.setType(OpCode.error);
txn = new ErrorTxn(e.getCode());
}
} catch (Exception e) {
LOG.error("*********************************" + request);
StringBuffer sb = new StringBuffer();
ByteBuffer bb = request.request;
if(bb!=null){
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
}else
sb.append("request buffer is null");
LOG.error(sb.toString());
LOG.error("Unexpected exception", e);
if (txnHeader != null) {
txnHeader.setType(OpCode.error);
txn = new ErrorTxn(Code.MarshallingError);
}
}
request.hdr = txnHeader; // attach the prepared transaction header
request.txn = txn;
request.zxid = zks.getZxid(); // record the current zxid
nextProcessor.processRequest(request); // hand the request to the next processor
}
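One detail worth pulling out of the create case above: for sequential nodes the parent's cversion is appended to the path as a 10-digit zero-padded counter. A tiny illustration with a made-up path and cversion:
// Shows the sequential-node suffix produced by String.format("%010d", ...).
public class SequentialNameDemo {
    public static void main(String[] args) {
        String path = "/locks/lock-";      // hypothetical node path
        int parentCVersion = 42;           // hypothetical parent cversion
        System.out.println(path + String.format("%010d", parentCVersion));
        // prints: /locks/lock-0000000042
    }
}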
As the flow shows, the preparation of create and the other write operations is concentrated in this class; once a request is prepared, it is handed to the next processor in the chain, ProposalRequestProcessor.
The ProposalRequestProcessor class
public void processRequest(Request request) {
// LOG.warn("Ack>>> cxid = " + request.cxid + " type = " +
// request.type + " id = " + request.sessionId);
// request.addRQRec(">prop");
/* In the following IF-THEN-ELSE block, we process syncs on the leader.
* If the sync is coming from a follower, then the follower
* handler adds it to syncHandler. Otherwise, if it is a client of
* the leader that issued the sync command, then syncHandler won't
* contain the handler. In this case, we add it to syncHandler, and
* call processRequest on the next processor.
*/
if(request instanceof FollowerSyncRequest){ // a follower is asking for a sync
zks.getLeader().processSync((FollowerSyncRequest)request); // hand it directly to the leader's sync handling
} else {
nextProcessor.processRequest(request); // otherwise pass the request to the next processor first
if (request.hdr != null) {
// We need to sync and get consensus on any transactions
zks.getLeader().propose(request); // a transaction: propose it to the quorum
syncProcessor.processRequest(request); // and log it through the sync processor
}
}
}
For a transactional request, propose then serializes the transaction and broadcasts it to the followers:
public Proposal propose(Request request) {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
try {
request.hdr.serialize(boa, "hdr"); // 解析头部
if (request.txn != null) {
request.txn.serialize(boa, "txn");
}
baos.close();
} catch (IOException e) {
LOG.warn("This really should be impossible", e);
}
QuorumPacket pp = new QuorumPacket(Leader.PROPOSAL, request.zxid, baos
.toByteArray(), null); // build the proposal packet
Proposal p = new Proposal();
p.packet = pp;
p.request = request; // keep the originating request
synchronized (this) {
if (LOG.isDebugEnabled()) {
LOG.debug("Proposing:: " + request);
}
outstandingProposals.add(p); // queue the proposal so the followers' acks can be matched later
lastProposed = p.packet.getZxid();
sendPacket(pp); // broadcast the proposal to the followers
}
return p;
}
The CommitProcessor class
Transactional requests have been sent to the followers via propose; the CommitProcessor now holds each request until the quorum has committed it.
synchronized public void processRequest(Request request) {
// request.addRQRec(">commit");
if (LOG.isDebugEnabled()) { // debug logging
LOG.debug("Processing request:: " + request);
}
if (!finished) { // if not shut down, enqueue the request
queuedRequests.add(request);
notifyAll();
}
}
@Override
public void run() {
try {
Request nextPending = null;
ArrayList<Request> toProcess = new ArrayList<Request>();
while (!finished) { // loop until shut down
int len = toProcess.size(); // anything ready to forward?
for (int i = 0; i < len; i++) {
nextProcessor.processRequest(toProcess.get(i)); // forward everything that is ready
}
toProcess.clear();
synchronized (this) {
if ((queuedRequests.size() == 0 || nextPending != null) // nothing new queued, or still waiting on a commit,
&& committedRequests.size() == 0) { // and no commit has arrived
wait();
continue; // wait and retry
}
// First check and see if the commit came in for the pending
// request
if ((queuedRequests.size() == 0 || nextPending != null)
&& committedRequests.size() > 0) { // a commit has arrived
Request r = committedRequests.remove(); // take the committed request
/*
* We match with nextPending so that we can move to the
* next request when it is committed. We also want to
* use nextPending because it has the cnxn member set
* properly.
*/
if (nextPending != null
&& nextPending.sessionId == r.sessionId
&& nextPending.cxid == r.cxid) {
// we want to send our version of the request.
// the pointer to the connection in the request
nextPending.hdr = r.hdr;
nextPending.txn = r.txn;
nextPending.zxid = r.zxid; // copy the committed txn into the pending request
toProcess.add(nextPending); // and mark it ready to forward
nextPending = null;
} else {
// this request came from someone else so just
// send the commit packet
toProcess.add(r); // the commit originated elsewhere, forward it as-is
}
}
}
// We haven't matched the pending requests, so go back to
// waiting
if (nextPending != null) {
continue;
}
synchronized (this) {
// Process the next requests in the queuedRequests
while (nextPending == null && queuedRequests.size() > 0) { // drain queuedRequests
Request request = queuedRequests.remove(); // take one
switch (request.type) {
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
nextPending = request;
break;
case OpCode.sync:
if (matchSyncs) {
nextPending = request; // on the leader a sync also waits for its commit
} else {
toProcess.add(request);
}
break;
default:
toProcess.add(request); // reads need no commit, forward immediately
}
}
}
}
} catch (Exception e) {
LOG.error("FIXMSG",e);
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"CommitProcessor exited loop!");
}
A write request therefore waits in CommitProcessor until its commit arrives from the quorum; meanwhile the SyncRequestProcessor logs the proposal and checks whether a snapshot of the data needs to be taken. The next processor in the chain, ToBeAppliedRequestProcessor, is analyzed below.
The ToBeAppliedRequestProcessor class
public void processRequest(Request request) {
// request.addRQRec(">tobe");
next.processRequest(request); // process the request first
Proposal p = toBeApplied.peek(); // peek at the oldest outstanding proposal
if (p != null && p.request != null
&& p.request.zxid == request.zxid) { // same zxid: the proposal has been applied, so remove it
toBeApplied.remove();
}
}
Here the main job is simply to call the next processor and then drop the proposal from the toBeApplied queue once it has been applied. The last link in the chain is FinalRequestProcessor.
The FinalRequestProcessor class
public void processRequest(Request request) {
if (LOG.isDebugEnabled()) { // debug logging
LOG.debug("Processing request:: " + request);
}
// request.addRQRec(">final");
long traceMask = ZooTrace.CLIENT_REQUEST_TRACE_MASK;
if (request.type == OpCode.ping) {
traceMask = ZooTrace.SERVER_PING_TRACE_MASK;
}
ZooTrace.logRequest(LOG, traceMask, 'E', request, ""); // 打印日志
ProcessTxnResult rc = null;
synchronized (zks.outstandingChanges) { // 检查该队列是否为空
while (!zks.outstandingChanges.isEmpty()
&& zks.outstandingChanges.get(0).zxid <= request.zxid) { // 从该队列中获取事务id
if (zks.outstandingChanges.get(0).zxid < request.zxid) {
LOG.warn("Zxid outstanding "
+ zks.outstandingChanges.get(0).zxid
+ " is less than current " + request.zxid);
}
zks.outstandingChanges.remove(0); // drop changes with a zxid at or below this request's
}
if (request.hdr != null) {
rc = zks.dataTree.processTxn(request.hdr, request.txn); // apply the transaction to the data tree
if (request.type == OpCode.createSession) { // a session is being created
if (request.txn instanceof CreateSessionTxn) {
CreateSessionTxn cst = (CreateSessionTxn) request.txn;
zks.sessionTracker.addSession(request.sessionId, cst
.getTimeOut()); // register the session
} else {
LOG.warn("*****>>>>> Got "
+ request.txn.getClass() + " "
+ request.txn.toString());
}
} else if (request.type == OpCode.closeSession) {
zks.sessionTracker.removeSession(request.sessionId); // closeSession removes the session
}
}
// do not add non quorum packets to the queue.
if (Request.isQuorum(request.type)) {
zks.addCommittedProposal(request);
}
}
if (request.hdr != null && request.hdr.getType() == OpCode.closeSession) {
Factory scxn = zks.getServerCnxnFactory();
// this might be possible since
// we might just be playing diffs from the leader
if (scxn != null) {
scxn.closeSession(request.sessionId);
}
}
if (request.cnxn == null) {
return;
}
zks.decInProcess();
int err = 0;
Record rsp = null;
try {
if (request.hdr != null && request.hdr.getType() == OpCode.error) { // 检查请求类型是否报错 如果报错则抛出错误
throw KeeperException.create(((ErrorTxn) request.txn).getErr());
}
if (LOG.isDebugEnabled()) { // debug logging
LOG.debug(request);
}
switch (request.type) {
case OpCode.ping:
request.cnxn.sendResponse(new ReplyHeader(-2,
zks.dataTree.lastProcessedZxid, 0), null, "response"); // a ping is answered with the last processed zxid
return;
case OpCode.createSession:
request.cnxn.finishSessionInit(true); // finish initializing the new session
return;
case OpCode.create:
rsp = new CreateResponse(rc.path); // reply with the path of the created node
err = rc.err;
break;
case OpCode.delete:
err = rc.err;
break;
case OpCode.setData:
rsp = new SetDataResponse(rc.stat); // reply with the node's new stat
err = rc.err;
break;
case OpCode.setACL:
rsp = new SetACLResponse(rc.stat); // reply with the stat after the ACL change
err = rc.err;
break;
case OpCode.closeSession:
err = rc.err;
break;
case OpCode.sync:
SyncRequest syncRequest = new SyncRequest(); // a sync request
ZooKeeperServer.byteBuffer2Record(request.request,
syncRequest);
rsp = new SyncResponse(syncRequest.getPath()); // reply with the synced path
break;
case OpCode.exists:
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest(); // an existence check
ZooKeeperServer.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.dataTree.statNode(path, existsRequest
.getWatch() ? request.cnxn : null);
rsp = new ExistsResponse(stat);
break;
case OpCode.getData:
GetDataRequest getDataRequest = new GetDataRequest(); // read a node's data
ZooKeeperServer.byteBuffer2Record(request.request,
getDataRequest);
DataNode n = zks.dataTree.getNode(getDataRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(n.acl),
ZooDefs.Perms.READ,
request.authInfo);
stat = new Stat();
byte b[] = zks.dataTree.getData(getDataRequest.getPath(), stat,
getDataRequest.getWatch() ? request.cnxn : null);
rsp = new GetDataResponse(b, stat);
break;
case OpCode.setWatches:
SetWatches setWatches = new SetWatches();
// XXX We really should NOT need this!!!!
request.request.rewind();
ZooKeeperServer.byteBuffer2Record(request.request, setWatches);
long relativeZxid = setWatches.getRelativeZxid();
zks.dataTree.setWatches(relativeZxid,
setWatches.getDataWatches(),
setWatches.getExistWatches(),
setWatches.getChildWatches(), request.cnxn);
break;
case OpCode.getACL:
GetACLRequest getACLRequest = new GetACLRequest(); // fetch a node's ACL
ZooKeeperServer.byteBuffer2Record(request.request,
getACLRequest);
stat = new Stat();
List<ACL> acl =
zks.dataTree.getACL(getACLRequest.getPath(), stat);
rsp = new GetACLResponse(acl, stat);
break;
case OpCode.getChildren: // list a node's children
GetChildrenRequest getChildrenRequest = new GetChildrenRequest();
ZooKeeperServer.byteBuffer2Record(request.request,
getChildrenRequest);
stat = new Stat();
n = zks.dataTree.getNode(getChildrenRequest.getPath());
if (n == null) {
throw new KeeperException.NoNodeException();
}
PrepRequestProcessor.checkACL(zks, zks.dataTree.convertLong(n.acl),
ZooDefs.Perms.READ,
request.authInfo);
List<String> children = zks.dataTree.getChildren(
getChildrenRequest.getPath(), stat, getChildrenRequest
.getWatch() ? request.cnxn : null);
rsp = new GetChildrenResponse(children);
break;
}
} catch (KeeperException e) {
err = e.getCode();
} catch (Exception e) {
LOG.warn("****************************** " + request);
StringBuffer sb = new StringBuffer();
ByteBuffer bb = request.request;
bb.rewind();
while (bb.hasRemaining()) {
sb.append(Integer.toHexString(bb.get() & 0xff));
}
LOG.warn(sb.toString());
LOG.error("FIXMSG",e);
err = Code.MarshallingError;
}
ReplyHeader hdr = new ReplyHeader(request.cxid, request.zxid, err); // build the reply header
ServerStats.getInstance().updateLatency(request.createTime);
try {
request.cnxn.sendResponse(hdr, rsp, "response"); // 返回响应数据
} catch (IOException e) {
LOG.error("FIXMSG",e);
}
}
That is the main flow of FinalRequestProcessor, the last processor in the chain. The reply itself leaves through NIOServerCnxn.sendResponse, which patches the record's length into a 4-byte prefix before queueing it:
synchronized public void sendResponse(ReplyHeader h, Record r, String tag) {
if (closed) {
return;
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
// Make space for length
BinaryOutputArchive bos = BinaryOutputArchive.getArchive(baos);
try {
baos.write(fourBytes); // write four placeholder bytes for the length prefix
bos.writeRecord(h, "header");
if (r != null) {
bos.writeRecord(r, tag);
}
baos.close();
} catch (IOException e) {
LOG.error("Error serializing response");
}
byte b[] = baos.toByteArray();
ByteBuffer bb = ByteBuffer.wrap(b);
bb.putInt(b.length - 4).rewind();
sendBuffer(bb); // queue the buffer for sending
if (h.getXid() > 0) {
synchronized (this.factory) {
outstandingRequests--;
// check throttling
if (zk.getInProcess() < factory.outstandingLimit
|| outstandingRequests < 1) {
sk.selector().wakeup();
enableRecv();
}
}
}
}
Client connected to a follower
@Override
protected void setupRequestProcessors() {
RequestProcessor finalProcessor = new FinalRequestProcessor(this); // sends the final reply
commitProcessor = new CommitProcessor(finalProcessor,
Integer.toString(getClientPort()), true); // waits for commits from the leader
firstProcessor = new FollowerRequestProcessor(this, commitProcessor); // first stage of the chain
syncProcessor = new SyncRequestProcessor(this,
new SendAckRequestProcessor(getFollower())); // logs proposals and acks them to the leader
}
As this setup shows, the follower chain also ends in FinalRequestProcessor, which ultimately sends the finished response out through request.cnxn.sendResponse.
The FollowerRequestProcessor class
FollowerRequestProcessor is the first processor a request meets on a follower; its run loop is shown below.
@Override
public void run() {
try {
while (!finished) {
Request request = queuedRequests.take(); // take the next request from the queue
ZooTrace.logRequest(LOG, ZooTrace.CLIENT_REQUEST_TRACE_MASK,
'F', request, "");
if (request == Request.requestOfDeath) { // the poison pill: stop the thread
break;
}
// We want to queue the request to be processed before we submit
// the request to the leader so that we are ready to receive
// the response
nextProcessor.processRequest(request); // queue it to the local chain first, so we are ready for the response
// We now ship the request to the leader. As with all
// other quorum operations, sync also follows this code
// path, but different from others, we need to keep track
// of the sync operations this follower has pending, so we
// add it to pendingSyncs.
switch (request.type) { // dispatch on the request type
case OpCode.sync:
zks.pendingSyncs.add(request); // track this follower's pending syncs (falls through)
case OpCode.create:
case OpCode.delete:
case OpCode.setData:
case OpCode.setACL:
case OpCode.createSession:
case OpCode.closeSession:
zks.getFollower().request(request); // forward the request to the leader via Follower.request
break;
}
}
} catch (Exception e) {
LOG.error("FIXMSG",e);
}
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(),
"FollowerRequestProcessor exited loop!");
}
public void processRequest(Request request) {
if (!finished) {
queuedRequests.add(request); // append the request to the queuedRequests queue
}
}
void request(Request request) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream oa = new DataOutputStream(baos);
oa.writeLong(request.sessionId); // write the session id
oa.writeInt(request.cxid);
oa.writeInt(request.type);
if (request.request != null) { // append the request payload if present
request.request.rewind();
int len = request.request.remaining();
byte b[] = new byte[len];
request.request.get(b);
request.request.rewind();
oa.write(b);
}
oa.close();
QuorumPacket qp = new QuorumPacket(Leader.REQUEST, -1, baos
.toByteArray(), request.authInfo); // wrap everything in a REQUEST quorum packet
// QuorumPacket qp;
// if(request.type == OpCode.sync){
// qp = new QuorumPacket(Leader.SYNC, -1, baos
// .toByteArray(), request.authInfo);
// }
// else{
// qp = new QuorumPacket(Leader.REQUEST, -1, baos
// .toByteArray(), request.authInfo);
// }
writePacket(qp); // send the packet to the leader
}
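The frame written by request() is just sessionId (8 bytes) + cxid (4) + type (4) + the raw request bytes, wrapped in a QuorumPacket. A small sketch of that byte layout with made-up values:
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Illustrates the byte layout Follower.request() writes before wrapping it
// in a Leader.REQUEST QuorumPacket (values are hypothetical).
public class ForwardedRequestLayout {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream oa = new DataOutputStream(baos);
        oa.writeLong(0x1234L);              // sessionId
        oa.writeInt(7);                     // cxid
        oa.writeInt(1);                     // type (OpCode.create is 1)
        oa.write(new byte[] {1, 2, 3});     // request payload
        oa.close();
        System.out.println("forwarded frame is " + baos.size() + " bytes"); // 8+4+4+3 = 19
    }
}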
At this point the broad server-side request-processing flow is complete; all the transaction-related committing and distribution is organized through this chain-of-responsibility design.
Summary
Write requests such as create and delete are forwarded from a follower to the leader via zks.getFollower().request(request).
Once the data reaches the leader it is handled by the run method of FollowerHandler: client writes such as updates and deletes are handed to the leader, while reads such as queries are answered directly by the server the client is connected to. The execution of the remaining two processors follows the pattern already outlined, so it is not repeated here.
This article described the relationships between the leader and follower roles, how a new election works while the cluster is running, and how data flows between leader and followers: transactional changes all go through the leader, while reads can be answered directly by whichever server the client is connected to. The follower-side flow for receiving data from the leader is omitted for reasons of space, but its execution was described in an earlier article. As my knowledge is limited, corrections are welcome.