Advertisement

分布式锁用 Redis 好,还是 ZooKeeper 好?

阅读量:

速读摘要

明显不可行。设想一个场景:张三正在厕所内一直处于未释放状态,并持续蹲坐。这种情况下外部人员试图进入都会遇到阻碍——这让人感到非常无奈和愤怒。例如,在某个业务流程中若长时间运行会导致相关机制失效并被重新设置。此时若其他线程可能持有新的锁权限,则当前操作可能导致误删行为的发生——这显然是不被允许的。第三个观察者(第三个线程)进入发现当前节点已被占满时会触发某种机制:由于系统设计原则导致前一个节点会被指定为watcher以监控后续操作。当第一个节点完成任务后及时释放资源并删除相关数据后会由下一个节点接管该锁权限

对于锁的相关知识,大家都有所了解。例如Synchronized和ReentrantLock这两种常用机制,在单进程中能够实现多个线程安全地共享同一个资源。然而随着互联网项目的快速发展以及越来越多转向集群部署方案的趋势下,在分布式系统环境中这两种传统的锁机制已经难以满足需求。

来两张图举例说明下,本地锁的情况下:

在这里插入图片描述
  • 分布式锁情况下:
在这里插入图片描述

从其核心思想上讲, 就是一种全部都需要的思想, 所有服务都在一个统一的地方进行取锁操作, 必须等到取得锁后才能继续执行下去。

说完思想,下面来说一下具体的实现。

为了实现分布式锁机制,在Redis中设置了SETNX命令。其功能是:如果指定键不存在,则设置其值;如果键已存在,则不进行设置。类似于张三试图进入有锁的厕所时的情景:当张三发现门关着时不进去;但如果他看到门开着,则允许进入。可以看出,在Redis中使用SETNX命令进行两次操作时会出现不同的结果:第一次set操作返回1(表明操作成功完成),第二次则返回0(表示set失败)。这是因为第二次set操作发现目标键已经存在而不再执行赋值行为。

在这里插入图片描述

下面介绍几种不可行的情况:

1, 单纯依靠setnx这个指令是否足够?显然是不够的哦!举个例子来说吧, 张三被困在厕所里, 但他始终未给予适当的释放, 这样一来的话, 外面的人都想冲进去, 却都被堵在外面了, 想对张三进行粗暴处理呀!

2,假设已经进行了加锁,但是因为宕机或者出现异常未释放锁,就造成了所谓的“死锁 ”。聪明的你们肯定早都想到了,为它设置过期时间不就好了,可以 SETEX key seconds value 命令,为指定 key 设置过期时间,单位为秒。但这样又有另一个问题,我刚加锁成功,还没设置过期时间,Redis 宕机了不就又死锁了 ,所以说要保证原子性 吖,要么一起成功,要么一起失败。当然我们能想到的 Redis 肯定早都为你实现好了,在 Redis 2.8 的版本后,Redis 就为我们提供了一条组合命令 SET key value ex seconds nx,加锁的同时设置过期时间。
就好比是公司规定每人最多只能在厕所呆 2 分钟,不管释放没释放完都得出来,这样就解决了“死锁”问题。,
但这样就没有问题了吗?怎么可能。

再设想另一种情况:厕所门肯定是只能从内部开启的。当张三上完厕所后立即锁上门时,在外面的人可能会误以为张三是依然在里面。然而,在等待了3分钟之后,如果外面的人直接撬开门进入查看发现里面却是张四,则会陷入尴尬境地。换言之,在使用Redis时也面临着类似问题:例如一个业务流程所需的时间非常长,在这段时间内原本由某个锁控制的权限已经失效(lock has expired),此时新用户的锁已经成功设置完毕。然而在业务处理完成后直接释放锁头可能导致原有授权者的权限被错误地删除(lock deletion by others),这种情况显然存在严重缺陷(mess up)。因此,在对锁进行加权操作时建议引入一个随机值作为唯一标识,在进行锁删除操作时需要与该标识进行比对确认只有真正拥有该锁的操作者才能释放对应的权限(delete lock only by owner)

多说无益,烦人,直接上代码:

//基于 jedis 和 lua 脚本来实现

复制代码
    privatestaticfinal String LOCK_SUCCESS = "OK";
    privatestaticfinal Long RELEASE_SUCCESS = 1L;
    privatestaticfinal String SET_IF_NOT_EXIST = "NX";
    privatestaticfinal String SET_WITH_EXPIRE_TIME = "PX";
    
    @Override
    public String acquire() {
    try {
        // 获取锁的超时时间,超过这个时间则放弃获取锁
        long end = System.currentTimeMillis() + acquireTimeout;
        // 随机生成一个 value
        String requireToken = UUID.randomUUID().toString();
        while (System.currentTimeMillis() < end) {
            String result = jedis
                .set(lockKey, requireToken, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
            if (LOCK_SUCCESS.equals(result)) {
                return requireToken;
            }
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    } catch (Exception e) {
        log.error("acquire lock due to error", e);
    }
    
    returnnull;
    }
    
    @Override
    public boolean release(String identify) {
    if (identify == null) {
        returnfalse;
    }
    //通过 lua 脚本进行比对删除操作,保证原子性
    String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    Object result = new Object();
    try {
        result = jedis.eval(script, Collections.singletonList(lockKey),
            Collections.singletonList(identify));
        if (RELEASE_SUCCESS.equals(result)) {
            log.info("release lock success, requestToken:{}", identify);
            returntrue;
        }
    } catch (Exception e) {
        log.error("release lock due to error", e);
    } finally {
        if (jedis != null) {
            jedis.close();
        }
    }
    
    log.info("release lock failed, requestToken:{}, result:{}", identify, result);
    returnfalse;}
复制代码
    思考:加锁和释放锁的原子性可以用 lua 脚本来保证,那锁的自动续期改如何实现呢?

Redisson

Redisson 从名称上看是 Redis 的一个扩展包。它本质上与 Redis 相似,并采用了加锁机制作为基础。然而,在功能实现上对 Redis 进行了更为深入的功能扩展和封装管理。通过这一工具包, 不仅能够方便地访问到一系列标准的分布式 Java 对象集合, 而且还可以自定义开发多种分布式服务方案。

Redisson采用Netty框架实现并支持NIO技术,在功能上不仅能够作为Redis底层驱动客户端使用,并且支持对Redis各种组态形式的连接功能;它还提供多种命令执行方式:同步发送、异步发送、异步流发送以及管道式发送等多种模式;此外还集成LUA脚本处理机制,并能够接收并处理 Redis 返回的各种结果类型;在此基础之上进一步融合了多种高级解决方案:将原生 Redis 数据结构Hash, List, Set, String, Geo, HyperLogLog等进行了封装为Java中大家最为熟悉的数据容器(Map, List, Set, Object Bucket, Geospatial Bucket等);同时在原有基础上新增分布式多值映射(Multimap)、本地缓存映射(LocalCachedMap)、有序集(SortedSet)等复杂数据结构;特别值得一提的是,在基础功能之上实现了更高阶的应用场景:例如分布式锁(Lock);更重要的是 Redisson并未在此停止发展,在分布式锁的基础上进一步完善了联锁机制(MultiLock)、读写锁(ReadWriteLock)、公平锁(Fair Lock)、红锁(RedLock)、信号量(Semaphore)以及可过期性信号量(PermitExpirableSemaphore)等关键组件;正是基于这些高阶应用方案的支持使得 Redisson 成为了构建分布式系统的重要工具。

在引入 Redisson 的依赖后,就可以直接进行调用:

复制代码
    <dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.13.4</version>
    </dependency>

先来一段 Redisson 的加锁代码:

复制代码
    private void test() {
    //分布式锁名  锁的粒度越细,性能越好
    RLock lock = redissonClient.getLock("test_lock");
    lock.lock();
    try {
        //具体业务......
    } finally {
        lock.unlock();
    }}

非常简单地应用JDK实现的ReentrantLock功能。此外还支持ReadWriteLock、Reentrant Lock、Fair Lock以及RedLock等多种锁类型。详细信息请参考官方文档。

Redisson 主要具备哪些核心优势?Redis 存储服务以其高效的高可用性和容灾能力著称,在 Redisson 基础之上实现了以下几大核心功能:其一,在没有显式配置的情况下,默认实现了一键式持续模式;其二,在业务运行时间过长时,默认情况下系统会在接近完成时重新生成新的 30秒锁周期;其三,在资源耗尽的情况下能够优雅地进行资源回收和清理

当锁定业务运行完成后无需当前进行续期操作,即使未手动触发解锁操作系统也会在30秒后自动清除该锁以避免死锁问题。

前面也提到了锁的自动续期,我们来看看 Redisson 是如何来实现的?

首先阐述我们主要涉及的内容是Redisson中的RLock机制,即可回滚锁的具体实现方式主要包括两种

复制代码
    // 最常见的使用方法

      lock.lock();
    // 加锁以后10秒钟自动解锁
    // 无需调用unlock方法手动解锁
    lock.lock(10, TimeUnit.SECONDS);

仅当一个无参数的方法时才实现锁的自动续期操作,在内部采用了"看门狗"机制,请允许我们深入探究一下源码结构

在这里插入图片描述

无论是否带有参数的函数(空参或带参),它们都会调用同一个名为lock的方法。若无参数会被视为时间-1,则会传递实际输入的时间值。

在这里插入图片描述

继续点进 scheduleExpirationRenewal 方法:

在这里插入图片描述

点进 renewExpiration 方法:

在这里插入图片描述

总结
当指定锁过期时间时,则会在该时间自动失效。若未指定,默认使用看门狗设置的30秒。一旦成功加粗占锁,则会启动一个定时任务,在每隔10秒时重新设置新的失效时间(直至该锁被解除为止)。因为这种方法避免了频繁续锁带来的性能开销(可以适当延长失效时长,并配合unlock操作)。小结:尽管lock()内置了自动续锁机制(但开发中还是推荐采用lock(time, timeUnit)方法更为高效)。

当业务完成时,系统会自动解密锁;如果出现长时间未完成的情况,在设置好的超时时间后进行报错处理;错误发生后,在指定的时间内重新获取锁资源。

复制代码
    public void test() {
    RLock lock = redissonClient.getLock("test_lock");
    lock.lock(30, TimeUnit.SECONDS);
    try {
        //.......具体业务
    } finally {
        //手动释放锁
        lock.unlock();
    }}

ZooKeeper

有很多人对分布式系统中的ZooKeeper有一定的了解,并将其用作注册中心的一种解决方案;然而,在除了作为注册中心之外的情况下,ZooKeeper也被广泛应用于实现分布式锁机制。

让我们了解ZooKeeper中如何生成指定路径下的节点。该命令采用create [-s] [-e] path [data]的形式。其中-s选项用于生成顺序类型的节点(如持久化ID),而-e选项则用于生成非永久性的临时节点。

在这里插入图片描述

通过这种方式就实现了父节点的创建,并且同时为每个父节点又生成了相应的子节点。该组合命令其名称意在表明它将所有输入项按顺序排列到一个临时存储区中。

在 ZooKeeper 中实现分布式锁的主要方法是建立短暂的一系列有序节点。而在这种机制中使用短暂的资源而不是永久性的资源是否更为高效?选择顺序节点而非其他类型的节点有什么原因?

除了上述之外,在 ZooKeeper 中如何获取节点信息?我们可以使用 zookeeper 提供的命令 'ls [-w] path' 来获取当前目录下的所有服务实例。其中 '-w' 参数用于启用监视模式。根目录 '/ ' 提供了对整个系统服务实例的列表。通过此列表可观察到已创建的服务实例。如果目标目录下包含特定名称的服务实例,则可进一步查询其子目录中的服务实例。

在这里插入图片描述

在ZooKeeper框架中对顺序节点增加了顺序控制功能。同时,在实现分布式锁的过程中,注册监听器也是一项关键的技术手段。

在这里插入图片描述

下面来看一下 ZooKeeper 实现分布式锁的主要流程:

复制代码
    * 当第一个线程进来时会去父节点上创建一个临时的顺序节点。
    * 第二个线程进来发现锁已经被持有了,就会为当前持有锁的节点注册一个 watcher 监听器。
    * 第三个线程进来发现锁已经被持有了,因为是顺序节点的缘故,就会为上一个节点去创建一个 watcher 监听器。
    * 当第一个线程释放锁后,删除节点,由它的下一个节点去占有锁。

到这里,各位经验丰富的小伙伴已经意识到顺序节点的优势所在。而非顺序节点的情况下,则会使得每一个新进入的线程都需要在持有锁的节点上注册相应的监听器,容易导致类似“链式反应”的现象发生。

成百上千头羊浩浩荡荡地朝你冲来,即便你难以支撑压力也不可避免地会导致 ZooKeeper 服务器出现宕机风险。

当某个顺序节点检测到已有线程持有锁时

关于选择临时节点的原因类似于 Redis 的过期时间机制,在 ZooKeeper 服务中断的情况下,在服务器出现故障时, 临时节点也会随之失效,从而防止了潜在的死锁问题

下面来上一段代码的实现:

复制代码
    public class ZooKeeperDistributedLock implements Watcher {
    
    private ZooKeeper zk;
    private String locksRoot = "/locks";
    private String productId;
    private String waitNode;
    private String lockNode;
    private CountDownLatch latch;
    private CountDownLatch connectedLatch = new CountDownLatch(1);
    private int sessionTimeout = 30000;
    
    public ZooKeeperDistributedLock(String productId) {
        this.productId = productId;
        try {
            String address = "192.168.189.131:2181,192.168.189.132:2181";
            zk = new ZooKeeper(address, sessionTimeout, this);
            connectedLatch.await();
        } catch (IOException e) {
            throw new LockException(e);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
    
    public void process(WatchedEvent event) {
        if (event.getState() == KeeperState.SyncConnected) {
            connectedLatch.countDown();
            return;
        }
    
        if (this.latch != null) {
            this.latch.countDown();
        }
    }
    
    public void acquireDistributedLock() {
        try {
            if (this.tryLock()) {
                return;
            } else {
                waitForLock(waitNode, sessionTimeout);
            }
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
    }
    //获取锁
    public boolean tryLock() {
        try {
        // 传入进去的 locksRoot + “/” + productId
        // 假设 productId 代表了一个商品 id,比如说 1
        // locksRoot = locks
        // /locks/10000000000,/locks/10000000001,/locks/10000000002
        lockNode = zk.create(locksRoot + "/" + productId, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    
        // 看看刚创建的节点是不是最小的节点
        // locks:10000000000,10000000001,10000000002
        List<String> locks = zk.getChildren(locksRoot, false);
        Collections.sort(locks);
    
        if(lockNode.equals(locksRoot+"/"+ locks.get(0))){
            //如果是最小的节点,则表示取得锁
            return true;
        }
    
        //如果不是最小的节点,找到比自己小 1 的节点
      int previousLockIndex = -1;
            for(int i = 0; i < locks.size(); i++) {
        if(lockNode.equals(locksRoot + “/” + locks.get(i))) {
                    previousLockIndex = i - 1;
            break;
        }
       }
    
       this.waitNode = locks.get(previousLockIndex);
        } catch (KeeperException e) {
            throw new LockException(e);
        } catch (InterruptedException e) {
            throw new LockException(e);
        }
        return false;
    }
    
    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(locksRoot + "/" + waitNode, true);
        if (stat != null) {
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }
    
    //释放锁
    public void unlock() {
        try {
            System.out.println("unlock " + lockNode);
            zk.delete(lockNode, -1);
            lockNode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
    //异常
    public class LockException extends RuntimeException {
        private static final long serialVersionUID = 1L;
    
        public LockException(String e) {
            super(e);
        }
    
        public LockException(Exception e) {
            super(e);
        }
    }
    }

既然弄懂了Redis和ZooKeeper各自实现分布式锁的不同原理之后,或许它们之间存在明显的差异呢?确实如此,我已经将这些内容进行了系统的整理归纳,并准备了一份详细的资料.

总结

在实现机制上存在差异:Redis 采用插入占位数据的方式进行操作定位;ZooKeeper 则通过注册临时节点的方式来完成相关功能。
当系统发生故障时(即宕机情况),Redis 在过期时间到达前会自动释放锁;而ZooKeeper 因为其结构特性,在这种情况下已不再维护该节点,并相应地进行锁的释放。
如果没有竞争到锁,则Redis 通常会通过自旋的方式持续获取锁;相对而言效率较低的是ZooKeeper 方式——它是通过注册监听器的方式来获取必要的资源 lock。

不过具体应采取何种实现方式,并非一概而论;而是需要具体情况具体对待,并结合项目所使用的技术架构进行落地实施。

本文是直译自某网络公众号文章的内容,并在其基础上加入了个人的见解和观点。由于无法找到原文链接信息,在文中标注为"注明为本人原创内容"。如有发现雷同情况,请联系本人。

全部评论 (0)

还没有任何评论哟~