0%

分布式锁的不同实现

分布式锁其实就是,控制分布式系统不同进程共同访问共享资源的一种锁的实现。如果不同的系统或同一个系统的不同主机之间共享了某个临界资源,往往需要互斥来防止彼此干扰,以保证一致性。

业界流行的分布式锁实现,一般基于数据库,Redis和Zookeeper。

1. 数据库

1.1 基于悲观锁

  1. 在对任意记录进行修改前,先尝试为该记录加上排他锁(exclusive locking)。
  2. 如果加锁失败,说明该记录正在被修改,那么当前查询可能要等待或者抛出异常。 具体响应方式由开发者根据实际需要决定。
  3. 如果成功加锁,那么就可以对记录做修改,事务完成后就会解锁了。
  4. 其间如果有其他对该记录做修改或加排他锁的操作,都会等待我们解锁或直接抛出异常。
  • select for update,拿到锁。
  • 其他事务抢锁会阻塞。
  • 拿到锁以后,通过 commit 释放锁。

1.2 基于乐观锁

使用版本号时,可以在数据初始化时指定一个版本号,每次对数据的更新操作都对版本号执行+1操作。并判断当前版本号是不是该数据的最新的版本号。

参考: https://zhuanlan.zhihu.com/p/524143305

2. redis

  1. SETNX + expire 函数解决,2个函数没有达到原子性。

  2. SET NX PX,一个函数搞定。但是过期了可以释放别人的锁。

  3. 通过UUID 判断,只能释放自己的锁,这个时候要通过 lua 脚本。

  4. 锁会有提前过期的风险。定时去检测这个锁的失效时间,如果锁快要过期了,就自动对锁进行续期。

参考:https://www.liuvv.com/p/e4e467c6.html

3. zookeeper

Zookeeper的节点Znode有四种类型,Zookeeper分布式锁实现应用了临时顺序节点。

  • 持久节点:默认的节点类型。创建节点的客户端与zookeeper断开连接后,该节点依旧存在。
  • 持久节点顺序节点:所谓顺序节点,就是在创建节点时,Zookeeper根据创建的时间顺序给该节点名称进行编号,持久节点顺序节点就是有顺序的持久节点。
  • 临时节点:和持久节点相反,当创建节点的客户端与zookeeper断开连接后,临时节点会被删除。
  • 临时顺序节点:有顺序的临时节点。

3.1 获取锁

当第一个客户端请求过来时,Zookeeper客户端会创建一个持久节点locks。如果它(Client1)想获得锁,需要在locks节点下创建一个顺序节点lock1

img

客户端Client1会查找locks下面的所有临时顺序子节点,判断自己的节点lock1是不是排序最小的那一个,如果是,则成功获得锁。

img

这时候如果又来一个客户端client2前来尝试获得锁,它会在locks下再创建一个临时节点lock2。

客户端client2一样也会查找locks下面的所有临时顺序子节点,判断自己的节点lock2是不是最小的,此时,发现lock1才是最小的,于是获取锁失败。获取锁失败,它是不会甘心的,client2向它排序靠前的节点lock1注册Watcher事件,用来监听lock1是否存在,也就是说client2抢锁失败进入等待状态。

img

此时,如果再来一个客户端Client3来尝试获取锁,它会在locks下再创建一个临时节点lock3。

img

同样的,client3一样也会查找locks下面的所有临时顺序子节点,判断自己的节点lock3是不是最小的,发现自己不是最小的,就获取锁失败。它也是不会甘心的,它会向在它前面的节点lock2注册Watcher事件,以监听lock2节点是否存在。

img

通过临时和序号的原理, 加上 watch 的原理

  • 创建临时序号节点
  • 获取最小的节点是否时读锁, 如果是读, 那么获得锁
  • 如果不是, 阻塞等待, 每个节点只监听它的上一个节点, 获取通知

3.2 释放锁

Zookeeper的客户端业务完成或者发生故障,都会删除临时节点,释放锁。如果是任务完成,Client1会显式调用删除lock1的指令。

img

如果是客户端故障了,根据临时节点得特性,lock1是会自动删除的。

img

lock1节点被删除后,Client2可开心了,因为它一直监听着lock1。lock1节点删除,Client2立刻收到通知,也会查找locks下面的所有临时顺序子节点,发下lock2是最小,就获得锁。

img

4. etcd

4.1 实现分布式锁的基础

  • Lease 机制
    1. 即租约机制(TTL,Time To Live),Etcd 可以为存储的 Key-Value 对设置租约,当租约到期,Key-Value 将失效删除;同时也支持续约,通过客户端可以在租约到期之前续约,以避免 Key-Value 对过期失效。
    2. Lease 机制可以保证分布式锁的安全性,为锁对应的 Key 配置租约,即使锁的持有者因故障而不能主动释放锁,锁也会因租约到期而自动释放。
  • Revision 机制
    1. 每个 Key 带有一个 Revision 号,每进行一次事务便加一,因此它是全局唯一的,如初始值为 0,进行一次 put(key, value),Key 的 Revision 变为 1,同样的操作,再进行一次,Revision 变为 2;
    2. 换成 key1 进行 put(key1, value) 操作,Revision 将变为 3;这种机制有一个作用:通过 Revision 的大小就可以知道写操作的顺序。在实现分布式锁时,多个客户端同时抢锁,根据 Revision 号大小依次获得锁,可以避免 “羊群效应” (也称“惊群效应”),实现公平锁。
  • Prefix 机制
    1. 即前缀机制,也称目录机制,例如,一个名为 /mylock 的锁,两个争抢它的客户端进行写操作,实际写入的 Key 分别为:key1="/mylock/UUID1",key2="/mylock/UUID2",其中,UUID 表示全局唯一的 ID,确保两个 Key 的唯一性。
    2. 很显然,写操作都会成功,但返回的 Revision 不一样,那么,如何判断谁获得了锁呢?通过前缀“/mylock” 查询,返回包含两个 Key-Value 对的 Key-Value 列表,同时也包含它们的 Revision,通过 Revision 大小,客户端可以判断自己是否获得锁,如果抢锁失败,则等待锁释放(对应的 Key 被删除或者租约过期),然后再判断自己是否可以获得锁。
  • Watch 机制
    1. 即监听机制,Watch 机制支持监听某个固定的 Key,也支持监听一个范围(前缀机制),当被监听的 Key 或范围发生变化,客户端将收到通知;
    2. 在实现分布式锁时,如果抢锁失败,可通过 Prefix 机制返回的 Key-Value 列表获得 Revision 比自己小且相差最小的 Key(称为 Pre-Key),对 Pre-Key 进行监听,因为只有它释放锁,自己才能获得锁,如果监听到 Pre-Key 的 DELETE 事件,则说明 Pre-Key 已经释放,自己已经持有锁。

4.2 获取释放锁过程

下面描述了使用 Etcd 实现分布式锁的业务流程,假设对某个共享资源设置的锁名为:/lock/mylock。客户端连接 Etcd,以 /lock/mylock 为前缀创建全局唯一的 Key。

设置租约

假设第一个客户端对应的 Key="/lock/mylock/UUID1",第二个为 Key="/lock/mylock/UUID2";客户端分别为自己的 Key 创建租约 Lease,租约的长度根据业务耗时确定,假设为 15s。

在一个客户端持有锁期间,其它客户端只能等待,为了避免等待期间租约失效,客户端需创建一个定时任务作为“心跳”进行续约。此外,如果持有锁期间客户端崩溃,心跳停止,Key 将因租约到期而被删除,从而锁释放,避免死锁。

判断获得锁

进行 Put 操作,假设两个客户端 Put 操作返回的 Revision 分别为1、2,客户端需记录 Revision 用以接下来判断自己是否获得锁。

客户端以前缀 /lock/mylock 读取 Key-Value 列表(Key-Value 中带有 Key 对应的 Revision),判断自己 Key 的 Revision 是否为当前列表中最小的,如果是则认为获得锁;

获得锁后,操作共享资源,执行业务代码。完成业务流程后,删除对应的 Key 释放锁。

监听

否则监听列表中前一个 Revision 比自己小的 Key 的删除事件,一旦监听到删除事件或者因租约失效而删除的事件,则自己获得锁。

enter image description here

4.3 golang实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package example

import (
"context"
"fmt"
"log"
"sync"
"testing"
"time"

"go.etcd.io/etcd/client/v3"
"go.etcd.io/etcd/client/v3/concurrency"
)

func TestMutex_TryLock(t *testing.T) {

cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}})
if err != nil {
log.Fatal(err)
}

defer cli.Close()


s1, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s1.Close()

s2, err := concurrency.NewSession(cli)
if err != nil {
log.Fatal(err)
}
defer s2.Close()

m1 := concurrency.NewMutex(s1, "/my-lock") // 创建 锁1,锁的 key 前缀为 /my-lock
m2 := concurrency.NewMutex(s2, "/my-lock") // 锁2

// 通过 m1 取获取锁, err 等于 nil 说明获得到了 锁
if err = m1.Lock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s1")

if err = m2.Lock(context.TODO()); err == nil {
// 因为,锁被 m1 持有了,此时 m2 不应该获得到锁,走到这里说明除了问题
log.Fatal("should not acquire lock")
}
if err == concurrency.ErrLocked {
fmt.Println("cannot acquire lock for s2, as already locked in another session")
}

// m1 释放锁
if err = m1.Unlock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("released lock for s1")

// m2 试图去获取锁
// 因为 m1 已经释放了锁,所以 m2 会成功获取到锁
if err = m2.TryLock(context.TODO()); err != nil {
log.Fatal(err)
}
fmt.Println("acquired lock for s2")

// Output:
// acquired lock for s1
// cannot acquire lock for s2, as already locked in another session
// released lock for s1
// acquired lock for s2
}

5. 参考资料

可以加首页作者微信,咨询相关问题!