Advertisement

Redis分布式锁

阅读量:

Redis分布式锁

Redis之分布式锁的实现方案 - 如何优雅地实现分布式锁(Java)

博客地址 https://blog.piaoruiqing.com/2019/05/19/redis分布式锁/

文章目录

  • Redis分布式锁

    • 关键词
    • 前言
      • 需求
      • 方案
  • 配置RedisTemplate模块

    • 配置RedisDependency
    • 基础层面的分布式锁机制开发
    • 早期版本中的spring-data-redis基于Redis实现了分布式锁机制,并对相关注意事项进行了详细说明

优化升级
* 优化升级 (支持自动解锁机制及重试逻辑的实现)
* * 分布式锁框架
* 实现Redis分布式锁封装
* 实现加锁功能模块

复制代码
  * 优化二 (自定义异常)
  * * 加锁方法实现

  * 优化三 (优雅地使用注解)
  * * 定义注解
* 切面实现

* 扩展
* 结语
* 参考文献

关键词

  • 分布式锁: 实现了对分布式系统各节点进行共享资源的协调访问。
    • Spring-Data-Redis: 通过将Redis功能封装到Spring框架中实现了配置简便性和功能扩展性。
    • Lua: 作为一种轻便高效的脚本语言,在Redis环境中运行。

前言

本文探讨了一种基于Redis平台的Java实现方案及其优化策略。该方案支持自动解锁机制、自定义异常处理能力、重试功能以及注Unlock接口。在实现过程中,我们采用更加简洁优雅的方式重构分布式锁的核心逻辑。

需求

  • 互斥性: 在分布式系统中, 每个锁仅可由单一线程持有.
    • 高可用: 避免出现死锁现象, 并且即使客户端出现故障也会在超时后自动释放该锁.
    • 非阻塞: 当获取到锁失败时系统将立即返回.

方案

Redis在性能方面表现卓越, 其指令对于分布式锁操作较为友好, 通过采用SET指令的方式进行加锁操作能够达到预期效果

SET

  • ETime seconds – Set the specified expiration time, lasting for a certain number of seconds.
  • PTimer milliseconds – Specify the expiration time to be maintained for a specific duration in milliseconds.
  • NX – 仅在键不存在时设置该键。
  • XX – 仅在键已经存在时设置该键。

实现

简单实现

该做法采用条件赋值策略。Redis采用原子操作机制, 因此在单独执行set命令时无需担心由于并发引发的异常问题。

具体代码实现如下: (spring-data-redis:2.1.6)

依赖引入

复制代码
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.4.RELEASE</version>
    </dependency>

配置RedisTemplate

复制代码
    @Bean
    @ConditionalOnMissingBean
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
    
    StringRedisSerializer keySerializer = new StringRedisSerializer();
    RedisSerializer<?> serializer = new StringRedisSerializer();
    StringRedisTemplate template = new StringRedisTemplate();
    template.setConnectionFactory(factory);
    template.setKeySerializer(keySerializer);
    template.setHashKeySerializer(keySerializer);
    template.setValueSerializer(serializer);
    template.setHashValueSerializer(serializer);
    template.afterPropertiesSet();
    return template;
    }

简单的分布式锁实现

复制代码
    /** * try lock
     * @author piaoruiqing
     * * @param key       lock key
     * @param value     value
     * @param timeout   timeout
     * @param unit  	time unit
     * @return 
     */
    public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {      
    
    	return redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit);
    }

以上代码即完成了一个简单的分布式锁功能:

其中redisTemplate.opsForValue().setIfAbsent(key, value, timeout, unit); 即表示该操作用于执行Redis命令

复制代码
    redis> set dlock:test-try-lock a EX 10 NX
    OK
    redis> set dlock:test-try-lock a EX 10 NX
    null

早期版本spring-data-redis分布式锁实现及注意事项

该方法自2.1版本起被引入。早期版本中无法指定key的过期时间。若先调用该方法并随后修改key的过期时间,则可能导致系统出现死锁问题。因此,在旧版本中通常需要采用其他方式来实现此功能。例如,在spring-data-redis:1.8.20版本中

复制代码
    /** * try lock
     * @author piaoruiqing
     * * @param key       lock key
     * @param value     value
     * @param timeout   timeout
     * @param unit  	time unit
     * @return 
     */
    public Boolean tryLock(String key, String value, long timeout, TimeUnit unit) {
    
    return redisTemplate.execute(new RedisCallback<Boolean>() {
        @Override
        public Boolean doInRedis(RedisConnection connection) throws DataAccessException {
    
            JedisCommands commands = (JedisCommands)connection.getNativeConnection();
            String result = commands.set(key, value, "NX", "PX", unit.toMillis(timeout));
    
            return "OK".equals(result);
        }
    });
    }

spring-data-redis:1.8.20使用默认情况下采用Jenkins客户端;可以通过调用getNativeConnection方法直接执行Jenkins相关操作。两种版本的实现机制在实际应用中均能达到预期效果。

优化进阶

基于AOP实现分布式锁注解工具 - 不仅能用, 而且好用

优化一 (自动解锁及重试)

自动生成解lock操作及重复尝试机制: 上一节基于分布式锁的基本实现方案能够满足基本需求, 但仍有较大改进空间, 本小节将对自动生成解lock操作以及重复尝试机制进行优化提升

分布式锁抽象类

实现AutoCloseable接口, 可使用try-with-resource方便地完成自动解锁.

复制代码
    /** * distributed lock
     * @author piaoruiqing
     * * @since JDK 1.8
     */
    abstract public class DistributedLock implements AutoCloseable {
    
    private final Logger LOGGER = LoggerFactory.getLogger(getClass());
    
    /** * release lock
     * @author piaoruiqing
     */
    abstract public void release();
    
    /* * (non-Javadoc)
     * @see java.lang.AutoCloseable#close()
     */
    @Override
    public void close() throws Exception {
    
        LOGGER.debug("distributed lock close , {}", this.toString());
    
        this.unlock();
    }
    }
封装Redis分布式锁

它是Redis分布式锁的一个抽象实现,并继承了DistributedLock的同时实现了unlock接口.

复制代码
    /** * redis distributed lock
     * * @author piaoruiqing
     * @date: 2019/01/12 23:20
     * * @since JDK 1.8
     */
    public class RedisDistributedLock extends DistributedLock {
    
    private RedisOperations<String, String> operations;
    private String key;
    private String value;
    
    private static final String COMPARE_AND_DELETE =		// (一)
        "if redis.call('get',KEYS[1]) == ARGV[1]\n" +
        "then\n" +
        "    return redis.call('del',KEYS[1])\n" +
        "else\n" +
        "    return 0\n" +
        "end";
    
    /** * @param operations
     * @param key
     * @param value
     */
    public RedisDistributedLock(RedisOperations<String, String> operations, String key, String value) {
        this.operations = operations;
        this.key = key;
        this.value = value;
    }
    /* * (non-Javadoc)
     * @see com.piaoruiqing.demo.distributed.lock.DistributedLock#release()
     */
    @Override
    public void release() {									// (二)
        List<String> keys = Collections.singletonList(key);
        operations.execute(new DefaultRedisScript<String>(COMPARE_AND_DELETE), keys, value);
    }
    	/* * (non-Javadoc)
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "RedisDistributedLock [key=" + key + ", value=" + value + "]";
    }
    }
  • (一): 利用Lua脚本实现解锁功能,在锁定过程中将对比锁的值与删除操作整合为单一操作流程以确保操作过程中的完整性与安全性. 其核心思想在于避免误删他人持有的锁.
    例如: 在A线程的方法尚未完成时系统发生超时状态,则后续会有B线程成功获取到相同的锁. 然而若在此时A线程完成方法后试图解除锁定却未比对对应的value值,则可能导致误删B线程持有的锁. 此后若有C线程试图加回锁定状态则会引发整个业务逻辑出现严重混乱.
    建议切勿过度依赖分布式机制而应从数据库层面采取相应的措施如引入唯一键约束或采用事务管理等手段以确保数据完整性.

  • (二): 通过RedisOperations库来运行Lua脚本以执行解锁操作.

  • 可参阅redis官方文档

加锁方法实现
复制代码
    /** * @author piaoruiqing
     * @param key           lock key
     * @param timeout       timeout
     * @param retries       number of retries
     * @param waitingTime   retry interval
     * @return
     * @throws InterruptedException
     */
    public DistributedLock acquire(String key, long timeout, int retries, long waitingTime) throws InterruptedException {
    final String value 
        = RandomStringUtils.randomAlphanumeric(4) + System.currentTimeMillis(); // (一)
    do {
        Boolean result 
            = stringRedisTemplate.opsForValue().setIfAbsent(key, value, timeout, TimeUnit.MILLISECONDS); // (二)
        if (result) {
            return new RedisDistributedLock(stringRedisTemplate, key, value);
        }
        if (retries > NumberUtils.INTEGER_ZERO) {
            TimeUnit.MILLISECONDS.sleep(waitingTime);
        }
        if(Thread.currentThread().isInterrupted()){
            break;
        }
    } while (retries-- > NumberUtils.INTEGER_ZERO);
    
    return null;
    }

(一): 锁值必须具有唯一性, 建议采用四位随机字符串配合时间戳作为锁值生成机制
注:基于UUID的随机字符串(UUID.randomUUID())在高并发场景中表现出较差的性能.

  • (二): 尝试加锁, 代码中是2.1版本的做法, 早起版本参考上一节的实现.

此代码已经可以满足自动解锁和重试的需求了, 使用方法:

复制代码
    // 根据key加锁, 超时时间10000ms, 重试2次, 重试间隔500ms
    try(DistributedLock lock = redisLockService.acquire(key, 10000, 2, 500);){
    // do something
    }

但还可以再优雅一点, 将模板代码封装起来, 可支持 Lambda 表达式:

复制代码
    /** * lock handler
     * @author piaoruiqing
     * * @since JDK 1.8
     */
    @FunctionalInterface				// (一)
    public interface LockHandler<T> {
    
    /** * the logic you want to execute
     * * @author piaoruiqing
     * * @return
     * @throws Throwable
     */
     T handle() throws Throwable;	// (二)
    }
  • (一): 创建函数式接口并将其业务逻辑封装到 Lambda 表达式中以实现代码的简洁明了。
    • (二): 在分布式锁机制中直接抛出错误更为合理的做法而非尝试在其中进行处理。

使用LockHandler完成加锁的实现:

复制代码
    public <T> T tryLock(String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime) throws Throwable {
    try (DistributedLock lock = this.acquire(key, timeout, retries, waitingTime);) {
        if (lock != null) {
            LOGGER.debug("get lock success, key: {}", key);
            return handler.handle();
        }
        LOGGER.debug("get lock fail, key: {}", key);
        return null;
    }
    }

此时可以通过比较优雅的方式使用分布式锁来完成编码:

复制代码
    @Test
    public void testTryLock() throws Throwable {
    final String key = "dlock:test-try-lock";
    AnyObject anyObject = redisLockService.tryLock(key, () -> {
        // do something
        return new AnyObject();
    }, 10000, true, 0, 0);
    }

[版权声明]
本文发表于 朴瑞卿的博客,非商业用途的转录权受到尊重, 转录时请保留文章原作者 朴瑞卿 及原始链接: http://blog.piaoruiqing.com. 如需进一步的授权或合作, 请与邮箱: piaoruiqing@gmail.com 联系.

优化二 (自定义异常)

在本方案中,默认情况下不支持自定义异常。然而,在实际应用中可能会遇到这样一种情况:当业务逻辑返回 NULL 值时,这种设计可能无法正确处理某些异常情况。因此,在这种情况下支持自定义异常或许是一个值得考虑的方向。然而,在极端情况下这种方法可能不再适用。

该方案具有显著的易用性,在原有代码的基础上增加了对onFailure参数的支持;当锁为空时将直接抛出异常。

加锁方法实现
复制代码
    public <T> T tryLock(String key, LockHandler<T> handler, long timeout, boolean autoUnlock, int retries, long waitingTime, Class<? extends RuntimeException> onFailure) throws Throwable {	// (一)
    try (DistributedLock lock = this.getLock(key, timeout, retries, waitingTime);) {
        if (lock != null) {
            LOGGER.debug("get lock success, key: {}", key);
            return handler.handle();
        }
        LOGGER.debug("get lock fail, key: {}", key);
        if (null != onFailure) {
            throw onFailure.newInstance();	// (二)
        }
        return null;
    }
    }
  • (一): Class<? extends RuntimeException>对onFailure的指定必须是RuntimeException及其子类之一。作者认为采用RuntimeException具有更高的可读性。如果需要处理其他异常情况,则建议采用统一的处理方式。
    • (二): 反射被用来解决对象级别的动态问题。

优化三 (优雅地使用注解)

结合APO优雅地使用注解完成分布式锁:

定义注解

为了减小篇幅折叠部分注释

复制代码
    /** * distributed lock
     * @author piaoruiqing
     * @date: 2019/01/12 23:15
     * * @since JDK 1.8
     */
    @Documented
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    public @interface DistributedLockable {
    
    /** timeout of the lock */
    long timeout() default 5L;
    
    /** time unit */
    TimeUnit unit() default TimeUnit.MILLISECONDS;
    
    /** number of retries */
    int retries() default 0;
    
    /** interval of each retry */
    long waitingTime() default 0L;
    
    /** key prefix */
    String prefix() default "";
    
    /** parameters that construct a key */
    String[] argNames() default {};
    
    /** construct a key with parameters */
    boolean argsAssociated() default true;
    
    /** whether unlock when completed */
    boolean autoUnlock() default true;
    
    /** throw an runtime exception while fail to get lock */
    Class<? extends RuntimeException> onFailure() default NoException.class;
    
    /** no exception */
    public static final class NoException extends RuntimeException {
    
        private static final long serialVersionUID = -7821936618527445658L;
    
    }
    }
  • timeout: 超时段时间
    • unit: 时间计量单位
    • retries: 重试总次数
    • waitingTime: 重试等待周期间歇期
    • prefix: key默认由包名、类名及方法名三部分构成
    • argNames: 参与生成key的参数集合

注释可用于定义方法。需要注意的是,在本研究中采用Spring的AOP技术进行注解实现。因此,在对象内部各方法之间的调用将不再有效。

切面实现
复制代码
    /** * distributed lock aspect
     * @author piaoruiqing
     * @date: 2019/02/02 22:35
     * * @since JDK 1.8
     */
    @Aspect
    @Order(10)	// (一)
    public class DistributedLockableAspect implements KeyGenerator {	// (二)
    
    private final Logger LOGGER = LoggerFactory.getLogger(getClass());
    @Resource
    private RedisLockClient redisLockClient;
    /** * {@link DistributedLockable}
     * @author piaoruiqing
     */
    @Pointcut(value = "execution(* *(..)) && @annotation(com.github.piaoruiqing.dlock.annotation.DistributedLockable)")
    public void distributedLockable() {}
    
    /** * @author piaoruiqing
     * * @param joinPoint
     * @param lockable
     * @return
     * @throws Throwable
     */
    @Around(value = "distributedLockable() && @annotation(lockable)")
    public Object around(ProceedingJoinPoint joinPoint, DistributedLockable lockable) throws Throwable {
        long start = System.nanoTime();
        final String key = this.generate(joinPoint, lockable.prefix(), lockable.argNames(), lockable.argsAssociated()).toString();
        Object result = redisLockClient.tryLock(
            key, () -> {
                return joinPoint.proceed();
            }, 
            lockable.unit().toMillis(lockable.timeout()), lockable.autoUnlock(), 
            lockable.retries(), lockable.unit().toMillis(lockable.waitingTime()),
            lockable.onFailure()
        );
        long end = System.nanoTime();
        LOGGER.debug("distributed lockable cost: {} ns", end - start);
        return result;
    }
    }
  • (一): 切面相关的优先级
    • (二): KeyGenerator采用了定制化的key生成策略,并以前缀加参数名加参数值的形式作为键值;具体机制参考源码.

此时可通过注解的形式实现分布式锁的使用, 该方法对代码的影响较小且具简明的特点

复制代码
    @DistributedLockable(
    argNames = {"anyObject.id", "anyObject.name", "param1"},
    timeout = 20, unit = TimeUnit.SECONDS, 
    onFailure = RuntimeException.class
    )
    public Long distributedLockableOnFaiFailure(AnyObject anyObject, String param1, Object param2, Long timeout) {
    try {
        TimeUnit.SECONDS.sleep(timeout);
        LOGGER.info("distributed-lockable: " + System.nanoTime());
    } catch (InterruptedException e) {
    }
    return System.nanoTime();
    }

扩展

分布式锁的实现方式多样, 可根据不同场景和需求选择不同介质.

Redis:在性能上表现出色,在处理分布式锁时表现出良好的兼容性,并且实现简便,在大多数应用场景中表现良好。
Zookeeper:具备较高的可靠性,在处理分布式锁时同样表现出良好的兼容性,尽管其实现较为复杂但在实际应用中已有成熟的解决方案可用。
数据库:具有简单的实现方式,并可采用乐观lock或悲观lock策略来保证一致性,在大多数情况下其性能尚可接受;不过,在高并发场景中不建议使用该数据库。

结语

本文探讨了Redis分布式锁在Java语言中的具体实现方案,并详细完成了自动解密、自定义异常处理、重试机制以及注销锁定等功能。源码可见于地址.

本实现还有诸多可以优化之处, 如:

  • 重入锁机制的实现
    • 将重试机制转换为订阅Redis事件模式: 通过订阅Redis事件能够显著提升锁性能, 具体来说, 在代码中替换原有的 sleep 操作为 wait+notifyAll 协程。

篇幅有限, 后续再行阐述.

参考文献

[版权声明]
本文发布于 朴瑞卿的博客,可进行非商业性质的文章转载。然而,在任何情况下都不可更改地包含文章来源信息及其链接:http://blog.piaoruiqing.com。如需就授权事宜进行商讨或合作意向,请联系邮箱: [piaoruiqing@gmail.com]。

全部评论 (0)

还没有任何评论哟~