Zookeeper(2)-分布式锁的基础实现

Zookeeper 分布式锁
什么是分布式锁?
在进行分布式锁操作之前,我们得知道什么是分布式锁。在单体应用中,使用  API 自带的 javaLock 或者是 synchronize 就可以解决线程多带来的并发问题。但是在集群环境中,上述的方法并不能解决服务与服务之间的并发问题。

分布式锁一般用在分布式系统或者多个应用中,用来控制同一任务是否执行或者任务的执行顺序。在项目中,部署了多了应用,在执行定时任务时就会遇到同一任务可能执行多次的情况,我们可以借助分布式锁,保证在同一时间只有一个tomcat应用执行了定时任务

实现分布式锁的方式
数据库实现(效率低,不推荐)
redis 实现(使用 redission 实现,当需要考虑死锁和释放问题,比较繁琐)
Zookeeper 实现(使用临时节点,效率高)
spring Cloud 实现全局锁(内置的)
Zookeeper 实现分布式锁
实现原理
使用 Zookeeper 创建临时顺序节点,判断自己是不是当前节点下的最小节点,是的话就是获取到了锁,直接执行业务代码。不是的话,便对前一个节点进行监听。获取到锁,执行完业务代码后,delete 节点释放当前锁,然后下面的节点接收到通知。

案例实战
原生 Zookeeper 案例

编写分布式锁的代码


public class DistributedLock { private final String connectString = "192.168.3.33:2181"; private final int Timeout = 2000; private final ZooKeeper zooKeeper; private final String rootNode = "locks"; private final String subNode = "seq-"; private CountDownLatch connectLatch = new CountDownLatch(1); private CountDownLatch waitLatch = new CountDownLatch(1); private String waitPath; private String currentNode; public DistributedLock() throws IOException, KeeperException, InterruptedException { // 获取连接 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() { @Override public void process(WatchedEvent event) { // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程 if (event.getState() == Event.KeeperState.SyncConnected) { connectLatch.countDown(); } // 发生了 waitPath 的删除事件 if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) { waitLatch.countDown(); } } }); connectLatch.await(); // 判断节点/locks 是否存在 Stat stat = zooKeeper.exists("/" + rootNode, false); // 如果根节点不存在则创建永久根节点 if (stat == null) { System.out.println("根节点不存在!"); zooKeeper.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); } } // 获取锁 public void zkLock() throws KeeperException, InterruptedException { // 在根节点下创建临时顺序节点,返回值为创建的节点路径 currentNode = zooKeeper.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); // 获取所有的节点 List <String> children = zooKeeper.getChildren("/" + rootNode, false); // 列表中只有一个节点,就直接获取到锁 if (children.size() == 0) { return; } else { // 对节点进行排序 Collections.sort(children); //当前节点名称 String thisNode = currentNode.substring(("/" + rootNode + "/").length()); // 获取当前节点在数组中的位置 int indexOf = children.indexOf(thisNode); if (indexOf == -1) { System.out.println("数据异常"); } else if (indexOf == 0) { // index == 0 说明 thisNode 在列表中最小,当前 client 获取锁 return; } else { // 获得排名比 currentNode 前 1 位的节点 this.waitPath = "/" + rootNode + "/" + children.get(indexOf - 1); // 在 waitPath 上注册**, 当 waitPath 被删除时,zookeeper 会回调**的 process 方法 zooKeeper.getData(waitPath, true, new Stat()); waitLatch.await(); return; } } } // 释放锁 public void unZkLock() { try { zooKeeper.delete(this.currentNode, -1); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } }
 



测试代码

public class DistributedLockTest {
 
 public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
  // 创建分布式锁 1
  final DistributedLock lock1 = new DistributedLock();
  // 创建分布式锁 2
  final DistributedLock lock2 = new DistributedLock();
 
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     lock1.zkLock();
     System.out.println("线程 1 获取锁");
     Thread.sleep(5 * 1000);
     lock1.unZkLock();
     System.out.println("线程 1 释放锁");
    } catch (Exception e) {
     e.printStackTrace();
    }
   }
  }).start();
 
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     lock2.zkLock();
     System.out.println("线程 2 获取锁");
     Thread.sleep(5 * 1000);
     lock2.unZkLock();
     System.out.println("线程 2 释放锁");
    } catch (Exception e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
}


输出信息

线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
解决方案



在上面获取 Zookeeper 连接的代码中自定义 ZKClientConfig 配置信息,将 ENABLE_CLIENT_SASL_KEY 改成 false。

 ZKClientConfig config = new ZKClientConfig();
 config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
 // 获取连接
 zooKeeper = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
   // 连接建立时, 打开 latch, 唤醒 wait 在该 latch 上的线程
   if (event.getState() == Event.KeeperState.SyncConnected) {
    connectLatch.countDown();
   }
   // 发生了 waitPath 的删除事件
   if (event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
    waitLatch.countDown();
   }
  }
 }, config);



Curator 案例

导入 POM 文件

<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-framework</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-recipes</artifactId>
 <version>4.3.0</version>
</dependency>
<dependency>
 <groupId>org.apache.curator</groupId>
 <artifactId>curator-client</artifactId>
 <version>4.3.0</version>
</dependency>


实战代码

public class CuratorLockTest {
 
    // 测试代码
 public static void main(String[] args) throws Exception {
  // 创建分布式锁1
  InterProcessMutex locks1 = new InterProcessMutex(getCuratorFramework(), "/locks");
 
  // 创建分布式锁2
  InterProcessMutex locks2 = new InterProcessMutex(getCuratorFramework(), "/locks");
 
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     // 获取到锁
     locks1.acquire();
     System.out.println("线程 1 获取到锁了!");
     locks1.acquire();
     System.out.println("线程 1 再次获取到锁了!");
     System.out.println("休息一下!");
     Thread.sleep(5 * 1000);
     locks1.release();
     System.out.println("线程 1 释放锁了!");
     locks1.release();
     System.out.println("线程 1 释放锁了!");
    } catch (Exception e) {
     e.printStackTrace();
    }
   }
  }).start();
 
  new Thread(new Runnable() {
   @Override
   public void run() {
    try {
     // 获取到锁
     locks2.acquire();
     System.out.println("线程 2 获取到锁了!");
     locks2.acquire();
     System.out.println("线程 2 再次获取到锁了!");
     System.out.println("休息一下!");
     Thread.sleep(5 * 1000);
     locks2.release();
     System.out.println("线程 2 释放锁了!");
     locks2.release();
     System.out.println("线程 2 释放锁了!");
    } catch (Exception e) {
     e.printStackTrace();
    }
   }
  }).start();
 }
 
 // 创建连接
 private static CuratorFramework getCuratorFramework() throws Exception {
  ExponentialBackoffRetry backoffRetry = new ExponentialBackoffRetry(3000, 3);
  DefaultZookeeperFactory zookeeperFactory = new DefaultZookeeperFactory();
  CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
    .sessionTimeoutMs(2000).retryPolicy(backoffRetry)..build();
  client.start();
  System.out.println("客户端启动成功!");
  return client;
 }
 
}



输出信息

线程 2 获取到锁了!
线程 2 再次获取到锁了!
休息一下!
线程 2 释放锁了!
线程 2 释放锁了!
线程 1 获取到锁了!
线程 1 再次获取到锁了!
休息一下!
线程 1 释放锁了!
线程 1 释放锁了!
解决方案



使用 Curator 出现这个问题的方案还是和上面原生的是一样,因为其本质还是通过 Zookeeper 的客户端代码去进行一个连接。

创建自定义 ZookeeperFactory

public class DefaultZookeeperFactory implements ZookeeperFactory {
 
 @Override
 public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean b) throws Exception {
  // 自定义 ZKClientConfig 配置
  ZKClientConfig config = new ZKClientConfig();
  config.setProperty(ZKClientConfig.ENABLE_CLIENT_SASL_KEY, "false");
  return new ZooKeeper(connectString, sessionTimeout, watcher, b, config);
 }


}
使用 CuratorFrameworkFactory 创建连接的时候导入自定义 ZookeeperFactory

CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.3.33:2181")
    .sessionTimeoutMs(2000).retryPolicy(backoffRetry).
      zookeeperFactory(zookeeperFactory).build();

小伙伴们有兴趣想了解内容和更多相关学习资料的请点赞收藏+评论转发+关注我,后面会有很多干货。




 
#面试复盘#
全部评论
感谢楼主啊,还贴心的上代码
点赞 回复 分享
发布于 2022-06-18 20:24

相关推荐

评论
点赞
3
分享

创作者周榜

更多
牛客网
牛客网在线编程
牛客网题解
牛客企业服务