当前位置: 首页>后端>正文

分布式——分布式锁

[TOC]

前言

突然觉得想要安稳的度过一生简直可以称之为臆想,想想历史上的盛世,大都不过三四十年,如何能保证自己生活的大时间一定是在那三四十年之中(不过真的希望未来越来越好,大势要好,个人也要好)。一份稳定的工作,爱自己的人,自己爱的人感觉都有点奢求。我知道这些在生活种都会慢慢被发现,也许一眨眼就要回首往昔了,发现这些东西的选择并不是自由的,而是被时间推动的。好似随着年龄的增长越发觉得自己的渺小,过去的东西已成定局且越来越多,而未来的选择也愈少,可选择的也愈少。
唉,言归正传,本来上次写完那个CAP,感觉自己查了挺多资料的,写的还不错。现在回过头一看,写的是啥嘛!还是脑子太笨,就是属于那种记得慢,理解的慢,忘得快的那种人。所以想着这次不说怎样怎样好吧,至少要比上次的好。对了,上一篇等我有时间要详细补补,先立个flag再说。

幂等性

定义

HTTP/1.1中对幂等性的定义是:一次和多次请求某一个资源对于资源本身应该具有同样的结果(网络超时等问题除外)。用函数来表示的话就是f....f(f(x))=f(x)。

目的

1.在重要操作(如交易,转账)中防止请求重试带来的灾难性后果。

幂等的范围

请求

1.读请求——天生的幂等
2.写请求——非幂等,需要控制。

数据库层面

1.INSERT——非幂等,需要通过内容进行控制。
2.UPDATE——通过WHERE条件控制幂等性,尽量少用相对值进行操作。如UPDATE table1 SET column1 = column1 + 1 WHERE column2 = 2;
每次执行的结果都会发生变化,这种不是幂等的。而UPDATE table1 SET column1 = 1 WHERE column2 = 2;无论执行成功多少次状态都是一致的,因此也是幂等操作。
3.DELETE——同样通过WHERE条件控制,尽量少用相对值进行操作。如有
DELETE table1 WHERE column1 < now();应改为 DELETE table1 WHERE column1 < "2019-10-01";更好。

业务层面

1.在冗余部署多个服务的情况下,请求存在并发消费的情况,需要将请求由并行转化为串行。

保证幂等性的策略

保证幂等性的本质是做好资源的串行化处理,这种串行化是多方位的。
查询请求自不必说,新增请求需要做好重要控制属性的防重处理,更新请求通过乐观锁也可以得到很好的控制。
分布式系统中则通过分布式锁将请求由并行化转为串行化的处理。

锁的属性

1.可重入/不可重入

可重入:同一线程外层函数获取到锁之后,内层函数中含有获取该锁的代码不受影响,不必重新去申请锁,可直接调用执行。
不可重入:同上相反,同一个线程内外都要获取到锁才执行。很容易造成死锁。
synchronized和Reentrantlock都是可重入锁

2.公平/非公平

公平:先来先得,请求锁的线程组成队列消费锁。
非公平:来了先请求锁,请求到了就执行,未获取到就扔到等待获取锁的队列尾部。
synchronized 非公平
Reentrantlock可选择

3.读/写

读锁:可以多人读,但是读的时候不能写,上读锁。
写锁:写的时候不能读,且只能一个人写,上写锁。

4.共享/独占

独占锁:每次只能一个线程持有锁,ReentrantLock就是以独占方式实现的互斥锁。独占是一种悲观的加锁策略。
共享锁:该锁可以被多个线程持有,如ReadWriteLock。放宽了加锁策略,允许多个读操作的线程同时访问共享资源。
note : AQS(AbstractQueuedSynchronized)同时提供了互斥模式(exclusive)和共享模式(shared)两种不同的同步逻辑。

5.可中断/不可中断

可中断:锁的操作过程可被中断。ReentrantLock可中断。
不可中断:与上面相反。synchronized就是不可中断锁。

分布式锁

用来保证某一特定共享资源在非同一系统中的多进程的环境下的互斥访问。实质是通过对该资源的请求进行串行化,避免重复处理。但并不能解决请求的幂等性问题,仍需要在业务代码中进行幂等性的控制。需要在系统外部创建一个共享存储服务器,用来保存锁信息。

设计目标

安全属性

1.互斥。不管任何时候只能有一个客户端持有同一个锁。即锁的存在是全局强一致性的。
2.无死锁,具备锁失效机制。一是提供锁服务的系统本身高可用,系统稳健。多节点服务,任意节点宕机或发生网络分区不影响锁的获取;二是客户端对于锁的自动续约和自动释放。
3.对称,对于任意锁,其加锁和解锁必须是同一个客户端。

效率属性

1.容错,高可用,高性能。服务本身高可用,系统稳健。多节点服务,任意节点宕机或发生网络分区不影响锁的获取。
2.可重入,可有效减少死锁的发生情况。
3.多选择,可以选择尝试获取锁的时间。
4.客户端调用简单。要求锁的代码高度抽象,业务接入极简。

设计思路

1.对共享资源的控制。可用某一资源的标识符作为锁的key,这样可以到资源进行唯一进程的控制。
2.锁的唯一性控制。获取锁的进程单独获取一个唯一标识符(value),作为对锁释放时的条件,避免锁被别的进程释放。同时需要保证判断和del操作的原子性,防止产生误删行为。通常使用lua脚本。
3.避免进程未执行完锁被释放,资源被并行访问。进程及时续约,避免业务未完成锁被释放。
4.防止死锁的产生。锁的定期释放,设定释放时间。

边界条件

1.提供锁注册的服务本身的稳定性与一致性。
2.锁未能如期续租。如心跳续租不成功、服务启动GC,GC期间服务挂起时间超出锁的有效时间等。
3.业务假死,TTL仍在继续。

设计要点

1.锁的注册。保证锁的注册是原子性的,即判断锁是否存在和注册是串行化的。
2.锁的续期。如何在对业务代码入侵最小的情况下续期锁的租约。CAS原子性。
3.锁的释放。锁的使用者只释放自己所持有的锁。

不同的实现

redis实现

原理
1.单实例实现原理

redis的唯一线程串行处理,即本身是一个幂等线性的系统。但是这样会产生单点故障问题。如果redis使用主从模式,因为redis的复制是异步 的所以会出现:

1.客户端A在master节点获取到锁。
2.master节点在将信息写入slave之前宕机。
3.slave被提升为master。
4.客户端B获取到与A同一资源的锁。此时该资源进入多进程并行消费状态。有悖互斥原则。

当然这种情况出现的概率是非常低的,如果资源对于这种情况并不敏感,通常也是可以接受的。
或者使用单实例的redis实现方式,这种情况下即程序需对单点故障的问题容忍度较高才行。

2.RedLock算法实现

该实现方式实质是在redis集群中实现一个一致性协议来实现key的唯一。不过要求所有的节点均为redis master节点,且完全相互独立,不存在主从复制或者其他集群协调机制。

如果要获取锁,客户端进行的操作需要有:

1.客户端获取当前时间(毫秒)
2.使用相同的key和不同的value去顺序请求存在的N个master节点。(
该步骤需要客户端设置一个比自动释放时间小的多请求超时时间。比如说,当自动释放时间为10秒时,设置超时时间为5~50毫秒。这是用来防止在宕掉的节点上消耗过多的时间。一个节点不可用,应立即进行下个节点的请求。
)
3.客户端计算获取锁消耗的时间(step2消耗的时间——当前时间-step1获取到的时间)。当且仅当在N/2+1个节点获取到锁,获取锁的消耗时间小于锁的失效时间,才认为锁的获取是成功的。
4.若锁被获得,其有效时间可以被视为——初始的过期时间-获取锁的消耗时间。
5.如果客户端获取锁失败,无论什么原因(其实就两种原因,一是成功节点小于N/2+1,二是超时),都将在所有节点进行锁的释放(即便是哪些根本没有获取到锁的节点);

根据上面的算法描述来看,redis集群至少需要三个master节点。且实现上也较为繁琐。加锁开销过大。对于redis运行问题要求也较高。总的来说实现成本过高。不过好在已经有人实现了——Redisson

单实例的实现

之前看到的资料一般都是用setNX这个命令来进行锁的注册,但是这个命令无法设置自动过期时间,只能通过设置value为时间戳来进行过期控制,其实不是很好。现在redis官方更推荐使用SET命令。

SET resource-name anystring NX EX max-lock-time

自2.6.12版本SET命令支持多种操作:
    EX seconds -- Set the specified expire time, in seconds.
    PX milliseconds -- Set the specified expire time, in milliseconds.
    NX -- Only set the key if it does not already exist.
    XX -- Only set the key if it already exist.

在释放锁的时候我们则可以用lua脚本语言用来保证原子性进行锁的释放

    if redis.call("get",KEYS[1]) == ARGV[1]
    then
        return redis.call("del",KEYS[1])
    else
        return 0
    end   

具体的代码实现
这里用到的是lettuce

1.锁的实现
 /** 
 *  如果锁处于空闲状态,当前线程获取到锁 
 *  如果锁已经被其它线程持有,禁用当前线程,直到当前线程获取到锁 
 */
 @Override
 public void lock() {    
 //选定同步方式    
    while (true){        
        if (tryLock())return;        
        this.sleepByMillisecond(renewalTime >> 1);    
    }
 }
 /** 
 *  尝试获取锁 
 *  如果锁可用返回true 否则返回 false 
 * @return 
 */
 @Override
 public boolean tryLock() {    
    //使用ThreadLocal保存当前锁对象 作为可重入锁的控制
    if (threadLocal.get() != null) return true;
    String set = statefulRedisConnection.sync().set(lockKey, lockValue, new SetArgs().nx().ex(lockTime));   
    if (set != null && "OK".equals(set)){        
        System.out.println("线程id:"+Thread.currentThread().getId() + "加锁成功!时间:"+ LocalTime.now());        
        isOpenExpirationRenewal = true; 
        threadLocal.set(this);
        this.scheduleExpirationRenewal();        
        return true;    
    }    
    return false;
 }
 /** 
 *  尝试获取锁 
 *  在指定时间内可以获取到  返回true 否则返回false 
 * @param time 
 * @param unit 
 * @return 
 * @throws InterruptedException 
 */
 @Override
 public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {   
     //获取进入时间    
     LocalTime now = LocalTime.now();    
     while (now.isBefore(now.plus(time, (TemporalUnit) unit))){       
        if (tryLock())return true;    
     }    
     return false;
 }

2.锁的续租

可能存在两个问题,一是主线程挂掉,续租线程无法关闭;二是续租本身失败。目前感觉对于第一种情况最好的方式是在加锁代码中设置异常捕捉,最后finally代码块中执行解锁。第二种方式则是需要做好日志记录分析代码问题具体解决了。不过您要是有更好的实现方式,欢迎传道解惑。


@Override
protected void scheduleExpirationRenewal() {    
    Thread thread = new Thread(new ExpirationRenewal());    
    thread.start();
}
private class ExpirationRenewal implements Runnable{    
    @Override    
    public void run() {        
        while (isOpenExpirationRenewal){            
            try {                
                Thread.sleep(renewalTime);           
            } catch (InterruptedException e) {               
                e.printStackTrace();           
            }            
            String expirFromLua = "if redis.call('get', KEYS[1]) == ARGV[1]"                    
                    + " then "                    
                    + "return redis.call('expire',KEYS[1],ARGV[2])"                    
                    + " else "                   
                    + "return 0"                   
                    + " end";            
            Object eval = statefulRedisConnection.sync().eval(expirFromLua, 
                ScriptOutputType.INTEGER, new String[]{lockKey}, lockValue, lockTime.toString());            
            System.out.println("续租获取的结果值:" + eval + ((long)eval==1?" 续租成功":" 续租失败"));        
        }   
    }
}
3.锁的释放
@Override
public void unlock() {    
//关闭续租    
isOpenExpirationRenewal = false;    
    //删除锁    
    String delFromLua = "if redis.call(\"get\", KEYS[1]) == ARGV[1]"    
        + " then "    
        + "return redis.call(\"del\",KEYS[1])"    
        + " else "    
        + "return 0"    
        + " end";    
    Long eval = statefulRedisConnection.sync().eval(delFromLua, ScriptOutputType.INTEGER, new String[]{lockKey}, lockValue);    
    if (eval == 1){       
        System.out.println("锁释放成功");    
    }else {       
        //最好做好日志记录
        System.out.println("锁 早已经释放");    
    }
}

4.总结

上面只实现了可重入,需要考虑一下如何实现公平和非公平的切换,以及读写锁。实现公平锁的关键是维护一个跨进程的请求锁队列,只能用redis本身来实现。

2.RedLock算法实现
1.Redisson简介

偷个懒,直接搬个官方的简介

Based on high-performance async and lock-free Java Redis client and Netty framework.

Redisson功能还是挺强大的,推荐有时间可以去看看(有中文文档哦)。

2.RedLock算法的实现

Redissond的红锁(RedissonRedLock)对象实现了RedLock介绍的加锁算法。使用方式如下

RLock lock1 = redissonInstance1.getLock("lock1");
RLock lock2 = redissonInstance2.getLock("lock2");
RLock lock3 = redissonInstance3.getLock("lock3");

RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
// 同时加锁:lock1 lock2 lock3// 红锁在大部分节点上加锁成功就算成功。
lock.lock();
...
lock.unlock();

需要创建多个redissonInstance1并创建多个RLock对象,使用这些来组成分布式锁。其在redis种选用的数据结构是hash,用来实现公平锁较方便。

使用起来还是比较麻烦的。

zookeeper实现

原理

zookeeper是一个分布协调服务,实现了一致性协议。分为Sever端和Client端。其中Server端是分布式应用。zab一致性协议,znode的临时顺序节点,watchs机制是zookeeper实现分布式锁的关键。

1.zookeeper的数据模型

hierarchal name space(层级名称空间),可以看作为一个分布式文件系统。具有树形结构,每个节点以“/”分隔。大部分unicode字符都可以作为节点名称。
zookeeper tree中的节点称为znode。znode节点中维护着一个stat的数据结构,其中保存数据更改、acl(Access Control List——访问控制列表,每个节点都存在,该列表限制谁可以做什么,相当于节点的权限列表,有:CREATE、READ、WRITE、DELETE、ADMIN)更改的版本号以及对应的时间戳。
znode的读写是原子性的。

znode的节点可以分为:

  • 持久节点 Persistent Nodes
    即使在创建该特定znode的客户端断开连接后,持久节点仍然存在。默认情况下,除非另有说明,否则所有znode都是持久的。
  • 临时节点 Ephemeral Nodes
    临时节点是伴随着客户端和服务端的会话创建的,当一个会话结束,其所对应的znode也会被删除。因此临时节点是无法创建子节点的。临时节点在leader选举中起着重要作用。
  • 顺序节点 Sequence Nodes
    顺序节点可以是持久的或临时的。当一个新的znode被创建为一个顺序节点时,ZooKeeper通过将10位的序列号附加到原始名称来设置znode的路径。该序列号是一个单调递增的数列。例如,如果将具有路径 /mynode 的znode创建为顺序节点,则ZooKeeper会将路径更改为 /mynode0000000001 ,并将下一个序列号设置为0000000002。当递增超过2147483647(2^31 - 1)时,计数器将溢出(导致名称“-2147483648”)。顺序节点在锁定和同步中起重要作用
  • Container Nodes 3.5.3版本添加
    该类型节点是为特殊请情况设置的,如leader,lock等。当容器内最后一个子节点被删除后,容器将在未来的某个时候成为服务器要删除的候选对象(个人感觉类似于redis中的定期删除)。因此在创建Container node子节点时有可能会抛出KeeperException.NoNodeException,所以当子节点时需要判断是否有该异常抛出,且在抛出后重新创建容器节点。
  • TTL Nodes 3.5.3版本添加
    该类节点只能是持久节点。在创建持久节点或持久顺序节点时,可以选择设置节点的TTL。若该节点的TTL时间未发生修改,且无子节点,那么该节点将在未来的某个时刻被服务器删除。TTL节点必须通过系统属性启用,因为它们在默认情况下是禁用的。
2.Watchs

客户端可以通过设置一个用来监听znode节点信息的Watchs来获取节点的更改信息。

实现步骤

通常实现锁的关键是临时顺序节点和Watchs机制。即通过临时顺序节点创建一个获取锁的队列,通过watchs机制监听前一数据的消费情况,判断自己是否被消费。

1.创建持久节点lock作为锁的父节点。
2.请求锁的客户端A在lock节点下创建临时顺序节点lock000000N;
3.获取lock下的节点列表,获取编号比客户端A次小的节点,若不存在,表示当前线程序号最小,获取到锁。
4.若编号比客户端A次小的节点存在,设置Watchs监听次小节点。次小节点删除,判断自己是不是最小的节点,是则获取到锁。

apache的开源库Curator提供了相应的分布式锁的实现。
不过有时间可以自己实现一个,不止使用上面的常规方式,也可使用Container节点或者TTL节点。

总结

zookeeper的实现相较于redis来讲,不必考虑锁的一致性问题,省去了一大麻烦。等我有时间看看Curator源码去。

etcd实现

什么是etcd

一个分布式键值对(K-V)存储服务。其实现了一致性算法Raft(糟了又忘了,这个一致性算法感觉写的还是不够详细,等我有时间再好好研究一下),故高可用一定是2N+1个。主要用来存储不常发生改变的数据,并提供监视查询。可使用http/https/grpc调用服务端api,grpc方式自带api,使用时不必关心续约问题。其中grpc是v3版本中添加可用。v3版本仍支持http/https的rest方式调用,但是需要注意的是v2版本同v3版本并不兼容,命令大多发生修改,数据不互通,如使用v2版本创建的数据,v3版本不可见。且v3版本对rest的调用方式支持并不是很好。

原理
数据模型

etcd是基本的Key-Value结构。

  • 逻辑视图:
    存储区的逻辑视图是一个平面二进制(flat binary 说实话我对这个词不是很明白,需要查查资料,我个人理解是一种数据的层级结构,类似树的结构。有知道的望不吝赐教,抱拳)key space。Key空间维护多个Revision。每次进行事务,将在Key空间上创建一个新的Revision。Key从创建到销毁就是Key的一个生命周期(generation)。每个Key可能拥有一个到多个生命周期(意味着保存着key多次创建修改销毁的详细记录)。若Key不存在于当前修改中,那么当前Key的版本(version)从1开始。删除Key则生成一个标志,该标志中包含当前被关闭key的代信息(通过重置version 为0)。每次修改key的version都会自增。在一个key的生命周期内version是单调递增的。一次compaction发生,任何在compaction revision之前的key生命周期记录(已结束的generation)会被删除,并删除在compaction revision之前的设置的值(最新的除外)。

  • 物理视图
    etcd的KV数据以B+书的形式持久化到磁盘种,其更新数据时并不是对原有数据结构上的更新,而是生成一个更新后的数据结构。故其读写性能无法与redis之类相比。

etcd实现分布式锁的基础
  • Lease(A short-lived renewable contract that deletes keys associated with it on its expiry)机制:即租约机制(TTL, Time To Live)。续租问题。
  • Revision(A 64-bit cluster-wide counter that is incremented each time the keyspace is modified)机制:每个Key带有一个Revision,每次进行事务将在key space创建一个新的Revision,该Revision是单调递增的,初始值为零。通过Revision大小可以知道进行写操作的顺序。在实现分布式锁时,可以依据Revision的大小进行锁获取的顺序,实现公平锁。
  • Prefix机制:即前缀机制,也称目录机制。key可以以目录的形式创建,如key1 = "/mylock/00001", key2 = "/mylock/00002"。通过前缀/mylock范围查询,获取KV列表,通过Revision的大小控制锁的获取顺序,实现公平锁。
  • Watch机制:监听机制,Watch可以监听单个key,也支持范围监听。
实现步骤

对比zookeeper和etcd我们会发现整体机制上是相同的。所以其实现方式大致也相同。
其步骤如下:

1.创建对应资源的分布式锁的全局key的前缀,如/mylock,则第一个请求的客户端创建的key则为/mylock/uuid001,第二个为/mylock/uuid002。
2.为该对应资源创建租约Lease为特定合适时间。
3.客户端put自己的key,如/mylock/uuid001。这里的value不为空即可。获取自己put后的revision号,并记录。
4.获取key前缀下的所有键值对信息,并根据revision号大小排序,小的在前。
5.对比put的revision值是否是最小的,是的话获取锁,并开启续租线程。不是,则获取前一个加锁操作索引,并创建一个Watcher,用于监听前一一个key(将处于阻塞状态)。

etcd最新版本中有提供分布式锁的实现。不必关心锁的释放和续约,当调用unlock时自动释放。关键字lock,其实现原理基本同上述所讲。
etcd的java客户端有etcd4j,jetcd等。

总结

etcd的实现方式较为简单,且其本身已经实现了分布式锁,只用一个lock命令就可以轻松的获取锁。但是etcd在java的其他生态应用中似乎不是很常用,如果仅仅是用来做分布式锁的获取的话其实并不是很划算。

结束语

终于写完了,其实是赶工的,中间的zookeeper并没有详细的去写出来,redis的那个也没有实现公平锁。现在这些东西提供的越来越全面了,有些不必自己去写,但是原理一定要懂。其实这些都是比较-续约-释放的具体实现。有机会还是要看看源码,拓展一下自己的思路,希望接下来有时间能把Redisson、Curator、Jetcd的锁实现源码看看。唉,一直立flag一直爽,一直拖也一直爽,不知道哪个能更强一头...
睡觉啦,睡觉啦...
分布式——分布式锁

参考资料:
redis:https://redis.io/topics/distlock
zookeeper:http://zookeeper.apache.org/
etcd:https://etcd.io/

幂等性:https://www.cnblogs.com/javalyy/p/8882144.html
zookeeper分布式锁的实现:https://www.cnblogs.com/javalyy/p/8882144.html
etcd锁实现:http://blogspring.cn/view/120
jetcd:https://github.com/etcd-io/jetcd


https://www.xamrdz.com/backend/3vf1940573.html

相关文章: