当前位置: 首页>后端>正文

zookeeper 添加节点 zookeeper怎么创建节点

问题背景

zookeeper是一个分布式协调服务,zookeeper可以保证数据的一致性是因为所有的写请求都会被Follower节点转发到Leader节点执行。因此创建节点的请求也是一样的,只会别Leader节点创建新的节点,然后把数据同步到其他的Follower节点,那么它是如何保证创建的节点是唯一的呢?

zookeeper源码级保证原子性

zookeeper中创建节点是由DataTree的createNode方法来执行的

public String createNode(String path, byte data[], List<ACL> acl,long ephemeralOwner, int parentCVersion, long zxid, long time)
            throws KeeperException.NoNodeException,
            KeeperException.NodeExistsException {
        int lastSlash = path.lastIndexOf('/');
        String parentName = path.substring(0, lastSlash);
        String childName = path.substring(lastSlash + 1);
        StatPersisted stat = new StatPersisted();
        stat.setCtime(time);
        stat.setMtime(time);
        stat.setCzxid(zxid);
        stat.setMzxid(zxid);
        stat.setPzxid(zxid);
        stat.setVersion(0);
        stat.setAversion(0);
        stat.setEphemeralOwner(ephemeralOwner);
        DataNode parent = nodes.get(parentName);
        if (parent == null) {
            throw new KeeperException.NoNodeException();
        }
        synchronized (parent) {
            Set<String> children = parent.getChildren();
            if (children != null) {
                if (children.contains(childName)) {
                    throw new KeeperException.NodeExistsException();
                }
            }
            
            if (parentCVersion == -1) {
                parentCVersion = parent.stat.getCversion();
                parentCVersion++;
            }    
            parent.stat.setCversion(parentCVersion);
            parent.stat.setPzxid(zxid);
            Long longval = convertAcls(acl);
            DataNode child = new DataNode(parent, data, longval, stat);
            parent.addChild(childName);
            nodes.put(path, child);
            if (ephemeralOwner != 0) {
                HashSet<String> list = ephemerals.get(ephemeralOwner);
                if (list == null) {
                    list = new HashSet<String>();
                    ephemerals.put(ephemeralOwner, list);
                }
                synchronized (list) {
                    list.add(path);
                }
            }
        }
        // now check if its one of the zookeeper node child
        if (parentName.startsWith(quotaZookeeper)) {
            // now check if its the limit node
            if (Quotas.limitNode.equals(childName)) {
                // this is the limit node
                // get the parent and add it to the trie
                pTrie.addPath(parentName.substring(quotaZookeeper.length()));
            }
            if (Quotas.statNode.equals(childName)) {
                updateQuotaForPath(parentName
                        .substring(quotaZookeeper.length()));
            }
        }
        // also check to update the quotas for this node
        String lastPrefix;
        if((lastPrefix = getMaxPrefixWithQuota(path)) != null) {
            // ok we have some match and need to update
            updateCount(lastPrefix, 1);
            updateBytes(lastPrefix, data == null ? 0 : data.length);
        }
        dataWatches.triggerWatch(path, Event.EventType.NodeCreated);
        childWatches.triggerWatch(parentName.equals("") ? "/" : parentName,
                Event.EventType.NodeChildrenChanged);
        return path;
    }

createNode方法会获取到所要创建的新节点的name和其父节点的parentName,并且会创建一个StatPersisted对象存储新节点的一些状态属性信息。随后从名为nodes的ConcurrentHashMap(用于存放所有的节点,key是path字符串,value是DataNode对象)中获取到父节点,然后用synchronized锁住父节点,避免其他请求出现并发问题。获取到父节点之后会将新节点的路径add到父节点的children的Set集合中,之后新节点就会被put到nodes的ConcurrentHashMap中。

java源码级CAS

到此为止所有的工作交给了ConcurrentHashMap的put方法,现在来看一下ConcurrentHashMap的put方法是如何保证只创建一个节点的。
这里先简单说一下ConcurrentHashMap有关的知识,ConcurrentHashMap是在HashMap的基础上实现的,HashMap与ConcurrentHashMap的区别是ConcurrentHashMap在并发的情况下保证了线程安全,两者的底层数据结构都是一样的,但是不同的JDK版本中是不一样的,在JDK7及以前是数组+链表实现的,在JDK8以后是数组+链表+红黑树(链表长度到达8以后自动扩展为红黑树)实现的。

ConcurrentHashMap的put方法在JDK8是通过CAS+Synchronized取代Segment+ReentrantLock来实现的,这是一个很重要的知识点,这里暂时不展开来说。ConcurrentHashMap的put方法是先利用自旋锁+Synchronized来table操作数组,然后再用CAS的方式来提供写入到内存中去。

public V put(K key, V value) {
        return putVal(key, value, false);
 }
final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }

put方法中的CAS是调用了UnSafe类的方法来实现的,UnSafe来是看不到源码的。因此需要下载Openjdk的源码来看。但是看到UnSafe源码之后我们会发现它的方法都是native方法,这是因为Java不能直接访问操作系统底层,是通过本地方法来访问,而Unsafe类的native方法提供了硬件级别的原子操作,native方法是通过c++代码来实现的

static inline bool compareAndSwap (volatile jint *addr, jint old, jint new_val)
 {
  jboolean result = false;
  spinlock lock;
   if ((result = (*addr == old)))
     *addr = new_val;
   return result;
 }

C++则是通过汇编指令CMPXCHG来实现的,因此接下来我们一起来看一下汇编是如何保证原子性的。

汇编级别CAS

以下是几个和原子操作有关的汇编指令(所涉及的汇编知识不详细解释,只需要知道通过汇编语言可以实现原子操作即可):

指令

作用

XADD

先交换两个操作数的值,再进行算术加法操作。多处理器安全,在80486及以上CPU中支持

CMPXCHG

比较交换指令,第一操作数先和AL/AX/EAX比较,如果相等ZF置1,第二操作数赋给第一操作数,否则ZF清0,第一操作数赋给AL/AX/EAX。多处理器安全,在80486及以上CPU中支持

XCHG

交换两个操作数,其中至少有一个是寄存器寻址.其他寄存器和标志位不受影响

LOCK

这是一个指令前缀,在所对应的指令操作期间使此指令的目标操作数指定的存储区域锁定,以得到保护

当今几乎100%CPU都支持这些指令,因此用标准C和C++可以写出一系列几乎可以跨平台的原子操作函数。

操作系统(处理器)级别的CAS

到了这里我们再来看看操作系统底层(处理器)是如何保证原子操作的呢?
首先处理器能自动保证基本的内存操作是原子性的。表示当一个处理器读取一个字节时,其他处理器不能访问这个字节的内存地址。但是对于复杂的内存操作处理器是不能自动保证其原子性的,比如跨总线宽度、跨多个缓存行和跨页表的访问。但是,处理器提供了总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性。

总线锁定机制:通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写操作(i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后的共享变量的值会和期望的不一致。最简答的就是i++问题,当两个处理器CPU1和CPU2处理变量i的时候,想要保证读改写共享变量的操作是原子的,就必须保证CPU1读改写共享变量的时候,CPU2不能操作缓存了该共享变量内存地址的缓存。处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个LOCLK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。

缓存锁定机制:通过缓存锁定来保证原子性。在同一时刻,只需保证对某个内存地址的操作是原子性即可,但总线锁定把CPU和内存之间的通信锁住了,这使得锁定期间,其它处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,目前处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。频繁使用的内存会缓存在处理器的L1、L2和L3高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要声明总线锁。所谓“缓存锁定”是指如果共享内存如果被换存在处理器的缓存行中,并且在Lock操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声言LOCK#信号,而是修改内部的内存地址,并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处理器回写已被锁定的缓存行的数据时,会使缓存行无效。

针对以上两个机制,我们就可以通过Intel处理器提供了很多Lock前缀的指令来实现。例如,位测试和修改指令:BTS、BTR、BTC;交换指令XADD、CMPXCHG,以及其他一些操作数和逻辑指令。被这些指令操作的内存区域就会加锁,导致其他处理器不能同时访问它。

到此为止我们探究zookeeper创建的节点是唯一的过程就结束了,有些知识点没有展开讲,可能是没有必要或者本人能力有限的原因,欢迎大家指正批评!



https://www.xamrdz.com/backend/3yd1934791.html

相关文章: