Posts etcd 分布式锁实现原理
Post
Cancel

etcd 分布式锁实现原理

docker 启动 etcd

1
2
3
4
5
6
7
8
# Start a single-node etcd v3.4.9 container named "etcd" (member name node1).
# --rm discards the container when it exits; host ports 2379 (client URLs) and
# 2380 (peer URLs) are published so programs on the host can reach it.
docker run --rm -p 2379:2379 -p 2380:2380 --name etcd \
quay.io/coreos/etcd:v3.4.9 etcd \
--name node1 \
--initial-advertise-peer-urls http://127.0.0.1:2380 \
--listen-peer-urls http://0.0.0.0:2380 \
--advertise-client-urls http://127.0.0.1:2379 \
--listen-client-urls http://0.0.0.0:2379 \
--initial-cluster node1=http://127.0.0.1:2380

docker-compose 启动 etcd

1
2
3
4
5
6
7
8
9
10
11
12
13
14
# Compose file that runs a single etcd v3.4.9 member (etcd-1) on its own network.
version: '3'
services:
    etcd-1:
        image: quay.io/coreos/etcd:v3.4.9
        hostname: etcd-1
        container_name: etcd-1
        # New one-member cluster: peers are addressed via the etcd-1 hostname,
        # while the server listens on all interfaces inside the container.
        command: etcd --data-dir=data.etcd --name etcd-1 --initial-advertise-peer-urls http://etcd-1:2380 --listen-peer-urls http://0.0.0.0:2380 --advertise-client-urls http://etcd-1:2379 --listen-client-urls http://0.0.0.0:2379 --initial-cluster etcd-1=http://etcd-1:2380 --initial-cluster-state new --initial-cluster-token etcd-token-01
        ports:
            # 2379 carries client URLs, 2380 carries peer URLs (see command above).
            - 2379:2379
            - 2380:2380
        networks:
            - etcd-net
networks:
    etcd-net:

my_mutex

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/mvcc/mvccpb"
	"log"
	"path"
	"strconv"
	"sync"
)

// myMutex is a distributed lock built on etcd. Each instance owns one lease and
// one key under a shared prefix; Lock compares the create revisions of the keys
// under that prefix to decide who holds the lock.
type myMutex struct {
	client  *clientv3.Client // etcd client used for every request
	leaseId clientv3.LeaseID // lease granted in newMyMutex; attached to myKey on Put
	pfx     string           // prefix shared by all contenders of this lock
	myKey   string           // this contender's key: pfx joined with the hex lease ID
	myRev   int64            // create revision of myKey; -1 until Lock sets it
}

// newMyMutex builds a mutex that contends for the lock identified by pfx.
//
// It grants a 15-second lease on cli, starts keep-alives for it, and derives
// the mutex's key as pfx joined with the lease ID in hexadecimal. The returned
// mutex has not taken the lock yet (myRev is -1); call Lock to acquire it and
// Close to revoke the lease.
//
// An error is returned when the lease cannot be granted or the keep-alive
// cannot be started; in the latter case the freshly granted lease is revoked
// so it does not linger until its TTL expires.
func newMyMutex(cli *clientv3.Client, pfx string) (*myMutex, error) {
	resp, err := cli.Grant(context.TODO(), 15)
	if err != nil {
		return nil, err
	}
	keepAlive, err := cli.KeepAlive(context.TODO(), resp.ID)
	if err != nil {
		// Best effort: the keep-alive error is what the caller needs to see,
		// so a failure to revoke is deliberately not reported.
		_, _ = cli.Revoke(context.TODO(), resp.ID)
		return nil, err
	}
	// Drain keep-alive responses so the client's response queue never fills up.
	// The loop ends once the channel is closed, which clientv3 documents as
	// happening when the keep-alive stops (e.g. lease revoked or client closed).
	go func() {
		for range keepAlive {
		}
	}()

	mu := &myMutex{
		client:  cli,
		leaseId: resp.ID,
		pfx:     pfx,
		myKey:   path.Join(pfx, strconv.FormatInt(int64(resp.ID), 16)),
		myRev:   -1,
	}

	return mu, nil
}

// Close revokes the lease that was granted for this mutex in newMyMutex and
// returns the error reported by the revoke call, if any.
func (mu *myMutex) Close() error {
	revokeCtx, stop := context.WithCancel(context.TODO())
	defer stop()

	if _, err := mu.client.Revoke(revokeCtx, mu.leaseId); err != nil {
		return err
	}
	return nil
}

// UnLock releases the lock by deleting this mutex's key from etcd.
// It returns the error of the delete request, or nil on success.
func (mu *myMutex) UnLock(ctx context.Context) error {
	if _, err := mu.client.Delete(ctx, mu.myKey); err != nil {
		return err
	}
	return nil
}

// Lock blocks until this mutex holds the lock, or returns an error.
//
// In one transaction it creates the mutex's key (bound to its lease) if it
// does not exist yet, and reads the key with the smallest create revision
// under the lock prefix. The contender whose key has the smallest create
// revision owns the lock. If that is not this mutex, Lock waits until every
// key created before its own has been deleted.
//
// ctx bounds the transaction and the wait. If the wait fails (for example
// because ctx is cancelled), the mutex's key is removed again so that it does
// not keep blocking contenders queued behind it, and the error is returned.
func (mu *myMutex) Lock(ctx context.Context) error {
	// All contender keys live directly under "<pfx>/". Scanning with the
	// trailing separator keeps keys of sibling prefixes (e.g. "<pfx>2/...")
	// out of this lock's queue.
	keyPfx := mu.myKey[:len(mu.myKey)-len(path.Base(mu.myKey))]

	cmp := clientv3.Compare(clientv3.CreateRevision(mu.myKey), "=", 0)
	put := clientv3.OpPut(mu.myKey, "", clientv3.WithLease(mu.leaseId))
	get := clientv3.OpGet(mu.myKey)
	getOwner := clientv3.OpGet(keyPfx, clientv3.WithFirstCreate()...)
	resp, err := mu.client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
	if err != nil {
		return err
	}

	// A fresh Put is created at the transaction's revision; an existing key
	// keeps the create revision it already had.
	mu.myRev = resp.Header.Revision
	if !resp.Succeeded {
		mu.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
	}

	// No key under the prefix, or ours is the oldest: the lock is ours.
	owner := resp.Responses[1].GetResponseRange().Kvs
	if len(owner) == 0 || owner[0].CreateRevision == mu.myRev {
		return nil
	}

	if err := mu.waitPredecessors(ctx, keyPfx); err != nil {
		// ctx may already be done, so clean up on the client's own context.
		// Best effort: the wait error is the one worth reporting.
		_, _ = mu.client.Delete(mu.client.Ctx(), mu.myKey)
		return err
	}
	return nil
}

// waitPredecessors returns nil once no key under keyPfx has a create revision
// smaller than mu.myRev, waiting for the newest such key to be deleted each round.
func (mu *myMutex) waitPredecessors(ctx context.Context, keyPfx string) error {
	getOps := append(clientv3.WithLastCreate(), clientv3.WithMaxCreateRev(mu.myRev-1))
	for {
		resp, err := mu.client.Get(ctx, keyPfx, getOps...)
		if err != nil {
			return err
		}
		if len(resp.Kvs) == 0 {
			return nil
		}
		if err := mu.waitKeyDeleted(ctx, string(resp.Kvs[0].Key), resp.Header.Revision); err != nil {
			return err
		}
	}
}

// waitKeyDeleted watches key from revision rev until a DELETE event arrives.
// It returns ctx's error if ctx ended the watch, and nil otherwise so the
// caller re-queries the prefix.
func (mu *myMutex) waitKeyDeleted(ctx context.Context, key string, rev int64) error {
	// Scope the watcher to this call so it is released as soon as we return,
	// rather than living as long as the caller's ctx.
	watchCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	for wr := range mu.client.Watch(watchCtx, key, clientv3.WithRev(rev)) {
		for _, ev := range wr.Events {
			if ev.Type == mvccpb.DELETE {
				return nil
			}
		}
	}
	// The watch channel closed without a delete event.
	return ctx.Err()
}

// main demonstrates the mutex: 10 goroutines, each with its own etcd client
// and myMutex on the "/my_mutex" prefix, take the lock in turn and add 100000
// to a shared counter. The final counter value is printed once all are done.
func main() {
	var cnt int64
	var wg sync.WaitGroup
	endpoints := []string{"127.0.0.1:2379"}
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			// Registered first so it runs last: main must not proceed (and
			// exit) before the unlock, lease revoke and client close below ran.
			defer wg.Done()

			cli, err := clientv3.New(clientv3.Config{Endpoints: endpoints})
			if err != nil {
				log.Fatal(err)
			}
			defer cli.Close()
			mu, err := newMyMutex(cli, "/my_mutex")
			if err != nil {
				log.Fatal(err)
			}
			defer mu.Close()

			// Never touch the shared counter without actually holding the lock.
			if err := mu.Lock(context.TODO()); err != nil {
				log.Printf("lock: %v", err)
				return
			}
			defer func() {
				if err := mu.UnLock(context.TODO()); err != nil {
					log.Printf("unlock: %v", err)
				}
			}()

			for k := 0; k < 100000; k++ {
				cnt = cnt + 1
			}
		}()
	}
	wg.Wait()
	fmt.Println(cnt)
}

my_mutex 工作原理

  1. 每个 mutex 会创建一个租约 lease,并且租约是长期有效的
  2. 使用 prefix/leaseId 作为 key 向 etcd 插入数据
  3. etcd 确保每次插入的 key 都有一个唯一的 CreateRevision,并且 CreateRevision 是从 0 开始递增的
  4. 第 2 条规定所有 mutex 都以 prefix/leaseId 作为 key 向 etcd 插入数据
  5. 每个 mutex 监视 prefix 下所有 key,并且获得这些 key 的 CreateRevision
  6. 如果 mutex 发现自己的 prefix/leaseId 对应的 CreateRevision 是这些 prefix 下所有 key 中最小的,那么当前 mutex 获得锁
This post is licensed under CC BY 4.0 by the author.
Contents