说明
注意: 如未安装Zookeeper服务端,请先移步《docker安装Zookeeper(单点)》搭建简易zk环境。
使用Curator框架实现简单CRUD操作请看《Zookeeper客户端Curator使用(增删改查)》
本篇文章关于Zookeeper实现分布式锁。
概述
1. 线程锁和分布式锁的区别
线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。
线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
分布式锁:分布式锁,即分布式系统中的锁,解决了分布式系统中控制共享资源访问的问题。分布式锁是在分布式或者集群环境下,多进程可见,并且互斥的锁。
2. ZK实现分布式锁分析
客户端向zookeeper集群建立连接并在一个永久节点下创建有序的临时子节点后,根据编号顺序,最小顺序的子节点获取到锁,其他子节点由小到大监听前一个节点。
当拿到锁的节点处理完事务后,释放锁,后一个节点监听到前一个节点释放锁后,立刻申请获得锁,以此类推。
3. ZK实现分布式锁流程图
使用
1.原生Zookeeper代码实现分布式锁
按照上面zk分布式锁分析和流程图,使用原生zookeeper相关的api实现分布式锁,具体代码如下
public static class DistributedLock {
private ZooKeeper client;
/**
* 等待zk连接成功
*/
private CountDownLatch countDownLatch;
/**
* 等待节点变化
*/
private CountDownLatch waitLatch;
/**
* 当前节点
*/
private String currentNode;
/**
* 前一个节点路径
*/
private String waitPath;
/**
* 根节点
*/
private String lockPath;
public DistributedLock(String connectString, int sessionTimeOut, String path) throws Exception {
countDownLatch = new CountDownLatch(1);
waitLatch = new CountDownLatch(1);
lockPath = path;
client = new ZooKeeper(connectString, sessionTimeOut, watchedEvent -> {
// 连上ZK后,释放
if (watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
// waitLatch 需要释放 (节点被删除并且删除的是前一个节点)
if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted &&
watchedEvent.getPath().equals(waitPath)) {
waitLatch.countDown();
}
});
// 等待Zookeeper连接成功,连接完成继续往下走
countDownLatch.await();
Stat stat = client.exists(lockPath, false);
if (stat == null) {
// 根节点不存在,则创建
client.create(lockPath, lockPath.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
/**
* 加锁
*/
public void lock() {
try {
// 创建有序临时子节点
currentNode = client.create(lockPath + "/seq-", null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 如果是最小序号节点,则获取锁;如果不是就监听前一个节点
List<String> children = client.getChildren(lockPath, false);
// 子节点排序
Collections.sort(children);
// 截取子节点名称
String nodeName = currentNode.substring((lockPath + "/").length());
// 通过名称获取子节点在集合的位置
int index = children.indexOf(nodeName);
if (index == -1) {
System.out.println("数据异常");
} else if (index == 0) {
// 最小序号子节点,则获取锁
return;
} else {
// 监听前一个节点变化
waitPath = (lockPath + "/") + children.get(index-1);
client.getData(waitPath,true,null);
waitLatch.await();
return;
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 释放锁
*
* @throws KeeperException
* @throws InterruptedException
*/
public void unlock() throws KeeperException, InterruptedException {
client.delete(currentNode, -1);
}
}
模拟两个进程抢占分布式锁流程,测试代码如下:
public static void main(String[] args) throws Exception {
String connString = "192.169.7.171:2181";
int sessionTimeOut = 3000;
String lockPath = "/jms";
// 创建两个分布式锁,模拟两个进程抢占分布式锁流程
DistributedLock lock1 = new DistributedLock(connString, sessionTimeOut, lockPath);
DistributedLock lock2 = new DistributedLock(connString, sessionTimeOut, lockPath);
new Thread(new Runnable() {
@Override
public void run() {
try {
lock1.lock();
System.out.println("线程 [" + Thread.currentThread().getName() + "]" + "->抢到分布式锁--开始工作");
Thread.sleep(5000);
lock1.unlock();
System.out.println("线程 [" + Thread.currentThread().getName() + "]" + "->释放分布式锁--结束工作");
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
try {
lock2.lock();
System.out.println("线程 [" + Thread.currentThread().getName() + "]" + "->抢到分布式锁--开始工作");
Thread.sleep(5000);
lock2.unlock();
System.out.println("线程 [" + Thread.currentThread().getName() + "]" + "->释放分布式锁--结束工作");
} catch (InterruptedException | KeeperException e) {
e.printStackTrace();
}
}
}).start();
}
2.使用Curator框架实现分布式锁
首先要引入Curator框架
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
使用InterProcessMutex类来实现分布式锁
public static class DistributedLock {
private CuratorFramework client;
private InterProcessMutex mutex;
public DistributedLock(String connString, String lockPath) {
this(connString, lockPath, new ExponentialBackoffRetry(3000,5));
}
public DistributedLock(String connString, String lockPath, ExponentialBackoffRetry retryPolicy) {
try {
client = CuratorFrameworkFactory.builder()
.connectString(connString)
.retryPolicy(retryPolicy)
.build();
client.start();
mutex = new InterProcessMutex(client, lockPath);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取分布式锁
*
* @throws Exception
*/
public void acquire() throws Exception {
mutex.acquire();
}
/**
* 获取分布式锁(指定时间)
*
* @param time 时间
* @param unit 时间单位
* @return boolean
* @throws Exception
*/
public boolean acquire(long time, TimeUnit unit) throws Exception {
return mutex.acquire(time, unit);
}
/**
* 释放分布式锁
*
* @throws Exception
*/
public void release() throws Exception {
mutex.release();
}
}
模拟50个进程抢占分布式锁,测试代码如下:
public static void main(String[] args) {
// 模拟50个进程抢占分布式锁
String connString = "192.169.7.171:2181";
int threadCount = 50;
CountDownLatch countDownLatch = new CountDownLatch(threadCount);
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
@Override
public void run() {
try {
countDownLatch.await();
// 1.创建分布式锁
DistributedLock lock = new DistributedLock(connString, "/jms");
// 2.抢分布式锁
lock.acquire();
System.out.println("线程 [" + Thread.currentThread().getName() + "]" + "->抢到分布式锁--开始工作");
// 3.模拟执行业务逻辑
Thread.sleep(500L);
// 4.释放分布式锁
lock.release();
System.out.println("线程 [" + Thread.currentThread().getName() + "]" + "->释放分布式锁--结束工作");
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
countDownLatch.countDown();
}
}
源码
代码仓库地址: https://github.com/james-java/curator-example