raft
Raft算法实现了一种复制状态机。每个分布式的状态机中存储了一份包含命令序列的日志文件,这些文件通过复制的形式传播到其他节点中。每个日志包含相同的命令,并且顺序也相同。状态机会按顺序执行这些命令并产生相同的状态,最终所有的状态机都将达到一个确定的最终状态。
raft是强一致的集群日志同步算法。
在Raft算法中,每一个节点会维护一份复制日志(Replicated Log),复制日志中存储了按顺序排列的条目(Entry),用户执行的每一个操作都会生成日志中的一个条目,稍后这个条目会通过节点之间的交流复制到所有节点上。
如果一个条目是被大多数节点认可的,那么这种条目被称为Committed Entry,这也是节点唯一会执行的条目类型。各个节点只要按顺序执行复制日志中的Committed Entry,最终就会到达相同的状态。这样,即便节点崩溃后苏醒,也可以快速恢复到和其他节点相同的状态。
总结一下Raft算法的核心思想就是,保证每个节点具有相同的复制日志,进而保证所有节点的最终状态是一致的。
基本原理
Raft中的节点有3种状态,领导者(Leader),候选人(Candidate)和跟随者(Follower)。
其中,Leader是大多数的节点选举产生的,并且节点的状态可以随着时间发生变化。某个Leader节点在领导的这段时期被称为任期(Term)。新的Term是从选举Leader时开始增加的,每次Candidate节点开始新的选举,Term都会加1。
如果Candidate选举成为了Leader,意味着它成为了这个Term后续时间的Leader。每一个节点会存储当前的Term,如果某一个节点当前的Term小于其他节点,那么节点会更新自己的Term为已知的最大Term。如果一个Candidate发现自己当前的Term过时了,它会立即变为Follower。
一般情况下(网络分区除外)在一个时刻只会存在一个Leader,其余的节点都是Follower。Leader会处理所有的客户端写请求(如果是客户端写请求到Follower,也会被转发到Leader处理),将操作作为一个Entry追加到复制日志中,并把日志复制到所有节点上。而Candidate则是节点选举时的过渡状态,用于自身拉票选举Leader。
Raft节点之间通过RPC(Remote Prcedure Cal,远程过程调用)来进行通信。Raft论文中指定了两种方法用于节点的通信,其中,RequestVote 方法由Candidate在选举时使用,AppendEntries则是Leader复制log到其他节点时使用,同时也可以用于心跳检测。RPC 方法可以是并发的,且支持失败重试。
Raft算法可以分为三个部分:选举、日志复制和异常处理。下面我们分阶段介绍一下。
选举与任期
在Raft中有一套心跳检测,只要Follower收到来自Leader或者Candidate的信息,它就会保持Follower的状态。但是如果Follower一段时间内没有收到RPC请求(例如可能是Leader挂了),新一轮选举的机会就来了。这时Follower会将当前Term加1并过渡到Candidate状态。它会给自己投票,并发送RequestVote RPC请求给其他的节点进行拉票。
Candidate的状态会持续,直到下面的三种情况发生。
- 如果这个Candidate节点获得了大部分节点的支持,赢得选举变为了Leader。一旦它变为Leader,这个新的Leader节点就会向其他节点发送 AppendEntries RPC, 确认自己Leader的地位,终止选举。
- 如果其他节点成为了Leader。它会收到其他节点的AppendEntries RPC。如果发现其他节点的当前Term比自己的大,则会变为Follower状态。
- 如果有许多节点同时变为了Candidate,则可能会出现一段时间内没有节点能够选举成功的情况,这会导致选举超时。
为了快速解决并修复这第三种情况,Raft规定了每一个Candidate在选举前会重置一个随机的选举超时(Election Timeout)时间,这个随机时间会在一个区间内(例如150-300ms)。
随机时间保证了在大部分情况下,有一个唯一的节点首先选举超时,它会在大部分节点选举超时前发送心跳检测,赢得选举。如果一个Leader在心跳检测中发现另一个节点有更高的Term,它会转变为Follower,否则将一直保持Leader状态。
日志复制(Log Replication)
一个节点成为Leader之后,会开始接受来自客户端的请求。每一个客户端请求都包含一个节点的状态机将要执行的操作(Command)。Leader会将这个操作包装为一个Entry放入到log中,并通过AppendEntries RPC 发送给其他节点,要求其他节点把这个Entry添加到log中。
当Entry被复制到大多数节点之后,也就是被大部分的节点认可之后,这个Entry的状态就变为Committed。Raft算法会保证 Committed Entry 一定能够被所有节点的状态机执行。
一旦Follower通过RPC协议知道某一个Entry被commit了,Follower就可以按顺序执行log中的Committed Entry了。
如图所示,我们可以把log理解为Entry的集合。Entry中包含了Command命令(例如x←3),Entry所在的Term(方框里面的数字),以及每一个Entry的顺序编号(最上面标明的log index,顺序递增)。
但这里还有一个重要的问题,就是Raft节点在日志复制的过程中需要保证日志数据的一致性。要实现这一点,需要确认下面几个关键的属性:
- 如果不同节点的log中的Entry有相同的index和Term, 那么它们存储的一定是相同的Command;
- 如果不同节点的log中的Entry有相同的index和Term,那么这个Entry之前所有的Entry都是相同的。
接下来我们就来看看,Raft算法是怎么在不可靠的分布式环境中保证数据一致性的。
在实际生产过程中,Raft算法可能会因为分布式系统中遇到的难题(例如节点崩溃),出现多种数据不一致的情况。如下所示,a → f分别代表Follower的复制日志中可能遇到的情况,方框中的方格表示当前节点复制日志中每一个Entry对应的Term序号。
a → e的情况你可以想一想什么时候会发生
f这种情况可能是这样的:f是Term 2的Leader, 它添加Entry到log中之后,Entry还没有复制到其他节点,也就是说,还没等到commit就崩溃了。但是它快速恢复之后又变为了Term 3 的Leader, 再次添加Entry到log之后,没有commit又崩溃了。当f再次苏醒时,世界已然发生了巨变。
所以我们可以看到,在正常的情况下,Raft可以满足上面的两个属性,但是异常情况下,这种情况就可能被打破,出现数据不一致的情况。为了让数据保持最终一致,Raft算法会强制要求Follower的复制日志和Leader的复制日志一致,这样一来,Leader就必须要维护一个Entry index了。在这个Entry index之后的都是和Follower不相同的Entry,在这个Entry之前的都是和Follower一致的Entry。
Leader会为每一个Follower维护一份next index数组,里面标志了将要发送给Follower的下一个Entry的序号。最后,Follower会删除掉所有不同的Entry,并保留和Leader一致的复制日志,这一过程都会通过AppendEntries RPC 执行完毕。
不过,仅仅通过上面的措施还不足以保证数据的一致性。想想下图这个例子:
从这张图可以看出,一个已经被Committed的Entry是有可能被覆盖掉的。例如在a阶段,节点s1成为了Leader,Entry 2还没有成为Committed。在b阶段,s1崩溃,s5成为了Leader ,添加Entry到自己的log中,但是仍然没有commit。在c阶段,s5崩溃,s1成为了Leader,而且在这个过程中Entry 2成为了Committed Entry。接着在d阶段s1崩溃,s5成为了Leader,它会将本已commit的Entry 2给覆盖掉。但我们真正想期望的是e这种情况。
怎么解决这个问题呢?Raft使用了一种简单的方法。Raft为Leader添加了下面几个限制:
- 要成为Leader必须要包含过去所有的Committed Entry;
- Candidate要想成为Leader,必须要经过大部分Follower节点的同意。而当Entry成为Committed Entry时,表明该Entry其实已经存在于大部分节点中了,所以这个Committed Entry 会出现在至少一个Follower节点中。因此我们可以证明,当前Follower节点中,至少有一个节点是包含了上一个Leader节点的所有Committed Entry的。Raft算法规定,只有当一个Follower节点的复制日志是最新的(如果复制日志的Term最大,则其日志最新,如果Term相同,那么越长的复制日志越新),它才可能成为Leader。
日志特性
- 如果在不同的日志中的两个日志条目的
索引
和索引下标
相同,那么他们的指令就是相同的。(原因:leader 最多在一个任期里的一个日志索引位置创建一条日志条目,日志条目在日志的位置从来不会改变
) - 如果在不同的日志里的 2 个日志条目拥有相同的任期号和索引,那么他们之前的日志项都是相同的。(
原因:每次 RPC 发送附加日志时,leader 会把这条日志条目的前面的日志的下标和任期号一起发送给 follower,如果 follower 发现和自己的日志不匹配,那么就拒绝接受这条日志,这个称之为一致性检查
)。 2.1 这里需要提一下 Raft 的日志匹配规则:如果 2 个日志的相同的索引位置的日志条目的任期号相同,那么 Raft 就认为这个日志从头到这个索引之间全部相同
,这个非常重要。
- 选举leader需要半数以上节点参与
- 节点commit 日志最多允许选举为leader
- commit (提交)日志一样多,则term, index越大选举为leader
raft保证
- 提交成功的请求,一定不会丢
- 各个节点的数据将最终一致
etcd
将数据存储在集群中的高可用k-v存储
允许应用实时监听存储中的k-v 的变化
能够容忍单点故障,能够应对网络分区
使用etcd的场景默认处理的数据都是控制数据,对于应用数据,只推荐数据量很小,但是更新访问频繁的情况。
etcd与raft的关系
raft是强一致的集群日志同步算法
etcd是一个分布式kv存储
etcd利用raft算法在集群中同步key-value
抽屉理论
60人 31人又苹果。取30人那么其中一定有一个有苹果
quorum 模型
集群至少有2n+1的节点
Leader, Follower, Follower
分2步骤
-
1.日志复制给n+1节点后,本地提交返回客户端(同步) (写入性能差(1000次/秒))
-
2.异步通知所有follower完成提交
交互协议
sdk内置grpc协议,性能高效
重要特性
- 底层存储是按key有序排列的,可以顺序遍历
- 因为key有序,所以etcd支持按目录结构高效遍历
- 支持复杂事物,提供if…then..else…的事务能力
- 基于租约机制实现key的ttl过期
mvcc 多版本控制
1put /key1 value1 -> revision=1
2put /key2 value2 -> revision=2
3put /keys value3 -> revision=3
提交的版本(revision) 在ectd中是单调递增的
同key维护多个历史版本,用于watch机制
历史版本过多,可以通过compact命令完成删减
监听kv变化
通过watch机制,可以监听某个key或某个目录(key前缀)的连续变化
常用于分布式系统的配置分发,状态同步
watch 工作原理
租约lease
请求10秒的租约,返回租约id5,然后把数据a=1和租约id存储到 数据库中。过期了,etcd会去删除。也可以通过sdk 续租。
安装
mac安装
1# mac安装
2ETCD_VER=v3.3.18
3
4# choose either URL
5GOOGLE_URL=https://storage.googleapis.com/etcd
6GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
7DOWNLOAD_URL=${GOOGLE_URL}
8
9rm -f /tmp/etcd-${ETCD_VER}-darwin-amd64.zip
10rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
11
12curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-darwin-amd64.zip -o /tmp/etcd-${ETCD_VER}-darwin-amd64.zip
13unzip /tmp/etcd-${ETCD_VER}-darwin-amd64.zip -d /tmp && rm -f /tmp/etcd-${ETCD_VER}-darwin-amd64.zip
14mv /tmp/etcd-${ETCD_VER}-darwin-amd64/* /tmp/etcd-download-test && rm -rf mv /tmp/etcd-${ETCD_VER}-darwin-amd64
15
16/tmp/etcd-download-test/etcd --version
17/tmp/etcd-download-test/etcdctl version
18
19mv /tmp/etcd-download-test/etcd /usr/local/bin/
20mv /tmp/etcd-download-test/etcdctl /usr/local/bin/
启动etcd
1nohup etcd --listen-client-urls "http://0.0.0.0:2379" --advertise-client-urls "http://0.00.0:2379" &
命令行操作etcd
1ETCDCTL_API=3 etcdctl
2ETCDCTL_API=3 etcdctl put "name" "mao"
3ETCDCTL_API=3 etcdctl get "name"
4ETCDCTL_API=3 etcdctl del "name"
5ETCDCTL_API=3 etcdctl put "/cron/jobs/job1" "{'msg':'ok'}"
6ETCDCTL_API=3 etcdctl put "/cron/jobs/job2" "{'msg':'ok'}"
7ETCDCTL_API=3 etcdctl get "/cron/jobs/" --prefix
8ETCDCTL_API=3 etcdctl watch "/cron/jobs" --prefix #阻塞监听/cron/jobs 为前缀的key 。
9ETCDCTL_API=3 etcdctl del "/cron/jobs" --prefix #删除
golang操作etcd
客户端安装 go get go.etcd.io/etcd/clientv3
etcd客户端安装失败
https://www.codercto.com/a/108257.html
1// go.mod 增加下面代码
2
3replace (
4 github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.4
5 google.golang.org/grpc => google.golang.org/grpc v1.26.0
6)
写入kv
1package main
2
3import (
4 "context"
5 "fmt"
6 "go.etcd.io/etcd/clientv3"
7 "time"
8)
9
10func main() {
11 var (
12 config clientv3.Config
13 client *clientv3.Client
14 err error
15 ctx context.Context
16 kv clientv3.KV
17 putResp *clientv3.PutResponse
18 )
19 config = clientv3.Config{
20 Endpoints: []string{"localhost:2379"}, //集群列表
21 DialTimeout: 5 * time.Second,
22 }
23 if client, err = clientv3.New(config); err != nil {
24 fmt.Println(err)
25 }
26
27 defer client.Close()
28 ctx = context.TODO()
29 kv = clientv3.NewKV(client)
30
31 //写入hello
32 kv.Put(, "/cron/jobs/job1", "hello")
33
34 if putResp, err = kv.Put(ctx, "/cron/jobs/job1", "bye", clientv3.WithPrevKV()); err != nil {
35 fmt.Println(err)
36 } else {
37 fmt.Println("Revision:", putResp.Header.Revision)
38 if putResp.PrevKv != nil { // 打印hello
39 fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
40 }
41 }
42
43 if putResp, err = kv.Put(ctx, "/cron/jobs/job2", "111", clientv3.WithPrevKV()); err != nil {
44 fmt.Println(err)
45 } else {
46 fmt.Println("Revision:", putResp.Header.Revision)
47 if putResp.PrevKv != nil { // 打印hello
48 fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
49 }
50 }
51}
获得,前缀获得
1package main
2
3import (
4 "go.etcd.io/etcd/clientv3"
5 "time"
6 "fmt"
7 "context"
8)
9
10func main() {
11 var (
12 config clientv3.Config
13 client *clientv3.Client
14 err error
15 kv clientv3.KV
16 getResp *clientv3.GetResponse
17 )
18
19 config = clientv3.Config{
20 Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
21 DialTimeout: 5 * time.Second,
22 }
23
24 // 建立一个客户端
25 if client, err = clientv3.New(config); err != nil {
26 fmt.Println(err)
27 return
28 }
29
30 // 用于读写etcd的键值对
31 kv = clientv3.NewKV(client)
32
33 if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/); err != nil {
34 fmt.Println(err)
35 } else {
36 fmt.Println(getResp.Kvs, getResp.Count)
37 }
38 // WithPrefix()能获得到 demo1中 /cron/jobs/job1 /cron/jobs/job2 的信息
39 //if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
40 // fmt.Println(err)
41 //} else {
42 // fmt.Println(getResp.Kvs, getResp.Count)
43 //}
44}
删除
1package main
2//删除key
3import (
4 "context"
5 "fmt"
6 "github.com/coreos/etcd/mvcc/mvccpb"
7 "go.etcd.io/etcd/clientv3"
8 "time"
9)
10
11func main() {
12 var (
13 config clientv3.Config
14 client *clientv3.Client
15 err error
16 kv clientv3.KV
17 delResp *clientv3.DeleteResponse
18 kvpair *mvccpb.KeyValue
19 )
20
21 config = clientv3.Config{
22 Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
23 DialTimeout: 5 * time.Second,
24 }
25
26 // 建立一个客户端
27 if client, err = clientv3.New(config); err != nil {
28 fmt.Println(err)
29 return
30 }
31
32 // 用于读写etcd的键值对
33 kv = clientv3.NewKV(client)
34
35 // 删除KV
36 if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
37 fmt.Println(err)
38 return
39 }
40
41 // 被删除之前的value是什么
42 if len(delResp.PrevKvs) != 0 {
43 for _, kvpair = range delResp.PrevKvs {
44 fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value))
45 }
46 }
47
48 // WithFromKey 能把 /cron/jobs/job1 /cron/jobs/job2 都删除
49 //if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey()); err != nil {
50 // fmt.Println(err)
51 // return
52 //}
53}
续租, 带过期时间的续租
1package main
2
3import (
4 "go.etcd.io/etcd/clientv3"
5 "time"
6 "fmt"
7 "context"
8)
9
10func main() {
11 var (
12 config clientv3.Config
13 client *clientv3.Client
14 err error
15 lease clientv3.Lease
16 leaseGrantResp *clientv3.LeaseGrantResponse
17 leaseId clientv3.LeaseID
18 putResp *clientv3.PutResponse
19 getResp *clientv3.GetResponse
20 keepResp *clientv3.LeaseKeepAliveResponse
21 keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
22 kv clientv3.KV
23 )
24 config = clientv3.Config{
25 Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
26 DialTimeout: 5 * time.Second,
27 }
28 // 建立一个客户端
29 if client, err = clientv3.New(config); err != nil {
30 fmt.Println(err)
31 return
32 }
33 // 申请一个lease(租约)
34 lease = clientv3.NewLease(client)
35 // 申请一个10秒的租约
36 if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
37 fmt.Println(err)
38 return
39 }
40 // 拿到租约的ID
41 leaseId = leaseGrantResp.ID
42 //一秒续租一次
43 if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
44 fmt.Println(err)
45 return
46 }
47
48 // 处理续约应答的协程, 一秒续租一次
49 go func() {
50 for {
51 select {
52 case keepResp = <- keepRespChan:
53 if keepRespChan == nil {
54 fmt.Println("租约已经失效了")
55 goto END
56 } else { // 每秒会续租一次, 所以就会受到一次应答
57 fmt.Println("收到自动续租应答:", keepResp.ID)
58 }
59 }
60 }
61 END:
62 }()
63
64 // 获得kv API子集
65 kv = clientv3.NewKV(client)
66
67 // Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
68 if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
69 fmt.Println(err)
70 return
71 }
72
73 fmt.Println("写入成功:", putResp.Header.Revision)
74
75 // 定时的看一下key过期了没有
76 for {
77 if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
78 fmt.Println(err)
79 return
80 }
81 if getResp.Count == 0 {
82 fmt.Println("kv过期了")
83 break
84 }
85 fmt.Println("还没过期:", getResp.Kvs)
86 time.Sleep(2 * time.Second)
87 }
88}
1package main
2
3import (
4 "context"
5 "fmt"
6 "go.etcd.io/etcd/clientv3"
7 "runtime"
8 "time"
9)
10
11func main() {
12 var (
13 config clientv3.Config
14 client *clientv3.Client
15 err error
16 lease clientv3.Lease
17 leaseGrantResp *clientv3.LeaseGrantResponse
18 leaseId clientv3.LeaseID
19 putResp *clientv3.PutResponse
20 getResp *clientv3.GetResponse
21 keepResp *clientv3.LeaseKeepAliveResponse
22 ok bool
23 keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
24 kv clientv3.KV
25 )
26
27 config = clientv3.Config{
28 Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
29 DialTimeout: 5 * time.Second,
30 }
31
32 // 建立一个客户端
33 if client, err = clientv3.New(config); err != nil {
34 fmt.Println(err)
35 return
36 }
37
38 // 申请一个lease(租约)
39 lease = clientv3.NewLease(client)
40
41 // 申请一个10秒的租约
42 if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
43 fmt.Println(err)
44 return
45 }
46 // 拿到租约的ID
47 leaseId = leaseGrantResp.ID
48 // 5秒后会取消自动续租 ,10秒生命周期 一共15秒
49 ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
50 if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
51 fmt.Println(err)
52 return
53 }
54 // 处理续约应答的协程, 5秒续租一次
55 go func() {
56 for {
57 select {
58
59 case keepResp, ok = <-keepRespChan:
60 if ok {
61 //每5秒会续租一次, 所以就会受到一次,2次应答
62 if keepResp != nil {
63 fmt.Println("收到自动续租应答:", keepResp.ID)
64 }
65 } else {
66 fmt.Println("租约已经失效了")
67 runtime.Goexit()
68 }
69
70 }
71 }
72 //END:
73 }()
74
75 // 获得kv API子集
76 kv = clientv3.NewKV(client)
77
78 // Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
79 if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
80 fmt.Println(err)
81 return
82 }
83
84 fmt.Println("写入成功:", putResp.Header.Revision)
85
86 // 定时的看一下key过期了没有
87 for {
88 if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
89 fmt.Println(err)
90 return
91 }
92 if getResp.Count == 0 {
93 fmt.Println("kv过期了")
94 break
95 }
96 fmt.Println("还没过期:", getResp.Kvs)
97 time.Sleep(2 * time.Second)
98 }
99}
监听kv
1package main
2
3import (
4 "go.etcd.io/etcd/clientv3"
5 "time"
6 "fmt"
7 "context"
8 "github.com/coreos/etcd/mvcc/mvccpb"
9)
10
11func main() {
12 var (
13 config clientv3.Config
14 client *clientv3.Client
15 err error
16 kv clientv3.KV
17 watcher clientv3.Watcher
18 getResp *clientv3.GetResponse
19 watchStartRevision int64
20 watchRespChan <-chan clientv3.WatchResponse
21 watchResp clientv3.WatchResponse
22 event *clientv3.Event
23 )
24
25 // 客户端配置
26 config = clientv3.Config{
27 Endpoints: []string{"127.0.0.1:2379"},
28 DialTimeout: 5 * time.Second,
29 }
30
31 // 建立连接
32 if client, err = clientv3.New(config); err != nil {
33 fmt.Println(err)
34 return
35 }
36
37 // KV
38 kv = clientv3.NewKV(client)
39
40 // 模拟etcd中KV的变化
41 go func() {
42 for {
43 kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
44
45 kv.Delete(context.TODO(), "/cron/jobs/job7")
46
47 time.Sleep(1 * time.Second)
48 }
49 }()
50
51 // 先GET到当前的值,并监听后续变化
52 if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
53 fmt.Println(err)
54 return
55 }
56
57 // 现在key是存在的
58 if len(getResp.Kvs) != 0 {
59 fmt.Println("当前值:", string(getResp.Kvs[0].Value))
60 }
61
62 // 当前etcd集群事务ID, 单调递增的
63 watchStartRevision = getResp.Header.Revision + 1
64
65 // 创建一个watcher
66 watcher = clientv3.NewWatcher(client)
67
68 // 启动监听
69 fmt.Println("从该版本向后监听:", watchStartRevision)
70
71
72 // 5秒后 取消监听
73 ctx, cancelFunc := context.WithCancel(context.TODO())
74 time.AfterFunc(5 * time.Second, func() {
75 cancelFunc()
76 })
77
78 watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
79
80 // 处理kv变化事件
81 for watchResp = range watchRespChan {
82 for _, event = range watchResp.Events {
83 switch event.Type {
84 case mvccpb.PUT:
85 fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
86 case mvccpb.DELETE:
87 fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
88 }
89 }
90 }
91}
操作 kv
1package main
2
3import (
4 "go.etcd.io/etcd/clientv3"
5 "time"
6 "fmt"
7 "context"
8)
9
10func main() {
11 var (
12 config clientv3.Config
13 client *clientv3.Client
14 err error
15 kv clientv3.KV
16 putOp clientv3.Op
17 getOp clientv3.Op
18 opResp clientv3.OpResponse
19 )
20
21 // 客户端配置
22 config = clientv3.Config{
23 Endpoints: []string{"127.0.0.1:2379"},
24 DialTimeout: 5 * time.Second,
25 }
26
27 // 建立连接
28 if client, err = clientv3.New(config); err != nil {
29 fmt.Println(err)
30 return
31 }
32
33 kv = clientv3.NewKV(client)
34
35 // 创建Op: operation
36 putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")
37
38 // 执行OP
39 if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
40 fmt.Println(err)
41 return
42 }
43 // kv.Do(op)
44 // kv.Put
45 // kv.Get
46 // kv.Delete
47 fmt.Println("写入Revision:", opResp.Put().Header.Revision)
48
49 // 创建Op
50 getOp = clientv3.OpGet("/cron/jobs/job8")
51
52 // 执行OP
53 if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
54 fmt.Println(err)
55 return
56 }
57
58 // 打印
59 fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision) // create rev == mod rev
60 fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
61}
分布式锁
1package main
2
3import (
4 "go.etcd.io/etcd/clientv3"
5 "time"
6 "fmt"
7 "context"
8)
9
10func main() {
11 var (
12 config clientv3.Config
13 client *clientv3.Client
14 err error
15 lease clientv3.Lease
16 leaseGrantResp *clientv3.LeaseGrantResponse
17 leaseId clientv3.LeaseID
18 keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
19 keepResp *clientv3.LeaseKeepAliveResponse
20 ctx context.Context
21 cancelFunc context.CancelFunc
22 kv clientv3.KV
23 txn clientv3.Txn
24 txnResp *clientv3.TxnResponse
25 )
26
27 // 客户端配置
28 config = clientv3.Config{
29 Endpoints: []string{"127.0.0.1:2379"},
30 DialTimeout: 5 * time.Second,
31 }
32
33 // 建立连接
34 if client, err = clientv3.New(config); err != nil {
35 fmt.Println(err)
36 return
37 }
38
39 // lease实现锁自动过期:
40 // op操作
41 // txn事务: if else then
42
43 // 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
44 lease = clientv3.NewLease(client)
45
46 // 申请一个5秒的租约
47 if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
48 fmt.Println(err)
49 return
50 }
51
52 // 拿到租约的ID
53 leaseId = leaseGrantResp.ID
54
55 // 准备一个用于取消自动续租的context
56 ctx, cancelFunc = context.WithCancel(context.TODO())
57
58 // 确保函数退出后, 自动续租会停止
59 defer cancelFunc()
60 defer lease.Revoke(context.TODO(), leaseId)
61
62 // 5秒后会取消自动续租
63 if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
64 fmt.Println(err)
65 return
66 }
67
68 // 处理续约应答的协程
69 go func() {
70 for {
71 select {
72 case keepResp = <- keepRespChan:
73 if keepRespChan == nil {
74 fmt.Println("租约已经失效了")
75 goto END
76 } else { // 每秒会续租一次, 所以就会受到一次应答
77 fmt.Println("收到自动续租应答:", keepResp.ID)
78 }
79 }
80 }
81 END:
82 }()
83
84 // if 不存在key, then 设置它, else 抢锁失败
85 kv = clientv3.NewKV(client)
86
87 // 创建事务
88 txn = kv.Txn(context.TODO())
89
90 // 定义事务
91
92 // 如果key不存在
93 txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
94 Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
95 Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
96
97 // 提交事务
98 if txnResp, err = txn.Commit(); err != nil {
99 fmt.Println(err)
100 return // 没有问题
101 }
102
103 // 判断是否抢到了锁
104 if !txnResp.Succeeded {
105 fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
106 return
107 }
108
109 // 2, 处理业务
110
111 fmt.Println("处理任务")
112 time.Sleep(5 * time.Second)
113
114 // 3, 释放锁(取消自动续租, 释放租约)
115 // defer 会把租约释放掉, 关联的KV就被删除了
116}
封装后的代码
1package main
2
3import (
4 "context"
5 "fmt"
6 "go.etcd.io/etcd/clientv3"
7 "time"
8)
9
10type EtcdMutex struct {
11 Tlt int64 //租约时间
12 Conf clientv3.Config //配置
13 Key string //etcd的key
14 cancel context.CancelFunc //关闭租约
15 lease clientv3.Lease //释放
16 leaseID clientv3.LeaseID //释放编号
17 txn clientv3.Txn //标记
18}
19
20func (em *EtcdMutex) init() error {
21 var err error
22 var ctx context.Context
23 //新建一个上下文
24 client, err := clientv3.New(em.Conf)
25 if err != nil {
26 return nil
27 }
28 //新建一个kv,交易
29 em.txn = clientv3.NewKV(client).Txn(context.TODO())
30 em.lease = clientv3.NewLease(client)
31 leaseResp, err := em.lease.Grant(context.TODO(), em.Tlt)
32 if err != nil {
33 return nil
34 }
35 ctx, em.cancel = context.WithCancel(context.TODO())
36 em.leaseID = leaseResp.ID
37 _, err = em.lease.KeepAlive(ctx, em.leaseID)
38 return err
39
40}
41
42func (em *EtcdMutex) Lock() error {
43 err := em.init()
44 if err != nil {
45 return err
46 }
47 //锁定
48 em.txn.If(clientv3.Compare(clientv3.CreateRevision(em.Key), "=", 0)).Then(
49 clientv3.OpPut(em.Key, "", clientv3.WithLease(em.leaseID))).Else()
50 txnresp, err := em.txn.Commit() //递交交易
51 if err != nil {
52 return err
53 }
54 if !txnresp.Succeeded {
55 return fmt.Errorf("抢锁失败")
56 }
57
58 return nil
59
60}
61func (em *EtcdMutex) UnLock() {
62 em.cancel()
63 em.lease.Revoke(context.TODO(), em.leaseID)
64 fmt.Println("释放锁")
65}
66
67func main() {
68 var conf = clientv3.Config{
69 Endpoints: []string{"127.0.0.1:2379"},
70 DialTimeout: 5 * time.Second,
71 }
72
73 eMutex1 := &EtcdMutex{Conf: conf, Tlt: 10, Key: "lock"}
74 eMutex2 := &EtcdMutex{Conf: conf, Tlt: 10, Key: "lock"}
75
76 //线程1
77 go func() {
78 err := eMutex1.Lock()
79 if err != nil {
80 fmt.Println("线程1抢锁失败")
81 fmt.Println(err)
82 } else {
83 time.Sleep(time.Second)
84 fmt.Println("线程1抢锁成功")
85 time.Sleep(time.Second * 10)
86 defer eMutex1.UnLock()
87 fmt.Println("线程1结束")
88 }
89 }()
90
91 go func() {
92 err := eMutex2.Lock()
93 if err != nil {
94 fmt.Println("线程2抢锁失败")
95 fmt.Println(err)
96 }else{
97 time.Sleep(time.Second)
98 fmt.Println("线程2抢锁成功")
99 time.Sleep(time.Second * 30)
100 defer eMutex2.UnLock()
101 fmt.Println("线程2结束")
102 }
103 }()
104
105 time.Sleep(time.Second * 50)
106}