基本命令主要有:
setnx(SET If Not Exists):当且仅当 Key 不存在时,则可以设置,否则不做任何动作。
setex:可以设置超时时间
其原理为:通过 SETNX 设置 Key-Value 来获得锁,随即进入死循环,每次循环判断,如果存在 Key 则继续循环,如果不存在 Key,则跳出循环,当前任务执行完成后,删除 Key 以释放锁。
这种方式可能会导致死锁,为了避免这种情况,需要设置超时时间。
具体的实现步骤
1、 Maven 工程下 pom.xml 依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- 开启web--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- redis--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> </dependencies>
2、配置文件 application.yml
server: port: 8080 spring: redis: host: localhost port: 6379
3、创建全局锁类 Lock.java
/** * 全局锁,包括锁的名称 */ public class Lock { private String name; private String value; public Lock(String name, String value) { this.name = name; this.value = value; } public String getName() { return name; } public String getValue() { return value; } }
4、分布式锁类
DistributedLockHandler.java
@Component public class DistributedLockHandler { private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class); private final static long LOCK_EXPIRE = 30 * 1000L;//单个业务持有锁的时间30s,防止死锁 private final static long LOCK_TRY_INTERVAL = 30L;//默认30ms尝试一次 private final static long LOCK_TRY_TIMEOUT = 20 * 1000L;//默认尝试20s @Autowired private StringRedisTemplate template; /** * 尝试获取全局锁 * * @param lock 锁的名称 * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock) { return getLock(lock, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 尝试获取全局锁 * * @param lock 锁的名称 * @param timeout 获取超时时间 单位ms * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock, long timeout) { return getLock(lock, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 尝试获取全局锁 * * @param lock 锁的名称 * @param timeout 获取锁的超时时间 * @param tryInterval 多少毫秒尝试获取一次 * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock, long timeout, long tryInterval) { return getLock(lock, timeout, tryInterval, LOCK_EXPIRE); } /** * 尝试获取全局锁 * * @param lock 锁的名称 * @param timeout 获取锁的超时时间 * @param tryInterval 多少毫秒尝试获取一次 * @param lockExpireTime 锁的过期 * @return true 获取成功,false获取失败 */ public boolean tryLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) { return getLock(lock, timeout, tryInterval, lockExpireTime); } /** * 操作redis获取全局锁 * * @param lock 锁的名称 * @param timeout 获取的超时时间 * @param tryInterval 多少ms尝试一次 * @param lockExpireTime 获取成功后锁的过期时间 * @return true 获取成功,false获取失败 */ public boolean getLock(Lock lock, long timeout, long tryInterval, long lockExpireTime) { try { if (StringUtils.isEmpty(lock.getName()) || StringUtils.isEmpty(lock.getValue())) { return false; } long startTime = System.currentTimeMillis(); do{ if (!template.hasKey(lock.getName())) { ValueOperations<String, String> ops = template.opsForValue(); ops.set(lock.getName(), lock.getValue(), lockExpireTime, TimeUnit.MILLISECONDS); return true; } else {//存在锁 logger.debug("lock is exist!!!"); } if (System.currentTimeMillis() - startTime > timeout) {//尝试超过了设定值之后直接跳出循环 return false; } Thread.sleep(tryInterval); } while (template.hasKey(lock.getName())) ; } catch (InterruptedException e) { logger.error(e.getMessage()); return false; } return false; } /** * 释放锁 */ public void releaseLock(Lock lock) { if (!StringUtils.isEmpty(lock.getName())) { template.delete(lock.getName()); } } }
5、创建 TestController 来测试分布式锁
@RestController public class TestController { @Autowired private DistributedLockHandler distributedLockHandler; @RequestMapping("test") public String index(){ Lock lock=new Lock("lynn","min"); if(distributedLockHandler.tryLock(lock)){ try { //为了演示锁的效果,这里睡眠5000毫秒 System.out.println("执行方法"); Thread.sleep(5000); }catch (Exception e){ e.printStackTrace(); } distributedLockHandler.releaseLock(lock); } return "hello world!"; } }
结果:
启动 Application.java,连续访问两次浏览器:
http://localhost:8080/test,控制台可以发现先打印了一次“执行方法”,说明后面一个线程被锁住了,5秒后又再次打印了“执行方法”,说明锁被成功释放。
通过这种方式创建的分布式锁存在以下问题:
1、高并发的情况下,如果两个线程同时进入循环,可能导致加锁失败。
2、SETNX 是一个耗时操作,因为它需要判断 Key 是否存在,因为会存在性能问题。
3、客户端A从master获取到锁, 在master将锁同步到slave之前,master宕掉了。 slave节点被晋级为master节点,客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!
Redlock算法假设我们有N(假设5)个Redis master实例,所有节点相互独立,并且业务系统也是单纯的调用,并没有什么其他的类似消息重发之类的辅助系统。下面来模拟一下算法: 1.客户端获取服务器当前的的时间t0,毫秒数。 2.使用相同的key和value依次向5个实例获取锁。客户端在获取锁的时候自身设置一个远小于业务锁需要的持续时间的超时时间。举个例子,假设锁需要10秒,超时时间可以设置成比如5-50毫秒。这个避免某个Redis本身已经挂了,但是客户端一直在尝试获取锁的情况。超时了之后就直接跳到下一个节点。 3.客户端通过当前时间(t1)减去t0,计算获取锁所消耗的时间t2(=t1-t0)。只有t2小于锁的业务有效时间(也就是第二步的10秒),并且,客户端在至少3(5/2+1)台上获取到锁我们才认为锁获取成功。 4.如果锁已经获取,那么锁的业务有效时间为10s-t2。 5.如果客户端没有获取到锁,可能是没有在大于等于N/2+1个实例上获取锁,也可能是有效时间(10s-t2)为负数,我们就尝试去释放锁,即使是并没有在那个节点上获取到。锁的释放释放比较简单,直接删除所有实例上对应的key就好
1、pom.xml 较redis利用 setnx 和 setex 实现追加依赖
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.7.0</version> </dependency>
2、需要添加的类
2.1、获取锁后需要处理的逻辑接口
/** * 获取锁后需要处理的逻辑 */ public interface AquiredLockWorker<T> { T invokeAfterLockAquire() throws Exception; }
2.2、获取锁管理类
/** * 获取锁管理类 */ public interface DistributedLocker { /** * 获取锁 * @param resourceName 锁的名称 * @param worker 获取锁后的处理类 * @param <T> * @return 处理完具体的业务逻辑要返回的数据 * @throws UnableToAquireLockException * @throws Exception */ <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws UnableToAquireLockException, Exception; <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception; }
2.3、异常类
/** * 异常类 */ public class UnableToAquireLockException extends RuntimeException { public UnableToAquireLockException() { } public UnableToAquireLockException(String message) { super(message); } public UnableToAquireLockException(String message, Throwable cause) { super(message, cause); } }
2.4、获取RedissonClient连接类
/** * 获取RedissonClient连接类 */ @Component public class RedissonConnector { RedissonClient redisson; @PostConstruct public void init(){ redisson = Redisson.create(); } public RedissonClient getClient(){ return redisson; } }
2.5、锁实现
@Component public class RedisLocker implements DistributedLocker{ private final static String LOCKER_PREFIX = "lock:"; @Autowired RedissonConnector redissonConnector; @Override public <T> T lock(String resourceName, AquiredLockWorker<T> worker) throws InterruptedException, UnableToAquireLockException, Exception { return lock(resourceName, worker, 100); } @Override public <T> T lock(String resourceName, AquiredLockWorker<T> worker, int lockTime) throws UnableToAquireLockException, Exception { RedissonClient redisson= redissonConnector.getClient(); RLock lock = redisson.getLock(LOCKER_PREFIX + resourceName); // Wait for 100 seconds seconds and automatically unlock it after lockTime seconds boolean success = lock.tryLock(100, lockTime, TimeUnit.SECONDS); if (success) { try { return worker.invokeAfterLockAquire(); } finally { lock.unlock(); } } throw new UnableToAquireLockException(); } }
2.6、测试
@RestController public class HelloController { @Autowired private DistributedLocker distributedLocker; @RequestMapping("index") public String index()throws Exception{ distributedLocker.lock("test",new AquiredLockWorker<Object>() { @Override public Object invokeAfterLockAquire() { try { System.out.println("执行方法!"); Thread.sleep(5000); }catch (Exception e){ e.printStackTrace(); } return null; } }); return "hello world!"; } }