raft

Raft算法实现了一种复制状态机。每个分布式的状态机中存储了一份包含命令序列的日志文件,这些文件通过复制的形式传播到其他节点中。每个日志包含相同的命令,并且顺序也相同。状态机会按顺序执行这些命令并产生相同的状态,最终所有的状态机都将达到一个确定的最终状态。

raft是强一致的集群日志同步算法。

在Raft算法中,每一个节点会维护一份复制日志(Replicated Log),复制日志中存储了按顺序排列的条目(Entry),用户执行的每一个操作都会生成日志中的一个条目,稍后这个条目会通过节点之间的交流复制到所有节点上。

如果一个条目是被大多数节点认可的,那么这种条目被称为Committed Entry,这也是节点唯一会执行的条目类型。各个节点只要按顺序执行复制日志中的Committed Entry,最终就会到达相同的状态。这样,即便节点崩溃后苏醒,也可以快速恢复到和其他节点相同的状态。

总结一下Raft算法的核心思想就是,保证每个节点具有相同的复制日志,进而保证所有节点的最终状态是一致的。

基本原理

Raft中的节点有3种状态,领导者(Leader),候选人(Candidate)和跟随者(Follower)。

其中,Leader是大多数的节点选举产生的,并且节点的状态可以随着时间发生变化。某个Leader节点在领导的这段时期被称为任期(Term)。新的Term是从选举Leader时开始增加的,每次Candidate节点开始新的选举,Term都会加1。

如果Candidate选举成为了Leader,意味着它成为了这个Term后续时间的Leader。每一个节点会存储当前的Term,如果某一个节点当前的Term小于其他节点,那么节点会更新自己的Term为已知的最大Term。如果一个Candidate发现自己当前的Term过时了,它会立即变为Follower。

一般情况下(网络分区除外)在一个时刻只会存在一个Leader,其余的节点都是Follower。Leader会处理所有的客户端写请求(如果是客户端写请求到Follower,也会被转发到Leader处理),将操作作为一个Entry追加到复制日志中,并把日志复制到所有节点上。而Candidate则是节点选举时的过渡状态,用于自身拉票选举Leader。

Raft节点之间通过RPC(Remote Prcedure Cal,远程过程调用)来进行通信。Raft论文中指定了两种方法用于节点的通信,其中,RequestVote 方法由Candidate在选举时使用,AppendEntries则是Leader复制log到其他节点时使用,同时也可以用于心跳检测。RPC 方法可以是并发的,且支持失败重试。

Raft算法可以分为三个部分:选举、日志复制和异常处理。下面我们分阶段介绍一下。

选举与任期

在Raft中有一套心跳检测,只要Follower收到来自Leader或者Candidate的信息,它就会保持Follower的状态。但是如果Follower一段时间内没有收到RPC请求(例如可能是Leader挂了),新一轮选举的机会就来了。这时Follower会将当前Term加1并过渡到Candidate状态。它会给自己投票,并发送RequestVote RPC请求给其他的节点进行拉票。

Candidate的状态会持续,直到下面的三种情况发生。

  • 如果这个Candidate节点获得了大部分节点的支持,赢得选举变为了Leader。一旦它变为Leader,这个新的Leader节点就会向其他节点发送 AppendEntries RPC, 确认自己Leader的地位,终止选举。
  • 如果其他节点成为了Leader。它会收到其他节点的AppendEntries RPC。如果发现其他节点的当前Term比自己的大,则会变为Follower状态。
  • 如果有许多节点同时变为了Candidate,则可能会出现一段时间内没有节点能够选举成功的情况,这会导致选举超时。

为了快速解决并修复这第三种情况,Raft规定了每一个Candidate在选举前会重置一个随机的选举超时(Election Timeout)时间,这个随机时间会在一个区间内(例如150-300ms)。

随机时间保证了在大部分情况下,有一个唯一的节点首先选举超时,它会在大部分节点选举超时前发送心跳检测,赢得选举。如果一个Leader在心跳检测中发现另一个节点有更高的Term,它会转变为Follower,否则将一直保持Leader状态。

日志复制(Log Replication)

一个节点成为Leader之后,会开始接受来自客户端的请求。每一个客户端请求都包含一个节点的状态机将要执行的操作(Command)。Leader会将这个操作包装为一个Entry放入到log中,并通过AppendEntries RPC 发送给其他节点,要求其他节点把这个Entry添加到log中。

当Entry被复制到大多数节点之后,也就是被大部分的节点认可之后,这个Entry的状态就变为Committed。Raft算法会保证 Committed Entry 一定能够被所有节点的状态机执行。

一旦Follower通过RPC协议知道某一个Entry被commit了,Follower就可以按顺序执行log中的Committed Entry了。

如图所示,我们可以把log理解为Entry的集合。Entry中包含了Command命令(例如x←3),Entry所在的Term(方框里面的数字),以及每一个Entry的顺序编号(最上面标明的log index,顺序递增)。

图片

但这里还有一个重要的问题,就是Raft节点在日志复制的过程中需要保证日志数据的一致性。要实现这一点,需要确认下面几个关键的属性:

  • 如果不同节点的log中的Entry有相同的index和Term, 那么它们存储的一定是相同的Command;
  • 如果不同节点的log中的Entry有相同的index和Term,那么这个Entry之前所有的Entry都是相同的。

接下来我们就来看看,Raft算法是怎么在不可靠的分布式环境中保证数据一致性的。

在实际生产过程中,Raft算法可能会因为分布式系统中遇到的难题(例如节点崩溃),出现多种数据不一致的情况。如下所示,a → f分别代表Follower的复制日志中可能遇到的情况,方框中的方格表示当前节点复制日志中每一个Entry对应的Term序号。

图片

a → e的情况你可以想一想什么时候会发生

f这种情况可能是这样的:f是Term 2的Leader, 它添加Entry到log中之后,Entry还没有复制到其他节点,也就是说,还没等到commit就崩溃了。但是它快速恢复之后又变为了Term 3 的Leader, 再次添加Entry到log之后,没有commit又崩溃了。当f再次苏醒时,世界已然发生了巨变。

所以我们可以看到,在正常的情况下,Raft可以满足上面的两个属性,但是异常情况下,这种情况就可能被打破,出现数据不一致的情况。为了让数据保持最终一致,Raft算法会强制要求Follower的复制日志和Leader的复制日志一致,这样一来,Leader就必须要维护一个Entry index了。在这个Entry index之后的都是和Follower不相同的Entry,在这个Entry之前的都是和Follower一致的Entry。

Leader会为每一个Follower维护一份next index数组,里面标志了将要发送给Follower的下一个Entry的序号。最后,Follower会删除掉所有不同的Entry,并保留和Leader一致的复制日志,这一过程都会通过AppendEntries RPC 执行完毕。

不过,仅仅通过上面的措施还不足以保证数据的一致性。想想下图这个例子:

图片

从这张图可以看出,一个已经被Committed的Entry是有可能被覆盖掉的。例如在a阶段,节点s1成为了Leader,Entry 2还没有成为Committed。在b阶段,s1崩溃,s5成为了Leader ,添加Entry到自己的log中,但是仍然没有commit。在c阶段,s5崩溃,s1成为了Leader,而且在这个过程中Entry 2成为了Committed Entry。接着在d阶段s1崩溃,s5成为了Leader,它会将本已commit的Entry 2给覆盖掉。但我们真正想期望的是e这种情况。

怎么解决这个问题呢?Raft使用了一种简单的方法。Raft为Leader添加了下面几个限制:

  • 要成为Leader必须要包含过去所有的Committed Entry;
  • Candidate要想成为Leader,必须要经过大部分Follower节点的同意。而当Entry成为Committed Entry时,表明该Entry其实已经存在于大部分节点中了,所以这个Committed Entry 会出现在至少一个Follower节点中。因此我们可以证明,当前Follower节点中,至少有一个节点是包含了上一个Leader节点的所有Committed Entry的。Raft算法规定,只有当一个Follower节点的复制日志是最新的(如果复制日志的Term最大,则其日志最新,如果Term相同,那么越长的复制日志越新),它才可能成为Leader。

日志特性

  1. 如果在不同的日志中的两个日志条目的索引索引下标 相同,那么他们的指令就是相同的。(原因:leader 最多在一个任期里的一个日志索引位置创建一条日志条目,日志条目在日志的位置从来不会改变
  2. 如果在不同的日志里的 2 个日志条目拥有相同的任期号和索引,那么他们之前的日志项都是相同的。(原因:每次 RPC 发送附加日志时,leader 会把这条日志条目的前面的日志的下标和任期号一起发送给 follower,如果 follower 发现和自己的日志不匹配,那么就拒绝接受这条日志,这个称之为一致性检查)。 2.1 这里需要提一下 Raft 的日志匹配规则:如果 2 个日志的相同的索引位置的日志条目的任期号相同,那么 Raft 就认为这个日志从头到这个索引之间全部相同 ,这个非常重要。
  • 选举leader需要半数以上节点参与
  • 节点commit 日志最多允许选举为leader
  • commit (提交)日志一样多,则term, index越大选举为leader

raft保证

  • 提交成功的请求,一定不会丢
  • 各个节点的数据将最终一致

etcd

将数据存储在集群中的高可用k-v存储

允许应用实时监听存储中的k-v 的变化

能够容忍单点故障,能够应对网络分区

使用etcd的场景默认处理的数据都是控制数据,对于应用数据,只推荐数据量很小,但是更新访问频繁的情况

etcd与raft的关系

raft是强一致的集群日志同步算法

etcd是一个分布式kv存储

etcd利用raft算法在集群中同步key-value

抽屉理论

60人 31人又苹果。取30人那么其中一定有一个有苹果

quorum 模型

集群至少有2n+1的节点

Leader, Follower, Follower

分2步骤

  • 1.日志复制给n+1节点后,本地提交返回客户端(同步) (写入性能差(1000次/秒))

  • 2.异步通知所有follower完成提交

交互协议

sdk内置grpc协议,性能高效

重要特性

  • 底层存储是按key有序排列的,可以顺序遍历
  • 因为key有序,所以etcd支持按目录结构高效遍历
  • 支持复杂事物,提供if…then..else…的事务能力
  • 基于租约机制实现key的ttl过期

mvcc 多版本控制

1put /key1 value1 -> revision=1 
2put /key2 value2 -> revision=2
3put /keys value3 -> revision=3

提交的版本(revision) 在ectd中是单调递增的

同key维护多个历史版本,用于watch机制

历史版本过多,可以通过compact命令完成删减

监听kv变化

通过watch机制,可以监听某个key或某个目录(key前缀)的连续变化

常用于分布式系统的配置分发,状态同步

watch 工作原理

租约lease

请求10秒的租约,返回租约id5,然后把数据a=1和租约id存储到 数据库中。过期了,etcd会去删除。也可以通过sdk 续租。

安装

mac安装

 1# mac安装
 2ETCD_VER=v3.3.18
 3
 4# choose either URL
 5GOOGLE_URL=https://storage.googleapis.com/etcd
 6GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
 7DOWNLOAD_URL=${GOOGLE_URL}
 8
 9rm -f /tmp/etcd-${ETCD_VER}-darwin-amd64.zip
10rm -rf /tmp/etcd-download-test && mkdir -p /tmp/etcd-download-test
11
12curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-darwin-amd64.zip -o /tmp/etcd-${ETCD_VER}-darwin-amd64.zip
13unzip /tmp/etcd-${ETCD_VER}-darwin-amd64.zip -d /tmp && rm -f /tmp/etcd-${ETCD_VER}-darwin-amd64.zip
14mv /tmp/etcd-${ETCD_VER}-darwin-amd64/* /tmp/etcd-download-test && rm -rf mv /tmp/etcd-${ETCD_VER}-darwin-amd64
15
16/tmp/etcd-download-test/etcd --version
17/tmp/etcd-download-test/etcdctl version
18
19mv /tmp/etcd-download-test/etcd /usr/local/bin/
20mv /tmp/etcd-download-test/etcdctl /usr/local/bin/

启动etcd

1nohup etcd --listen-client-urls "http://0.0.0.0:2379" --advertise-client-urls  "http://0.00.0:2379" &

命令行操作etcd

1ETCDCTL_API=3 etcdctl
2ETCDCTL_API=3 etcdctl put "name" "mao"
3ETCDCTL_API=3 etcdctl get "name" 
4ETCDCTL_API=3 etcdctl del "name" 
5ETCDCTL_API=3 etcdctl put "/cron/jobs/job1" "{'msg':'ok'}"
6ETCDCTL_API=3 etcdctl put "/cron/jobs/job2" "{'msg':'ok'}"
7ETCDCTL_API=3 etcdctl get "/cron/jobs/"  --prefix  
8ETCDCTL_API=3 etcdctl watch "/cron/jobs" --prefix #阻塞监听/cron/jobs 为前缀的key 。
9ETCDCTL_API=3 etcdctl del "/cron/jobs" --prefix  #删除

golang操作etcd

客户端安装 go get go.etcd.io/etcd/clientv3

etcd客户端安装失败

https://www.codercto.com/a/108257.html

1// go.mod 增加下面代码
2
3replace (
4   github.com/coreos/bbolt => go.etcd.io/bbolt v1.3.4
5   google.golang.org/grpc => google.golang.org/grpc v1.26.0
6)

写入kv

 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"go.etcd.io/etcd/clientv3"
 7	"time"
 8)
 9
10func main() {
11	var (
12		config  clientv3.Config
13		client  *clientv3.Client
14		err     error
15		ctx     context.Context
16		kv      clientv3.KV
17		putResp *clientv3.PutResponse
18	)
19	config = clientv3.Config{
20		Endpoints:   []string{"localhost:2379"}, //集群列表
21		DialTimeout: 5 * time.Second,
22	}
23	if client, err = clientv3.New(config); err != nil {
24		fmt.Println(err)
25	}
26
27	defer client.Close()
28	ctx = context.TODO()
29	kv = clientv3.NewKV(client)
30  
31  	//写入hello 
32	kv.Put(, "/cron/jobs/job1", "hello")
33  
34	if putResp, err = kv.Put(ctx, "/cron/jobs/job1", "bye", clientv3.WithPrevKV()); err != nil {
35			fmt.Println(err)
36	} else {
37		fmt.Println("Revision:", putResp.Header.Revision)
38		if putResp.PrevKv != nil { // 打印hello
39			fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
40		}
41	}
42
43	if putResp, err = kv.Put(ctx, "/cron/jobs/job2", "111", clientv3.WithPrevKV()); err != nil {
44		fmt.Println(err)
45	} else {
46		fmt.Println("Revision:", putResp.Header.Revision)
47		if putResp.PrevKv != nil { // 打印hello
48			fmt.Println("PrevValue:", string(putResp.PrevKv.Value))
49		}
50	}
51}

获得,前缀获得

 1package main
 2
 3import (
 4	"go.etcd.io/etcd/clientv3"
 5	"time"
 6	"fmt"
 7	"context"
 8)
 9
10func main() {
11	var (
12		config clientv3.Config
13		client *clientv3.Client
14		err error
15		kv clientv3.KV
16		getResp *clientv3.GetResponse
17	)
18
19	config = clientv3.Config{
20		Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
21		DialTimeout: 5 * time.Second,
22	}
23
24	// 建立一个客户端
25	if client, err = clientv3.New(config); err != nil {
26		fmt.Println(err)
27		return
28	}
29
30	// 用于读写etcd的键值对
31	kv = clientv3.NewKV(client)
32
33	if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1", /*clientv3.WithCountOnly()*/); err != nil {
34		fmt.Println(err)
35	} else {
36		fmt.Println(getResp.Kvs, getResp.Count)
37	}
38	//  WithPrefix()能获得到 demo1中 /cron/jobs/job1  /cron/jobs/job2 的信息
39	//if getResp, err = kv.Get(context.TODO(), "/cron/jobs/", clientv3.WithPrefix()); err != nil {
40	//	fmt.Println(err)
41	//} else {
42	//	fmt.Println(getResp.Kvs, getResp.Count)
43	//}
44}

删除

 1package main
 2//删除key
 3import (
 4	"context"
 5	"fmt"
 6	"github.com/coreos/etcd/mvcc/mvccpb"
 7	"go.etcd.io/etcd/clientv3"
 8	"time"
 9)
10
11func main() {
12	var (
13		config  clientv3.Config
14		client  *clientv3.Client
15		err     error
16		kv      clientv3.KV
17		delResp *clientv3.DeleteResponse
18		kvpair  *mvccpb.KeyValue
19	)
20
21	config = clientv3.Config{
22		Endpoints:   []string{"127.0.0.1:2379"}, // 集群列表
23		DialTimeout: 5 * time.Second,
24	}
25
26	// 建立一个客户端
27	if client, err = clientv3.New(config); err != nil {
28		fmt.Println(err)
29		return
30	}
31
32	// 用于读写etcd的键值对
33	kv = clientv3.NewKV(client)
34
35	// 删除KV
36	if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
37		fmt.Println(err)
38		return
39	}
40
41	// 被删除之前的value是什么
42	if len(delResp.PrevKvs) != 0 {
43		for _, kvpair = range delResp.PrevKvs {
44			fmt.Println("删除了:", string(kvpair.Key), string(kvpair.Value))
45		}
46	}
47  
48  	// WithFromKey 能把 /cron/jobs/job1    /cron/jobs/job2 都删除
49	//if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithFromKey()); err != nil {
50	//	fmt.Println(err)
51	//	return
52	//}
53}

续租, 带过期时间的续租

 1package main
 2
 3import (
 4	"go.etcd.io/etcd/clientv3"
 5	"time"
 6	"fmt"
 7	"context"
 8)
 9
10func main() {
11	var (
12		config clientv3.Config
13		client *clientv3.Client
14		err error
15		lease clientv3.Lease
16		leaseGrantResp *clientv3.LeaseGrantResponse
17		leaseId clientv3.LeaseID
18		putResp *clientv3.PutResponse
19		getResp *clientv3.GetResponse
20		keepResp *clientv3.LeaseKeepAliveResponse
21		keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
22		kv clientv3.KV
23	)
24	config = clientv3.Config{
25		Endpoints: []string{"127.0.0.1:2379"}, // 集群列表
26		DialTimeout: 5 * time.Second,
27	}
28	// 建立一个客户端
29	if client, err = clientv3.New(config); err != nil {
30		fmt.Println(err)
31		return
32	}
33	// 申请一个lease(租约)
34	lease = clientv3.NewLease(client)
35	// 申请一个10秒的租约
36	if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
37		fmt.Println(err)
38		return
39	}
40	// 拿到租约的ID
41	leaseId = leaseGrantResp.ID
42	//一秒续租一次
43	if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
44		fmt.Println(err)
45		return
46	}
47
48	// 处理续约应答的协程, 一秒续租一次
49	go func() {
50		for {
51			select {
52			case keepResp = <- keepRespChan:
53				if keepRespChan == nil {
54					fmt.Println("租约已经失效了")
55					goto END
56				} else {	// 每秒会续租一次, 所以就会受到一次应答
57					fmt.Println("收到自动续租应答:", keepResp.ID)
58				}
59			}
60		}
61	END:
62	}()
63
64	// 获得kv API子集
65	kv = clientv3.NewKV(client)
66
67	// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
68	if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
69		fmt.Println(err)
70		return
71	}
72
73	fmt.Println("写入成功:", putResp.Header.Revision)
74
75	// 定时的看一下key过期了没有
76	for {
77		if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
78			fmt.Println(err)
79			return
80		}
81		if getResp.Count == 0 {
82			fmt.Println("kv过期了")
83			break
84		}
85		fmt.Println("还没过期:", getResp.Kvs)
86		time.Sleep(2 * time.Second)
87	}
88}
 1package main
 2
 3import (
 4	"context"
 5	"fmt"
 6	"go.etcd.io/etcd/clientv3"
 7	"runtime"
 8	"time"
 9)
10
11func main() {
12	var (
13		config         clientv3.Config
14		client         *clientv3.Client
15		err            error
16		lease          clientv3.Lease
17		leaseGrantResp *clientv3.LeaseGrantResponse
18		leaseId        clientv3.LeaseID
19		putResp        *clientv3.PutResponse
20		getResp        *clientv3.GetResponse
21		keepResp       *clientv3.LeaseKeepAliveResponse
22		ok             bool
23		keepRespChan   <-chan *clientv3.LeaseKeepAliveResponse
24		kv             clientv3.KV
25	)
26
27	config = clientv3.Config{
28		Endpoints:   []string{"127.0.0.1:2379"}, // 集群列表
29		DialTimeout: 5 * time.Second,
30	}
31
32	// 建立一个客户端
33	if client, err = clientv3.New(config); err != nil {
34		fmt.Println(err)
35		return
36	}
37
38	// 申请一个lease(租约)
39	lease = clientv3.NewLease(client)
40
41	// 申请一个10秒的租约
42	if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
43		fmt.Println(err)
44		return
45	}
46	// 拿到租约的ID
47	leaseId = leaseGrantResp.ID
48	// 5秒后会取消自动续租 ,10秒生命周期 一共15秒
49	ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
50	if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
51		fmt.Println(err)
52		return
53	}
54	// 处理续约应答的协程, 5秒续租一次
55	go func() {
56		for {
57			select {
58
59			case keepResp, ok = <-keepRespChan:
60				if ok {
61					//每5秒会续租一次, 所以就会受到一次,2次应答
62					if keepResp != nil {
63						fmt.Println("收到自动续租应答:", keepResp.ID)
64					}
65				} else {
66					fmt.Println("租约已经失效了")
67					runtime.Goexit()
68				}
69
70			}
71		}
72		//END:
73	}()
74
75	// 获得kv API子集
76	kv = clientv3.NewKV(client)
77
78	// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
79	if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
80		fmt.Println(err)
81		return
82	}
83
84	fmt.Println("写入成功:", putResp.Header.Revision)
85
86	// 定时的看一下key过期了没有
87	for {
88		if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
89			fmt.Println(err)
90			return
91		}
92		if getResp.Count == 0 {
93			fmt.Println("kv过期了")
94			break
95		}
96		fmt.Println("还没过期:", getResp.Kvs)
97		time.Sleep(2 * time.Second)
98	}
99}

监听kv

 1package main
 2
 3import (
 4	"go.etcd.io/etcd/clientv3"
 5	"time"
 6	"fmt"
 7	"context"
 8	"github.com/coreos/etcd/mvcc/mvccpb"
 9)
10
11func main() {
12	var (
13		config clientv3.Config
14		client *clientv3.Client
15		err error
16		kv clientv3.KV
17		watcher clientv3.Watcher
18		getResp *clientv3.GetResponse
19		watchStartRevision int64
20		watchRespChan <-chan clientv3.WatchResponse
21		watchResp clientv3.WatchResponse
22		event *clientv3.Event
23	)
24
25	// 客户端配置
26	config = clientv3.Config{
27		Endpoints: []string{"127.0.0.1:2379"},
28		DialTimeout: 5 * time.Second,
29	}
30
31	// 建立连接
32	if client, err = clientv3.New(config); err != nil {
33		fmt.Println(err)
34		return
35	}
36
37	// KV
38	kv = clientv3.NewKV(client)
39
40	// 模拟etcd中KV的变化
41	go func() {
42		for {
43			kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
44
45			kv.Delete(context.TODO(), "/cron/jobs/job7")
46
47			time.Sleep(1 * time.Second)
48		}
49	}()
50
51	// 先GET到当前的值,并监听后续变化
52	if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
53		fmt.Println(err)
54		return
55	}
56
57	// 现在key是存在的
58	if len(getResp.Kvs) != 0 {
59		fmt.Println("当前值:", string(getResp.Kvs[0].Value))
60	}
61
62	// 当前etcd集群事务ID, 单调递增的
63	watchStartRevision = getResp.Header.Revision + 1
64
65	// 创建一个watcher
66	watcher = clientv3.NewWatcher(client)
67
68	// 启动监听
69	fmt.Println("从该版本向后监听:", watchStartRevision)
70
71
72	// 5秒后 取消监听
73	ctx, cancelFunc := context.WithCancel(context.TODO())
74	time.AfterFunc(5 * time.Second, func() {
75		cancelFunc()
76	})
77
78	watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))
79
80	// 处理kv变化事件
81	for watchResp = range watchRespChan {
82		for _, event = range watchResp.Events {
83			switch event.Type {
84			case mvccpb.PUT:
85				fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
86			case mvccpb.DELETE:
87				fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
88			}
89		}
90	}
91}

操作 kv

 1package main
 2
 3import (
 4	"go.etcd.io/etcd/clientv3"
 5	"time"
 6	"fmt"
 7	"context"
 8)
 9
10func main() {
11	var (
12		config clientv3.Config
13		client *clientv3.Client
14		err error
15		kv clientv3.KV
16		putOp clientv3.Op
17		getOp clientv3.Op
18		opResp clientv3.OpResponse
19	)
20
21	// 客户端配置
22	config = clientv3.Config{
23		Endpoints: []string{"127.0.0.1:2379"},
24		DialTimeout: 5 * time.Second,
25	}
26
27	// 建立连接
28	if client, err = clientv3.New(config); err != nil {
29		fmt.Println(err)
30		return
31	}
32
33	kv = clientv3.NewKV(client)
34
35	// 创建Op: operation
36	putOp = clientv3.OpPut("/cron/jobs/job8", "123123123")
37
38	// 执行OP
39	if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
40		fmt.Println(err)
41		return
42	}
43	// kv.Do(op)
44	// kv.Put
45	// kv.Get
46	// kv.Delete
47	fmt.Println("写入Revision:", opResp.Put().Header.Revision)
48
49	// 创建Op
50	getOp = clientv3.OpGet("/cron/jobs/job8")
51
52	// 执行OP
53	if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
54		fmt.Println(err)
55		return
56	}
57
58	// 打印
59	fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision)	// create rev == mod rev
60	fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
61}

分布式锁

  1package main
  2
  3import (
  4	"go.etcd.io/etcd/clientv3"
  5	"time"
  6	"fmt"
  7	"context"
  8)
  9
 10func main() {
 11	var (
 12		config clientv3.Config
 13		client *clientv3.Client
 14		err error
 15		lease clientv3.Lease
 16		leaseGrantResp *clientv3.LeaseGrantResponse
 17		leaseId clientv3.LeaseID
 18		keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
 19		keepResp *clientv3.LeaseKeepAliveResponse
 20		ctx context.Context
 21		cancelFunc context.CancelFunc
 22		kv clientv3.KV
 23		txn clientv3.Txn
 24		txnResp *clientv3.TxnResponse
 25	)
 26
 27	// 客户端配置
 28	config = clientv3.Config{
 29		Endpoints: []string{"127.0.0.1:2379"},
 30		DialTimeout: 5 * time.Second,
 31	}
 32
 33	// 建立连接
 34	if client, err = clientv3.New(config); err != nil {
 35		fmt.Println(err)
 36		return
 37	}
 38
 39	// lease实现锁自动过期:
 40	// op操作
 41	// txn事务: if else then
 42
 43	// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
 44	lease = clientv3.NewLease(client)
 45
 46	// 申请一个5秒的租约
 47	if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
 48		fmt.Println(err)
 49		return
 50	}
 51
 52	// 拿到租约的ID
 53	leaseId = leaseGrantResp.ID
 54
 55	// 准备一个用于取消自动续租的context
 56	ctx, cancelFunc = context.WithCancel(context.TODO())
 57
 58	// 确保函数退出后, 自动续租会停止
 59	defer cancelFunc()
 60	defer lease.Revoke(context.TODO(), leaseId)
 61
 62	// 5秒后会取消自动续租
 63	if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
 64		fmt.Println(err)
 65		return
 66	}
 67
 68	// 处理续约应答的协程
 69	go func() {
 70		for {
 71			select {
 72			case keepResp = <- keepRespChan:
 73				if keepRespChan == nil {
 74					fmt.Println("租约已经失效了")
 75					goto END
 76				} else {	// 每秒会续租一次, 所以就会受到一次应答
 77					fmt.Println("收到自动续租应答:", keepResp.ID)
 78				}
 79			}
 80		}
 81	END:
 82	}()
 83
 84	//  if 不存在key, then 设置它, else 抢锁失败
 85	kv = clientv3.NewKV(client)
 86
 87	// 创建事务
 88	txn = kv.Txn(context.TODO())
 89
 90	// 定义事务
 91
 92	// 如果key不存在
 93	txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
 94		Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
 95		Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败
 96
 97	// 提交事务
 98	if txnResp, err = txn.Commit(); err != nil {
 99		fmt.Println(err)
100		return // 没有问题
101	}
102
103	// 判断是否抢到了锁
104	if !txnResp.Succeeded {
105		fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
106		return
107	}
108
109	// 2, 处理业务
110
111	fmt.Println("处理任务")
112	time.Sleep(5 * time.Second)
113
114	// 3, 释放锁(取消自动续租, 释放租约)
115	// defer 会把租约释放掉, 关联的KV就被删除了
116}

封装后的代码

  1package main
  2
  3import (
  4	"context"
  5	"fmt"
  6	"go.etcd.io/etcd/clientv3"
  7	"time"
  8)
  9
 10type EtcdMutex struct {
 11	Tlt     int64              //租约时间
 12	Conf    clientv3.Config    //配置
 13	Key     string             //etcd的key
 14	cancel  context.CancelFunc //关闭租约
 15	lease   clientv3.Lease     //释放
 16	leaseID clientv3.LeaseID   //释放编号
 17	txn     clientv3.Txn       //标记
 18}
 19
 20func (em *EtcdMutex) init() error {
 21	var err error
 22	var ctx context.Context
 23	//新建一个上下文
 24	client, err := clientv3.New(em.Conf)
 25	if err != nil {
 26		return nil
 27	}
 28	//新建一个kv,交易
 29	em.txn = clientv3.NewKV(client).Txn(context.TODO())
 30	em.lease = clientv3.NewLease(client)
 31	leaseResp, err := em.lease.Grant(context.TODO(), em.Tlt)
 32	if err != nil {
 33		return nil
 34	}
 35	ctx, em.cancel = context.WithCancel(context.TODO())
 36	em.leaseID = leaseResp.ID
 37	_, err = em.lease.KeepAlive(ctx, em.leaseID)
 38	return err
 39
 40}
 41
 42func (em *EtcdMutex) Lock() error {
 43	err := em.init()
 44	if err != nil {
 45		return err
 46	}
 47	//锁定
 48	em.txn.If(clientv3.Compare(clientv3.CreateRevision(em.Key), "=", 0)).Then(
 49		clientv3.OpPut(em.Key, "", clientv3.WithLease(em.leaseID))).Else()
 50	txnresp, err := em.txn.Commit() //递交交易
 51	if err != nil {
 52		return err
 53	}
 54	if !txnresp.Succeeded {
 55		return fmt.Errorf("抢锁失败")
 56	}
 57
 58	return nil
 59
 60}
 61func (em *EtcdMutex) UnLock() {
 62	em.cancel()
 63	em.lease.Revoke(context.TODO(), em.leaseID)
 64	fmt.Println("释放锁")
 65}
 66
 67func main() {
 68	var conf = clientv3.Config{
 69		Endpoints:   []string{"127.0.0.1:2379"},
 70		DialTimeout: 5 * time.Second,
 71	}
 72
 73	eMutex1 := &EtcdMutex{Conf: conf, Tlt: 10, Key: "lock"}
 74	eMutex2 := &EtcdMutex{Conf: conf, Tlt: 10, Key: "lock"}
 75
 76	//线程1
 77	go func() {
 78		err := eMutex1.Lock()
 79		if err != nil {
 80			fmt.Println("线程1抢锁失败")
 81			fmt.Println(err)
 82		} else {
 83			time.Sleep(time.Second)
 84			fmt.Println("线程1抢锁成功")
 85			time.Sleep(time.Second * 10)
 86			defer eMutex1.UnLock()
 87			fmt.Println("线程1结束")
 88		}
 89	}()
 90
 91	go func() {
 92		err := eMutex2.Lock()
 93		if err != nil {
 94			fmt.Println("线程2抢锁失败")
 95			fmt.Println(err)
 96		}else{
 97			time.Sleep(time.Second)
 98			fmt.Println("线程2抢锁成功")
 99			time.Sleep(time.Second * 30)
100			defer eMutex2.UnLock()
101			fmt.Println("线程2结束")
102		}
103	}()
104
105	time.Sleep(time.Second * 50)
106}