Zookeeper(2)-分布式锁的基础实现
Zookeeper 分布式锁
什么是分布式锁?
在进行分布式锁操作之前,我们得知道什么是分布式锁。在单体应用中,使用 API 自带的 javaLock 或者是 synchronize 就可以解决线程多带来的并发问题。但是在集群环境中,上述的方法并不能解决服务与服务之间的并发问题。
分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多了应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务
实现分布式锁的方式
数据库实现(效率低,不推荐)
redis 实现(使用 redission 实现,当需要考虑死锁和释放问题,比较繁琐)
Zookeeper 实现(使用临时节点,效率高)
spring Cloud 实现全局锁(内置的)
Zookeeper 实现分布式锁
实现原理
使用 Zookeeper 创建临时顺序节点,判断自己是不是当前节点下的最小节点,是的话就是获取到了锁,直接执行业务代码。不是的话,便对前一个节点进行监听。获取到锁,执行完业务代码后,delete 节点释放当前锁,然后下面的节点接收到通知。
案例实战
原生 Zookeeper 案例
编写分布式锁的代码
测试代码
输出信息
在上面获取 Zookeeper 连接的代码中自定义 ZKClientConfig 配置信息,将 ENABLE_CLIENT_SASL_KEY 改成 false。
Curator 案例
导入 POM 文件
实战代码
输出信息
使用 Curator 出现这个问题的方案还是和上面原生的是一样,因为其本质还是通过 Zookeeper 的客户端代码去进行一个连接。
创建自定义 ZookeeperFactory
}
使用 CuratorFrameworkFactory 创建连接的时候导入自定义 ZookeeperFactory
小伙伴们有兴趣想了解内容和更多相关学习资料的请点赞收藏+评论转发+关注我,后面会有很多干货。

#面试复盘#
什么是分布式锁?
在进行分布式锁操作之前,我们得知道什么是分布式锁。在单体应用中,使用 API 自带的 javaLock 或者是 synchronize 就可以解决线程多带来的并发问题。但是在集群环境中,上述的方法并不能解决服务与服务之间的并发问题。
分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多了应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务
实现分布式锁的方式
数据库实现(效率低,不推荐)
redis 实现(使用 redission 实现,当需要考虑死锁和释放问题,比较繁琐)
Zookeeper 实现(使用临时节点,效率高)
spring Cloud 实现全局锁(内置的)
Zookeeper 实现分布式锁
实现原理
使用 Zookeeper 创建临时顺序节点,判断自己是不是当前节点下的最小节点,是的话就是获取到了锁,直接执行业务代码。不是的话,便对前一个节点进行监听。获取到锁,执行完业务代码后,delete 节点释放当前锁,然后下面的节点接收到通知。
案例实战
原生 Zookeeper 案例
编写分布式锁的代码
public class DistributedLock { private final String connectString = "192.168.3.33:2181"; private final int Timeout = 2000; private final ZooKeeper zooKeeper; private final String rootNode = "locks"; private final String subNode = "seq-"; private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String waitPath; private String currentNode; public DistributedLock() throws IOException, KeeperException, InterruptedException { // 获取连接 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程 if (event.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } // 发生了 waitPath 的删除事件 if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); connectLatch.await(); // 判断节点/locks 是否存在 Stat stat = zooKeeper.exists("/" + rootNode, false); // 如果根节点不存在则创建永久根节点 if (stat == null) { System.out.println("根节点不存在!"); zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } // 获取锁 public void zkLock() throws KeeperException, InterruptedException { // 在根节点下创建临时顺序节点,返回值为创建的节点路径 currentNode = zooKeeper.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 获取所有的节点 List <String> children = zooKeeper.getChildren("/" + rootNode, false); // 列表中只有一个节点,就直接获取到锁 if (children.size() == 0) { return; } else { // 对节点进行排序 Collections.sort(children); //当前节点名称 String thisNode = currentNode.substring(("/" + rootNode + "/").length()); // 获取当前节点在数组中的位置 int indexOf = children.indexOf(thisNode); if (indexOf == -1) { System.out.println("数据异常"); } else if (indexOf == 0) { // index == 0 说明 thisNode 在列表中最小,当前 client 获取锁 return; } else { // 获得排名比 currentNode 前 1 位的节点 this.waitPath = "/" + rootNode + "/" + children.get(indexOf - 1); // 在 waitPath 上注册**, 当 waitPath 被删除时,zookeeper 会回调**的 process 方法 zooKeeper.getData(waitPath, true, new Stat()); waitLatch.await(); return; } } } // 释放锁 public void unZkLock() { try { zooKeeper.delete(this.currentNode, -1); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }
测试代码
public class DistributedLockTest { public static void main(String[] args) throws InterruptedException, IOException, KeeperException { // 创建分布式锁 1 final DistributedLock lock1 = new DistributedLock(); // 创建分布式锁 2 final DistributedLock lock2 = new DistributedLock(); new Thread(new Runnable() { @Override public void run() { try { lock1.zkLock(); System.out.println("线程 1 获取锁"); Thread.sleep(5 * 1000); lock1.unZkLock(); System.out.println("线程 1 释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { lock2.zkLock(); System.out.println("线程 2 获取锁"); Thread.sleep(5 * 1000); lock2.unZkLock(); System.out.println("线程 2 释放锁"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } }
输出信息
线程 1 获取到锁了! 线程 1 再次获取到锁了! 休息一下! 线程 1 释放锁了! 线程 1 释放锁了! 线程 2 获取到锁了! 线程 2 再次获取到锁了! 休息一下! 线程 2 释放锁了! 线程 2 释放锁了! 解决方案
在上面获取 Zookeeper 连接的代码中自定义 ZKClientConfig 配置信息,将 ENABLE_CLIENT_SASL_KEY 改成 false。
ZKClientConfig config = new ZKClientConfig(); config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false"); // 获取连接 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程 if (event.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } // 发生了 waitPath 的删除事件 if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { waitLatch.countDown(); } } }, config);
Curator 案例
导入 POM 文件
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>4.3.0</version> </dependency>
实战代码
public class CuratorLockTest { // 测试代码 public static void main(String[] args) throws Exception { // 创建分布式锁1 InterProcessMutex locks1 = new InterProcessMutex(getCuratorFramework(), "/locks"); // 创建分布式锁2 InterProcessMutex locks2 = new InterProcessMutex(getCuratorFramework(), "/locks"); new Thread(new Runnable() { @Override public void run() { try { // 获取到锁 locks1.acquire(); System.out.println("线程 1 获取到锁了!"); locks1.acquire(); System.out.println("线程 1 再次获取到锁了!"); System.out.println("休息一下!"); Thread.sleep(5 * 1000); locks1.release(); System.out.println("线程 1 释放锁了!"); locks1.release(); System.out.println("线程 1 释放锁了!"); } catch (Exception e) { e.printStackTrace(); } } }).start(); new Thread(new Runnable() { @Override public void run() { try { // 获取到锁 locks2.acquire(); System.out.println("线程 2 获取到锁了!"); locks2.acquire(); System.out.println("线程 2 再次获取到锁了!"); System.out.println("休息一下!"); Thread.sleep(5 * 1000); locks2.release(); System.out.println("线程 2 释放锁了!"); locks2.release(); System.out.println("线程 2 释放锁了!"); } catch (Exception e) { e.printStackTrace(); } } }).start(); } // 创建连接 private static CuratorFramework getCuratorFramework() throws Exception { ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3); DefaultZookeeperFactory zookeeperFactory = new DefaultZookeeperFactory(); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181") .sessionTimeoutMs(2000).retryPolicy(backoffRetry)..build(); client.start(); System.out.println("客户端启动成功!"); return client; } }
输出信息
线程 2 获取到锁了! 线程 2 再次获取到锁了! 休息一下! 线程 2 释放锁了! 线程 2 释放锁了! 线程 1 获取到锁了! 线程 1 再次获取到锁了! 休息一下! 线程 1 释放锁了! 线程 1 释放锁了! 解决方案
使用 Curator 出现这个问题的方案还是和上面原生的是一样,因为其本质还是通过 Zookeeper 的客户端代码去进行一个连接。
创建自定义 ZookeeperFactory
public class DefaultZookeeperFactory implements ZookeeperFactory { @Override public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean b) throws Exception { // 自定义 ZKClientConfig 配置 ZKClientConfig config = new ZKClientConfig(); config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false"); return new ZooKeeper(connectString, sessionTimeout, watcher, b, config); }
}
使用 CuratorFrameworkFactory 创建连接的时候导入自定义 ZookeeperFactory
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181") .sessionTimeoutMs(2000).retryPolicy(backoffRetry). zookeeperFactory(zookeeperFactory).build();
小伙伴们有兴趣想了解内容和更多相关学习资料的请点赞收藏+评论转发+关注我,后面会有很多干货。
#面试复盘#