基于SpringBoot与Redis实现分布式锁

Docker运行Redis

拉取最近版本的Redis镜像:

docker pull redis

启动容器:

docker run -d --name redis -p 6379:6379 redis:latest

进入容器内部,测试存储:

# docker exec -it redis redis-cli
127.0.0.1:6379> set name qcy
OK
127.0.0.1:6379> get name
"qcy"

到这里,说明我们启动成功了。


实现要求

实现的分布式锁必须具有以下特点:

  • 互斥:无论在什么时刻,最多只有一个客户端拥有锁
  • 锁拥有超时时间。否则加锁的客户端突然宕机,来不及释放锁,会导致所有客户端获取锁失败。
  • 在不超时的情况下,客户端只能释放掉自己申请的锁。有这样的一个例子:客户端a申请了锁,设置超时为5秒,可是其运行同步代码超过了5秒。此时客户端b请求锁时,由于该锁超时被redis自动删除,于是客户端b申请锁成功。接着客户端a的同步代码运行结束,这样就会把客户端b申请的锁给释放掉,这样可能会引起数据错乱。

实现原理

如何满足互斥与具有超时时间?

redis是单线程模型,指令是一条一条执行的,因此不存在并发问题。

我们使用以下的指令,来往reids中存一个过期时间为5秒的键值:

set name qcy nx ex 5

其中的name为key,qcy为value,nx代表不存在此key则存储,并且返回OK,若存在则会返回null,ex 5代表过期时间为5秒。

127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> set name qcy nx ex 5
(nil)

当我们在过期时间内重复存储时,redis会提示操作失败。间隔5秒以后,redis会提示插入成功。

释放锁时,只要删除对应的key即可:

127.0.0.1:6379> set name qcy nx ex 5
OK
127.0.0.1:6379> del name
(integer) 1

那么如何释放掉自己申请的锁?

key为锁名称,value可以为申请锁的客户端的唯一标识。那么在释放锁时,不仅仅比较key是否相同,还要比较value是否为申请锁的客户端的唯一标识,且这两次比较必须是原子操作

为什么必须是原子操作?

假设有这样的一种场景:客户端a申请了名称为name,值为a的锁,接着客户端a想要释放锁,查询出key=name,value=a的键值对,紧接着锁过期,客户端b申请到了锁,将值变为b。可是客户端a内存中的值仍然为a,于是释放掉了客户端b申请到的锁。

利用以上的原理,来实现我们的分布式锁。


代码实现

代码使用最简单的配置

工程目录结构:

pom依赖:

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.6</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>

其中,SpringBoot与Redis集成需要使用到spring-boot-starter-data-redis,输出日志使用到lombok,测试时用到spring-boot-starter-test

配置如下:

spring:
  redis:
    host: localhost
    port: 6379

logging:
  level:
    root: info

最核心的RedisLock类

package com.yang.redislock1; import lombok.extern.slf4j.Slf4j; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.core.script.DefaultRedisScript; import org.springframework.data.redis.core.script.RedisScript; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.util.Arrays; import java.util.concurrent.TimeUnit; /**
 * @author qcy
 * @create 2020/08/28 14:21:35
 */ @Slf4j @Component public class RedisLock { @Resource StringRedisTemplate template; public boolean tryLock(String key, String value, int expireTime, TimeUnit timeUnit) {
        Boolean flag = template.opsForValue().setIfAbsent(key, value, expireTime, timeUnit); if (flag == null || !flag) {
            log.info("申请锁(" + key + "," + value + ")失败"); return false;
        }
        log.error("申请锁(" + key + "," + value + ")成功"); return true;
    } public void unLock(String key, String value) {
        String script = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";
        RedisScript<Long> redisScript = new DefaultRedisScript<>(script, Long.class);
        Long result = template.execute(redisScript, Arrays.asList(key, value)); if (result == null || result == 0) {
            log.info("释放锁(" + key + "," + value + ")失败,该锁不存在或锁已经过期");
        } else {
            log.info("释放锁(" + key + "," + value + ")成功");
        }
    }

}

其中unLock方法中的lua脚本是一个原子操作,此时这里的KEYS[1]是key,KEYS[2]是value。

if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end

该段脚本的意思是,首先获取锁的value,判断是否等于期待的value,满足的话,则删除该锁,并返回1;否则直接返回0。

为什么不直接删除该key?

因为需要满足实现要求中的第三点:在锁不过期的情况,只能释放掉自己申请的锁。

锁的名称都一样,因此将客户端的唯一标识存进了其value中,那么删除前,需要判断是否是自己创建的锁。

测试类:

第一种情况,两个客户端都正常的创建、释放锁。

package com.yang.redislock1; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; import javax.annotation.Resource; import java.util.concurrent.TimeUnit; @Slf4j @SpringBootTest class Redislock1ApplicationTests { @Resource RedisLock redisLock; public static final String LOCK_NAME = "name"; public static final String CLIENT_A = "a"; public static final String CLIENT_B = "b"; public static final int EXPIRE_TIME = 5; @Test public void test1() { //客户端a boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultA) { try { //模拟客户端a的操作耗时 Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unLock(LOCK_NAME, CLIENT_A);
            }
        } //客户端b boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultB) { try { //模拟客户端b的操作耗时 Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unLock(LOCK_NAME, CLIENT_B);
            }
        }
    }

}

运行结果如下:

第二种情况,客户端a申请的锁超时

只需要把上述代码中,客户端a的操作耗时改为6秒即可。

输出如下:

第三种情况,假设客户端a不释放锁

修改测试类代码:

//客户端a boolean lockResultA = redisLock.tryLock(LOCK_NAME, CLIENT_A, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultA) { try { //模拟客户端a的操作耗时 Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally { //redisLock.unLock(LOCK_NAME, CLIENT_A); log.info("客户端a不释放锁");
            }
        } //客户端b boolean lockResultB = redisLock.tryLock(LOCK_NAME, CLIENT_B, EXPIRE_TIME, TimeUnit.SECONDS); if (lockResultB) { try { //模拟客户端b的操作耗时 Thread.sleep(2 * 1000);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                redisLock.unLock(LOCK_NAME, CLIENT_B);
            }
        }
    }

输出如下:


在集群模式下的问题

以上的场景,在单机版的redis中问题不大,可是在redis集群中,可能存在问题。

客户端a在redis集群中节点1上申请到了锁,然后执行业务逻辑,可是这个时候节点1的锁还没同步到节点2上,节点1突然挂了。这个时候客户端b在节点2上申请锁,立即就成功了。这个时候就出现了一把锁同时被多个客户端持有的情况。

解决此类问题,可以借助RedLock。其大致原理就是,客户端a请求节点1,节点1加锁前向集群中其他超过一半的节点发送加锁请求,只要过半的节点反馈加锁成功,节点1就认为加锁成功。

接着客户端a释放锁时,请求节点1,节点1等待数据同步成功后,向所有其他节点发起释放锁的请求,接着节点1认为释放锁成功。

当然,RedLock算法还会考虑更多的细节问题,这里就不细讲了。


全部评论
更多内容请移步我的博客https://blog.csdn.net/qq_33591903
点赞 回复
分享
发布于 2020-09-11 10:41

相关推荐

点赞 收藏 评论
分享
牛客网
牛客企业服务