分布式锁实现

Created at 2019-09-14 Updated at 2020-07-29 Category 分布式 Tag Redis / ZooKeeper / 乐观锁

分布式锁

使用场景

单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。

实例

比如用户在 App 中接了一笔钱,还钱有两种方式。

  1. 用户手动还款
  2. 系统自动还款

如果用户在注定还款时正好系统的定时任务在自动扣款,出现并发操作,那么有可能用户会被扣两次钱。而且系统后台和 App 部署在两台不同的机器上,线程锁不共享,无法解决问题。

分布式锁需满足的条件

  • 互斥:同⼀时刻,只能有⼀个进程获得锁,执⾏任务后释放锁。
  • 可重⼊(可选):同⼀个任务再次获取该锁不会发生死锁。
  • 阻塞锁(可选):获取失败时,具备重试机制,尝试再次获取锁。
  • 加锁和解锁必须是同一个线程,不能出现 A 把 B 的锁删除了。

实现分布式锁的三种方式

  • 基于数据库

  • 基于 ZooKeeper

  • 基于 Redis

从健壮性考虑,用 zk 比 Redis 好,但是从性能角度,基于 Redis 实现性能更好。

基于 Redis 的简单实现

使用 setnx 加锁 SET resource_name random_value NX PX 30000,如果执行成功,则客户端成功获取到了锁,如果执行失败,说明获取锁失败。

  • random_value 是由客户端生成的一个随机字符串。
  • NX 表示只有当 resource_name 对应的 key 值不存在才能 SET 成功。这保证了只有第一个请求的客户端才能获得锁,而其它客户端在锁被释放之前都无法获得锁。
  • PX 30000 表示这个锁有一个 30 秒的自动过期时间。

判断和释放是两个独立操作,需要执行 Redis Lua 脚本来释放锁,保证原子性。

if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

锁超时

如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。所以 key 必需设置超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。

误删问题

  1. 线程 A 得到锁,设置的超时时间是 30 秒
  2. 线程 A 执行超过 30 秒,
  3. 锁自动释放
  4. 线程 B 得到了锁
  5. 之后 A 执行完了任务,删除了 B 的锁。

所以释放锁的时候必须要判断是不是自己的锁,通过设置值 random_value 来保证唯一。

可重入锁

同⼀个线程再次进⼊同步代码时,可以使⽤⾃⼰已获取到的锁。可重⼊锁可以避免因同⼀线程中多次获取锁⽽导致死锁发⽣。

  • 获取锁:⾸先尝试获取锁,如果获取失败,判断这个锁是否是⾃⼰的,如果是则允许再次获取,⽽且必须记录重复获取锁的次数。
  • 释放锁:释放锁不能直接删除了,因为锁是可重⼊的,如果锁进⼊了多次,在最内层直接删除锁,导致外部的业务在没有锁的情况下执⾏,会有安全问题。因此必须获取锁时累计重⼊的次数,释放时则减去重⼊次数,如果减到0,则可以删除锁.

出现并发

当两个线程同时访问,一个线程执行时间超过过期时间,可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续航”。

存在单点风险

如果存储锁对应 key 的那个节点挂了的话,就可能存在丢失锁的风险,导致出现多个客户端持有锁的情况,这样就不能实现资源的独享了。

  1. 客户端 A 从 master 获取到锁
  2. 在 master 将锁同步到 slave 之前,master 宕掉了(Redis的主从同步通常是异步的)。
  3. 主从切换,slave 节点被晋级为 master 节点
  4. 客户端 B 取得了同一个资源被客户端 A 已经获取到的另外一个锁。导致存在同一时刻存不止一个线程获取到锁的情况。
    无论 Redis 是单机模式、主从模式、哨兵模式、集群模式,都存在这种风险,因为 Redis 主从同步是异步的。所以 Redis 之父 antirez 提出了 redlock 算法解决这个问题。

Redisson 中的实现

Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。

  • 优点:实现简单,性能好,并发能⼒强,如果对并发能⼒有要求,推荐使⽤
  • 缺点:可靠性有争议,极端情况会出现锁失效问题,如果对安全要求较⾼,不建议使⽤

乐观锁实现

乐观锁:大多数是基于数据版本(version)的记录机制实现的。即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 version 字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加 1。此时,将提交数据的版本号与数据库表对应记录的当前版本号进行比对,如果提交的数据版本号大于数据库当前版本号,则予以更新,否则认为是过期数据。

Redis 中可以使用 watch 命令会监视给定的 key,当 exec 时候如果监视的 key 从调用 watch 后发生过变化,则整个事务会失败。也可以调用 watch 多次监视多个 key。这样就可以对指定的 key 加乐观锁了。注意 watchkey 是对整个连接有效的,事务也一样。如果连接断开,监视和事务都会被自动清除。当然了 exec,discard,unwatch 命令都会清除连接中的所有监视。

Redis 中的事务(transaction)是一组命令的集合。事务同命令一样都是 Redis 最小的执行单位,一个事务中的命令要么都执行,要么都不执行。Redis 事务的实现需要用到 MULTIEXEC 两个命令,事务开始的时候先向 Redis 服务器发送 MULTI 命令,然后依次发送需要在本次事务中处理的命令,最后再发送 EXEC 命令表示事务命令结束。Redis 的事务是下面 4 个命令来实现:

  1. multi:开启 Redis 的事务,置客户端为事务态。
  2. exec:提交事务,执行从 multi 到此命令前的命令队列,置客户端为非事务态。
  3. discard:取消事务,置客户端为非事务态。
  4. watch:监视键值对,作用时如果事务提交 exec 时发现监视的监视对发生变化,事务将被取消。

使用 redis 来实现乐观锁,以秒杀系统为例

package com.github.distribute.lock.redis;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

/**
 * redis乐观锁实例
 */
public class OptimisticLockTest {
    public static void main(String[] args) throws InterruptedException {
        long starTime = System.currentTimeMillis();
        initPrduct();
        initClient();
        printResult();
        long endTime = System.currentTimeMillis();
        long Time = endTime - starTime;
        System.out.println("程序运行时间: " + Time + "ms");
    }

    /**
     * 输出结果
     */
    public static void printResult() {
        Jedis jedis = RedisUtil.getInstance().getJedis();
        Set<String> set = jedis.smembers("clientList");
        int i = 1;
        for (String value : set) {
            System.out.println("第" + i++ + "个抢到商品," + value + " ");
        }
        RedisUtil.returnResource(jedis);
    }

    /*
     * 初始化顾客开始抢商品
     */
    public static void initClient() {
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        int clientNum = 10000;// 模拟客户数目
        for (int i = 0; i < clientNum; i++) {
            cachedThreadPool.execute(new ClientThread(i));
        }
        cachedThreadPool.shutdown();
        while (true) {
            if (cachedThreadPool.isTerminated()) {
                System.out.println("所有的线程都结束了!");
                break;
            }
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 初始化商品个数
     */
    public static void initPrduct() {
        int prdNum = 100;// 商品个数
        String key = "prdNum";
        String clientList = "clientList";// 抢购到商品的顾客列表
        Jedis jedis = RedisUtil.getInstance().getJedis();
        if (jedis.exists(key)) {
            jedis.del(key);
        }
        if (jedis.exists(clientList)) {
            jedis.del(clientList);
        }
        jedis.set(key, String.valueOf(prdNum));// 初始化
        RedisUtil.returnResource(jedis);
    }
}

/**
 * 顾客线程
 */
class ClientThread implements Runnable {
    Jedis jedis = null;
    String key = "prdNum";// 商品主键
    String clientList = "clientList";//// 抢购到商品的顾客列表主键
    String clientName;

    public ClientThread(int num) {
        clientName = "编号=" + num;
    }

    public void run() {
        try {
            Thread.sleep((int) (Math.random() * 5000));// 随机睡眠一下
        } catch (InterruptedException e1) {
        }
        while (true) {
            System.out.println("顾客:" + clientName + "开始抢商品");
            jedis = RedisUtil.getInstance().getJedis();
            try {
                jedis.watch(key);
                int prdNum = Integer.parseInt(jedis.get(key));// 当前商品个数
                if (prdNum > 0) {
                    Transaction transaction = jedis.multi();
                    transaction.set(key, String.valueOf(prdNum - 1));
                    List<Object> result = transaction.exec();
                    if (result == null || result.isEmpty()) {
                        System.out.println("顾客:" + clientName + "没有抢到商品");// 可能是watch-key被外部修改,或者是数据操作被驳回
                    } else {
                        jedis.sadd(clientList, clientName);// 抢到商品记录一下
                        System.out.println("顾客:" + clientName + "抢到商品");
                        break;
                    }
                } else {
                    System.out.println("库存为0,顾客:" + clientName + "没有抢到商品");
                    break;
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                jedis.unwatch();
                RedisUtil.returnResource(jedis);
            }
        }
    }
}

基于数据库

  • 获得锁时向表中插入一条记录,释放锁时删除这条记录。唯一索引可以保证该记录只被插入一次,那么就可以用这个记录是否存在来判断是否处于锁定状态。
  • 比如一个订单,那么我们可以用select * from order_table where id = 'xxx' for update进行加行锁,那么其他的事务就不能对其进行修改。

存在的问题

  • 锁没有失效时间,解锁失败的话其它进程无法再获得该锁;
  • 只能是非阻塞锁,插入失败直接就报错了,无法重试;
  • 不可重入,已经获得锁的进程也必须重新获取锁。
  • 一般对比缓存来说性能较低。对于高并发的场景并不是很适合。

基于 ZooKeeper

ZooKeeper 是⼀个分布式的,开放源码的分布式应⽤程序协调服务,是 Hadoop 和 Hbase 的重要组件。

特性

  • 在 Zookeeper 中,znode 是⼀个跟 Unix ⽂件系统路径相似的节点,可以往这个节点存储或获取数据
  • 通过客户端可对 znode 进⾏增删改查的操作,还可以注册 watcher 监控 znode 的变化

ZooKeeper 节点类型

  • 永久节点(PERSISTENT):不会因为会话结束或者超时而消失;
  • 临时节点(EPHEMERAL):如果会话结束或者超时就会消失;
  • 有序节点(SEQUENTIAL):会在节点名的后面加一个数字后缀,并且是有序的,例如生成的有序节点为 /lock/node-0000000000,它的下一个有序节点则为 /lock/node-0000000001,以此类推。

zookeeper 的分布式锁原理是利用了临时节点(EPHEMERAL)的特性。

当 znode 被声明为 EPHEMERAL 的后,如果创建 znode 的那个客户端崩溃了,那么相应的 znode 会被自动删除。这样就避免了设置过期时间的问题。

分布式锁实现

  • 创建一个锁目录 /lock;
  • 当一个客户端需要获取锁时,在 /lock 下创建临时的且有序的子节点;
  • 客户端获取 /lock 下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁;否则监听自己的前一个子节点,获得子节点的变更通知后重复此步骤直至获得锁;
  • 执行业务代码,完成后,删除自己 id 对应的子节点,下一个排队的节点就可以收到 Watcher 事件,从而被唤醒得到锁后退出。

其中的几个关键点:

  1. node节点选择EPHEMERAL_SEQUENTIAL很重要。
    • 自增长的特性,可以方便构建一个基于 Fair 特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免”惊群效应”(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
    • 选择一个EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
  2. 获取lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,使用BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
  • 使用EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeoutzookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。
  • 同样使用PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。

参考文章

Site by Cellophane using Hexo & Random

Hide