分布式锁实现
Created at 2019-09-14 Updated at 2020-07-29 Category 分布式 views
分布式锁
使用场景
单机环境中,应用是在同一进程下的,只需要保证单进程多线程环境中的线程安全性,通过 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 并发包下一些线程安全的类等就可以做到。而在多机部署环境中,不同机器不同进程,就需要在多进程下保证线程的安全性了。因此,分布式锁应运而生。
实例
比如用户在 App 中接了一笔钱,还钱有两种方式。
- 用户手动还款
- 系统自动还款
如果用户在注定还款时正好系统的定时任务在自动扣款,出现并发操作,那么有可能用户会被扣两次钱。而且系统后台和 App 部署在两台不同的机器上,线程锁不共享,无法解决问题。
分布式锁需满足的条件
- 互斥:同⼀时刻,只能有⼀个进程获得锁,执⾏任务后释放锁。
- 可重⼊(可选):同⼀个任务再次获取该锁不会发生死锁。
- 阻塞锁(可选):获取失败时,具备重试机制,尝试再次获取锁。
- 加锁和解锁必须是同一个线程,不能出现 A 把 B 的锁删除了。
实现分布式锁的三种方式
基于数据库
基于 ZooKeeper
基于 Redis
从健壮性考虑,用 zk 比 Redis 好,但是从性能角度,基于 Redis 实现性能更好。
基于 Redis 的简单实现
使用 setnx 加锁 SET resource_name random_value NX PX 30000,如果执行成功,则客户端成功获取到了锁,如果执行失败,说明获取锁失败。
random_value是由客户端生成的一个随机字符串。NX表示只有当resource_name对应的 key 值不存在才能 SET 成功。这保证了只有第一个请求的客户端才能获得锁,而其它客户端在锁被释放之前都无法获得锁。PX 30000表示这个锁有一个 30 秒的自动过期时间。
判断和释放是两个独立操作,需要执行 Redis Lua 脚本来释放锁,保证原子性。
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
锁超时
如果一个得到锁的线程在执行任务的过程中挂掉,来不及显式地释放锁,这块资源将会永远被锁住,别的线程再也别想进来。所以 key 必需设置超时时间,以保证即使没有被显式释放,这把锁也要在一定时间后自动释放。
误删问题
- 线程 A 得到锁,设置的超时时间是 30 秒
- 线程 A 执行超过 30 秒,
- 锁自动释放
- 线程 B 得到了锁
- 之后 A 执行完了任务,删除了 B 的锁。
所以释放锁的时候必须要判断是不是自己的锁,通过设置值 random_value 来保证唯一。
可重入锁
同⼀个线程再次进⼊同步代码时,可以使⽤⾃⼰已获取到的锁。可重⼊锁可以避免因同⼀线程中多次获取锁⽽导致死锁发⽣。
- 获取锁:⾸先尝试获取锁,如果获取失败,判断这个锁是否是⾃⼰的,如果是则允许再次获取,⽽且必须记录重复获取锁的次数。
- 释放锁:释放锁不能直接删除了,因为锁是可重⼊的,如果锁进⼊了多次,在最内层直接删除锁,导致外部的业务在没有锁的情况下执⾏,会有安全问题。因此必须获取锁时累计重⼊的次数,释放时则减去重⼊次数,如果减到0,则可以删除锁.
出现并发
当两个线程同时访问,一个线程执行时间超过过期时间,可以让获得锁的线程开启一个守护线程,用来给快要过期的锁“续航”。
存在单点风险
如果存储锁对应 key 的那个节点挂了的话,就可能存在丢失锁的风险,导致出现多个客户端持有锁的情况,这样就不能实现资源的独享了。
- 客户端 A 从 master 获取到锁
- 在 master 将锁同步到 slave 之前,master 宕掉了(Redis的主从同步通常是异步的)。
- 主从切换,slave 节点被晋级为 master 节点
- 客户端 B 取得了同一个资源被客户端 A 已经获取到的另外一个锁。导致存在同一时刻存不止一个线程获取到锁的情况。
无论 Redis 是单机模式、主从模式、哨兵模式、集群模式,都存在这种风险,因为 Redis 主从同步是异步的。所以 Redis 之父 antirez 提出了 redlock 算法解决这个问题。
Redisson 中的实现
Redisson 是一个在 Redis 的基础上实现的 Java 驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的 Java 常用对象,还实现了可重入锁(Reentrant Lock)、公平锁(Fair Lock、联锁(MultiLock)、 红锁(RedLock)、 读写锁(ReadWriteLock)等,还提供了许多分布式服务。
- 优点:实现简单,性能好,并发能⼒强,如果对并发能⼒有要求,推荐使⽤
- 缺点:可靠性有争议,极端情况会出现锁失效问题,如果对安全要求较⾼,不建议使⽤
乐观锁实现
乐观锁:大多数是基于数据版本(version)的记录机制实现的。即为数据增加一个版本标识,在基于数据库表的版本解决方案中,一般是通过为数据库表增加一个 version 字段来实现读取出数据时,将此版本号一同读出,之后更新时,对此版本号加 1。此时,将提交数据的版本号与数据库表对应记录的当前版本号进行比对,如果提交的数据版本号大于数据库当前版本号,则予以更新,否则认为是过期数据。
Redis 中可以使用 watch 命令会监视给定的 key,当 exec 时候如果监视的 key 从调用 watch 后发生过变化,则整个事务会失败。也可以调用 watch 多次监视多个 key。这样就可以对指定的 key 加乐观锁了。注意 watch 的 key 是对整个连接有效的,事务也一样。如果连接断开,监视和事务都会被自动清除。当然了 exec,discard,unwatch 命令都会清除连接中的所有监视。
Redis 中的事务(transaction)是一组命令的集合。事务同命令一样都是 Redis 最小的执行单位,一个事务中的命令要么都执行,要么都不执行。Redis 事务的实现需要用到 MULTI 和 EXEC 两个命令,事务开始的时候先向 Redis 服务器发送 MULTI 命令,然后依次发送需要在本次事务中处理的命令,最后再发送 EXEC 命令表示事务命令结束。Redis 的事务是下面 4 个命令来实现:
- multi:开启 Redis 的事务,置客户端为事务态。
- exec:提交事务,执行从 multi 到此命令前的命令队列,置客户端为非事务态。
- discard:取消事务,置客户端为非事务态。
- watch:监视键值对,作用时如果事务提交 exec 时发现监视的监视对发生变化,事务将被取消。
使用 redis 来实现乐观锁,以秒杀系统为例
package com.github.distribute.lock.redis;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;
/**
* redis乐观锁实例
*/
public class OptimisticLockTest {
public static void main(String[] args) throws InterruptedException {
long starTime = System.currentTimeMillis();
initPrduct();
initClient();
printResult();
long endTime = System.currentTimeMillis();
long Time = endTime - starTime;
System.out.println("程序运行时间: " + Time + "ms");
}
/**
* 输出结果
*/
public static void printResult() {
Jedis jedis = RedisUtil.getInstance().getJedis();
Set<String> set = jedis.smembers("clientList");
int i = 1;
for (String value : set) {
System.out.println("第" + i++ + "个抢到商品," + value + " ");
}
RedisUtil.returnResource(jedis);
}
/*
* 初始化顾客开始抢商品
*/
public static void initClient() {
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
int clientNum = 10000;// 模拟客户数目
for (int i = 0; i < clientNum; i++) {
cachedThreadPool.execute(new ClientThread(i));
}
cachedThreadPool.shutdown();
while (true) {
if (cachedThreadPool.isTerminated()) {
System.out.println("所有的线程都结束了!");
break;
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 初始化商品个数
*/
public static void initPrduct() {
int prdNum = 100;// 商品个数
String key = "prdNum";
String clientList = "clientList";// 抢购到商品的顾客列表
Jedis jedis = RedisUtil.getInstance().getJedis();
if (jedis.exists(key)) {
jedis.del(key);
}
if (jedis.exists(clientList)) {
jedis.del(clientList);
}
jedis.set(key, String.valueOf(prdNum));// 初始化
RedisUtil.returnResource(jedis);
}
}
/**
* 顾客线程
*/
class ClientThread implements Runnable {
Jedis jedis = null;
String key = "prdNum";// 商品主键
String clientList = "clientList";//// 抢购到商品的顾客列表主键
String clientName;
public ClientThread(int num) {
clientName = "编号=" + num;
}
public void run() {
try {
Thread.sleep((int) (Math.random() * 5000));// 随机睡眠一下
} catch (InterruptedException e1) {
}
while (true) {
System.out.println("顾客:" + clientName + "开始抢商品");
jedis = RedisUtil.getInstance().getJedis();
try {
jedis.watch(key);
int prdNum = Integer.parseInt(jedis.get(key));// 当前商品个数
if (prdNum > 0) {
Transaction transaction = jedis.multi();
transaction.set(key, String.valueOf(prdNum - 1));
List<Object> result = transaction.exec();
if (result == null || result.isEmpty()) {
System.out.println("顾客:" + clientName + "没有抢到商品");// 可能是watch-key被外部修改,或者是数据操作被驳回
} else {
jedis.sadd(clientList, clientName);// 抢到商品记录一下
System.out.println("顾客:" + clientName + "抢到商品");
break;
}
} else {
System.out.println("库存为0,顾客:" + clientName + "没有抢到商品");
break;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
jedis.unwatch();
RedisUtil.returnResource(jedis);
}
}
}
}
基于数据库
- 获得锁时向表中插入一条记录,释放锁时删除这条记录。唯一索引可以保证该记录只被插入一次,那么就可以用这个记录是否存在来判断是否处于锁定状态。
- 比如一个订单,那么我们可以用
select * from order_table where id = 'xxx' for update进行加行锁,那么其他的事务就不能对其进行修改。
存在的问题
- 锁没有失效时间,解锁失败的话其它进程无法再获得该锁;
- 只能是非阻塞锁,插入失败直接就报错了,无法重试;
- 不可重入,已经获得锁的进程也必须重新获取锁。
- 一般对比缓存来说性能较低。对于高并发的场景并不是很适合。
基于 ZooKeeper
ZooKeeper 是⼀个分布式的,开放源码的分布式应⽤程序协调服务,是 Hadoop 和 Hbase 的重要组件。
特性
- 在 Zookeeper 中,znode 是⼀个跟 Unix ⽂件系统路径相似的节点,可以往这个节点存储或获取数据
- 通过客户端可对 znode 进⾏增删改查的操作,还可以注册 watcher 监控 znode 的变化。
ZooKeeper 节点类型
- 永久节点(PERSISTENT):不会因为会话结束或者超时而消失;
- 临时节点(EPHEMERAL):如果会话结束或者超时就会消失;
- 有序节点(SEQUENTIAL):会在节点名的后面加一个数字后缀,并且是有序的,例如生成的有序节点为 /lock/node-0000000000,它的下一个有序节点则为 /lock/node-0000000001,以此类推。
zookeeper 的分布式锁原理是利用了临时节点(EPHEMERAL)的特性。
当 znode 被声明为 EPHEMERAL 的后,如果创建 znode 的那个客户端崩溃了,那么相应的 znode 会被自动删除。这样就避免了设置过期时间的问题。
分布式锁实现
- 创建一个锁目录 /lock;
- 当一个客户端需要获取锁时,在 /lock 下创建临时的且有序的子节点;
- 客户端获取 /lock 下的子节点列表,判断自己创建的子节点是否为当前子节点列表中序号最小的子节点,如果是则认为获得锁;否则监听自己的前一个子节点,获得子节点的变更通知后重复此步骤直至获得锁;
- 执行业务代码,完成后,删除自己 id 对应的子节点,下一个排队的节点就可以收到 Watcher 事件,从而被唤醒得到锁后退出。
其中的几个关键点:
node节点选择EPHEMERAL_SEQUENTIAL很重要。- 自增长的特性,可以方便构建一个基于 Fair 特性的锁,前一个节点唤醒后一个节点,形成一个链式的触发过程。可以有效的避免”惊群效应”(一个锁释放,所有等待的线程都被唤醒),有针对性的唤醒,提升性能。
- 选择一个
EPHEMERAL临时节点的特性。因为和zookeeper交互是一个网络操作,不可控因素过多,比如网络断了,上一个节点释放锁的操作会失败。临时节点是和对应的session挂接的,session一旦超时或者异常退出其节点就会消失,类似于ReentrantLock中等待队列Thread的被中断处理。
- 获取
lock操作是一个阻塞的操作,而对应的Watcher是一个异步事件,所以需要使用信号进行通知,使用BooleanMutex,可以比较方便的解决锁重入的问题。(锁重入可以理解为多次读操作,锁释放为写抢占操作)
- 使用
EPHEMERAL会引出一个风险:在非正常情况下,网络延迟比较大会出现session timeout,zookeeper就会认为该client已关闭,从而销毁其id标示,竞争资源的下一个id就可以获取锁。这时可能会有两个process同时拿到锁在跑任务,所以设置好session timeout很重要。 - 同样使用
PERSISTENT同样会存在一个死锁的风险,进程异常退出后,对应的竞争资源id一直没有删除,下一个id一直无法获取到锁对象。