解析高可用分布式键值存储 etcd 的原理
这篇文章将会介绍 etcd 的实现原理,其中包括 Raft 协议、存储两大模块,在最后我们也会简单介绍 etcd 一些具体应用场景。etcd 的官方将它定位成一个可信赖的分布式键值存储服务,它能够为整个分布式集群存储一些关键数据,协助分布式集群的正常运转。
我们可以简单看一下 etcd 和 Zookeeper 在定义上有什么不同:
etcd is a distributed> ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.
其中前者是一个用于存储关键数据的键值存储,后者是一个用于管理配置等信息的中心化服务。
etcd 的使用其实非常简单,它对外提供了 gRPC 接口,我们可以通过 Protobuf 和 gRPC 直接对 etcd 中存储的数据进行管理,也可以使用官方提供的 etcdctl 操作存储的数据。
service KV {
rpc Range(RangeRequest) returns (RangeResponse) {
option (google.api.http) = {
post: "/v3beta/kv/range"
body: "*"
};
}
rpc Put(PutRequest) returns (PutResponse) {
option (google.api.http) = {
post: "/v3beta/kv/put"
body: "*"
};
}
}
文章并不会展开介绍 etcd 的使用方法,这一小节将逐步介绍几大核心模块的实现原理,包括 etcd 使用 Raft 协议同步各个节点数据的过程以及 etcd 底层存储数据使用的结构。
在每一个分布式系统中,etcd 往往都扮演了非常重要的地位,由于很多服务配置发现以及配置的信息都存储在 etcd 中,所以整个集群可用性的上限往往就是 etcd 的可用性,而使用 3 ~ 5 个 etcd 节点构成高可用的集群往往都是常规操作。
正是因为 etcd 在使用的过程中会启动多个节点,如何处理几个节点之间的分布式一致性就是一个比较有挑战的问题了。
解决多个节点数据一致性的方案其实就是共识算法,在之前的文章中我们简单介绍过 Zookeeper 使用的Zab 协议 以及常见的共识算法 Paxos 和 Raft,etcd 使用的共识算法就是 Raft,这一节我们将详细介绍 Raft 以及 etcd 中 Raft 的一些实现细节。
Raft 从一开始就被设计成一个易于理解和实现的共识算法,它在容错和性能上与 Paxos 协议比较类似,区别在于它将分布式一致性的问题分解成了几个子问题,然后一一进行解决。
每一个 Raft 集群中都包含多个服务器,在任意时刻,每一台服务器只可能处于 Leader、Follower 以及 Candidate 三种状态;在处于正常的状态时,集群中只会存在一个 Leader,其余的服务器都是 Follower。
上述图片修改自 In Search of an Understandable Consensus Algorithm 一文 5.1 小结中图四。
所有的 Follower 节点都是被动的,它们不会主动发出任何的请求,只会响应 Leader 和 Candidate 发出的请求,对于每一个用户的可变操作,都会被路由给 Leader 节点进行处理,除了 Leader 和 Follower 节点之外,Candidate 节点其实只是集群运行过程中的一个临时状态。
Raft 集群中的时间也被切分成了不同的几个任期(Term),每一个任期都会由 Leader 的选举开始,选举结束后就会进入正常操作的阶段,直到 Leader 节点出现问题才会开始新一轮的选择。
每一个服务器都会存储当前集群的最新任期,它就像是一个单调递增的逻辑时钟,能够同步各个节点之间的状态,当前节点持有的任期会随着 每一个 请求被传递到其他的节点上。
Raft 协议在每一个任期的开始时都会从一个集群中选出一个节点作为集群的 Leader 节点,这个节点会负责集群中的日志的复制以及管理工作。
我们将 Raft 协议分成三个子问题:节点选举、日志复制以及安全性,文章会以 etcd 为例介绍 Raft 协议是如何解决这三个子问题的。
节点选举
在此我向大家推荐一个架构学习交流群。交流学习群号:821169538里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多。
使用 Raft 协议的 etcd 集群在启动节点时,会遵循 Raft 协议的规则,所有节点一开始都被初始化为 Follower 状态,新加入的节点会在 NewNode 中做一些配置的初始化,包括用于接收各种信息的 Channel:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L190-225
func StartNode(c *Config, peers []Peer) Node {
r := newRaft(c)
r.becomeFollower(1, None)
r.raftLog.committed = r.raftLog.lastIndex()
for _, peer := range peers {
r.addNode(peer.ID)
}
n := newNode()
go n.run(r)
return &n
}
在做完这些初始化的节点和 Raft 配置的事情之后,就会进入一个由 for 和 select 组成的超大型循环,这个循环会从 Channel 中获取待处理的事件:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/node.go#L291-423
func (n node) run(r raft) {
lead := None
for { if lead != r.lead {
lead = r.lead
}
select {
case m := continue
}
r.send(pb.Message{Term: r.Term, To:>
}
}
当前节点会立刻调用 becomeCandidate 将当前节点的 Raft 状态变成候选人;在这之后,它会将票投给自己,如果当前集群只有一个节点,该节点就会直接成为集群中的 Leader 节点。
如果集群中存在了多个节点,就会向集群中的其他节点发出 MsgVote 消息,请求其他节点投票,在 Step 函数中包含不同状态的节点接收到消息时的响应:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L785-927
func (r *raft) Step(m pb.Message) error {
// ...
switch m.Type {
case pb.MsgVote, pb.MsgPreVote:
canVote := r.Vote == m.From || (r.Vote == None && r.lead == None)
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
r.send(pb.Message{To: m.From, Term: m.Term, Type: pb.MsgVoteResp})
r.electionElapsed = 0
r.Vote = m.From
} else {
r.send(pb.Message{To: m.From, Term: r.Term, Type: pb.MsgVoteResp, Reject: true})
}
}
// ...
return nil
}
如果当前节点投的票就是消息的来源或者当前节点没有投票也没有 Leader,那么就会向来源的节点投票,否则就会通知该节点当前节点拒绝投票。
在 stepCandidate 方法中,候选人节点会处理来自其他节点的投票响应消息,也就是 MsgVoteResp :
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L1146-1189
func stepCandidate(r *raft, m pb.Message) error {
switch m.Type {
// ...
case pb.MsgVoteResp:
gr := r.poll(m.From, m.Type, !m.Reject)
switch r.quorum() {
case gr:
r.becomeLeader()
r.bcastAppend()
// ...
}
}
return nil
}
每当收到一个 MsgVoteResp 类型的消息时,就会设置当前节点持有的 votes 数组,更新其中存储的节点投票状态并返回投『同意』票的人数,如果获得的票数大于法定人数 quorum ,当前节点就会成为集群的 Leader 并向其他的节点发送当前节点当选的消息,通知其余节点更新 Raft 结构体中的 Term 等信息。
节点状态
对于每一个节点来说,它们根据不同的节点状态会对网络层发来的消息做出不同的响应,我们会分别介绍下面的四种状态在 Raft 中对于配置和消息究竟是如何处理的。
对于每一个 Raft 的节点状态来说,它们分别有三个比较重要的区别,其中一个是在改变状态时调用 becomeLeader 、 becomeCandidate 、 becomeFollower 和 becomePreCandidate方法改变 Raft 状态有比较大的不同,第二是处理消息时调用 stepLeader 、 stepCandidate和 stepFollower 时有比较大的不同,最后是几种不同状态的节点具有功能不同的定时任务。
对于方法的详细处理,我们在这一节中不详细介绍和分析,如果一个节点的状态是 Follower,那么当前节点切换到 Follower 一定会通过 becomeFollower 函数,在这个函数中会重置节点持有任期,并且设置处理消息的函数为 stepFollower :
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L671-678
func (r *raft) becomeFollower(term uint64, lead uint64) {
r.step = stepFollower
r.reset(term)
r.tick = r.tickElection
r.lead = lead
r.state = StateFollower
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L636-643
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() { r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
除此之外,它还会设置一个用于在 Leader 节点宕机时触发选举的定时器 tickElection 。
Candidate 状态的节点与 Follower 的配置差不了太多,只是在消息处理函数 step 、任期以及状态上的设置有一些比较小的区别:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L680-691
func (r *raft) becomeCandidate() {
r.step = stepCandidate
r.reset(r.Term + 1)
r.tick = r.tickElection
r.Vote = r.id
r.state = StateCandidate
}
最后的 Leader 就与这两者有其他的区别了,它不仅设置了处理消息的函数 step 而且设置了与其他状态完全不同的 tick 函数:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L708-728
func (r *raft) becomeLeader() {
r.step = stepLeader
r.reset(r.Term)
r.tick = r.tickHeartbeat
r.lead = r.id
r.state = StateLeader
r.pendingConfIndex = r.raftLog.lastIndex()
r.appendEntry(pb.Entry{Data: nil})
}
这里的 tick 函数 tickHeartbeat 每隔一段时间会通过 Step 方法向集群中的其他节点发送 MsgBeat 消息:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/raft/raft.go#L646-669
func (r *raft) tickHeartbeat() {
r.heartbeatElapsed++
r.electionElapsed++
if r.electionElapsed >= r.electionTimeout { r.electionElapsed = 0
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
}
if r.heartbeatElapsed >= r.heartbeatTimeout {
r.heartbeatElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
上述代码中的 MsgBeat 消息会在 Step 中被转换成 MsgHeartbeat 最终发送给其他的节点,Leader 节点超时之后的选举流程我们在前两节中也已经介绍过了,在这里就不再重复了。
etcd 目前支持 V2 和 V3 两个大版本,这两个版本在实现上有比较大的不同,一方面是对外提供接口的方式,另一方面就是底层的存储引擎,V2 版本的实例是一个纯内存的实现,所有的数据都没有存储在磁盘上,而 V3 版本的实例就支持了数据的持久化。
在这一节中,我们会介绍 V3 版本的 etcd 究竟是通过什么样的方式存储用户数据的。
在 V3 版本的设计中,etcd 通过 backend 后端这一设计,很好地封装了存储引擎的实现细节,为上层提供一个更一致的接口,对于 etcd 的其他模块来说,它们可以将更多注意力放在接口中的约定上,不过在这里,我们更关注的是 etcd 对 Backend 接口的实现。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L51-69
type Backend interface {
ReadTx() ReadTx
BatchTx() BatchTx
Snapshot() Snapshot
Hash(ignores mapstruct{}) (uint32, error)
Size() int64
SizeInUse() int64
Defrag() error
ForceCommit()
Close() error
}
etcd 底层默认使用的是开源的嵌入式键值存储数据库 bolt ,但是这个项目目前的状态已经是归档不再维护了,如果想要使用这个项目可以使用 CoreOS 的 bbolt 版本。
这一小节中,我们会简单介绍 etcd 是如何使用 BoltDB 作为底层存储的,首先可以先来看一下 pacakge 内部的 backend 结构体,这是一个实现了 Backend 接口的结构:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L80-104
type backend struct {
size int64
sizeInUse int64
commits int64
mu sync.RWMutex
db *bolt.DB
batchInterval time.Duration
batchLimit int
batchTx *batchTxBuffered
readTx *readTx
stopc chan struct{}
donec chan struct{}
lg *zap.Logger
}
从结构体的成员 db 我们就可以看出,它使用了 BoltDB 作为底层存储,另外的两个 readTx和 batchTx 分别实现了 ReadTx 和 BatchTx 接口:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/read_tx.go#L30-36
type ReadTx interface {
Lock()
Unlock()
UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte)
UnsafeForEach(bucketName []byte, visitor func(k, v []byte) error) error
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/batch_tx.go#L28-38
type BatchTx interface {
ReadTx
UnsafeCreateBucket(name []byte)
UnsafePut(bucketName []byte, key []byte, value []byte)
UnsafeSeqPut(bucketName []byte, key []byte, value []byte)
UnsafeDelete(bucketName []byte, key []byte)
Commit()
CommitAndStop()
}
从这两个接口的定义,我们不难发现它们能够对外提供数据库的读写操作,而 Backend 就能对这两者提供的方法进行封装,为上层屏蔽存储的具体实现:
每当我们使用 newBackend 创建一个新的 backend 结构时,都会创建一个 readTx 和 batchTx 结构体,这两者一个负责处理只读请求,一个负责处理读写请求:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L137-176
func newBackend(bcfg BackendConfig) *backend {
bopts := &bolt.Options{}
bopts.InitialMmapSize = bcfg.mmapSize()
db, _ := bolt.Open(bcfg.Path, 0600, bopts)
b := &backend{ db: db,
batchInterval: bcfg.BatchInterval,
batchLimit: bcfg.BatchLimit,
readTx: &readTx{
buf: txReadBuffer{
txBuffer: txBuffer{make(map*bucketBuffer)},
},
buckets: make(map*bolt.Bucket),
},
stopc: make(chan struct{}),
donec: make(chan struct{}),
}
b.batchTx = newBatchTxBuffered(b)
go b.run()
return b
}
当我们在 newBackend 中进行了初始化 BoltDB、事务等工作后,就会开一个 goroutine 异步的对所有批量读写事务进行定时提交:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/backend/backend.go#L289-305
func (b *backend) run() {
defer close(b.donec)
t := time.NewTimer(b.batchInterval)
defer t.Stop()
for {
select {
caseatRev })
if n != -1 {
return g.revs, g.created, g.ver - int64(len(g.revs)-n-1), nil
}
return revision{}, revision{}, 0, ErrRevisionNotFound
}
KeyIndex
在我们具体介绍方法实现的细节之前,首先我们需要理解 keyIndex 包含的字段以及管理同一个 Key 不同版本的方式:
每一个 keyIndex 结构体中都包含当前键的值以及最后一次修改对应的 revision 信息,其中还保存了一个 Key 的多个 generation ,每一个 generation 都会记录当前 Key『从生到死』的全部过程,每当一个 Key 被删除时都会调用 timestone 方法向当前的 generation 中追加一个新的墓碑版本:在此我向大家推荐一个架构学习交流群。交流学习群号:821169538里面会分享一些资深架构师录制的视频录像:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L127-145
func (ki keyIndex) tombstone(lg zap.Logger, main int64, sub int64) error {
if ki.generations.isEmpty() {
return ErrRevisionNotFound
}
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
return nil
}
这个 tombstone 版本标识这当前的 Key 已经被删除了,但是在每次删除一个 Key 之后,就会在当前的 keyIndex 中创建一个新的 generation 结构用于存储新的版本信息,其中 ver 记录当前 generation 包含的修改次数, created 记录创建 generation 时的 revision 版本,最后的 revs 用于存储所有的版本信息。
etcd 中所有的查询请求,无论是查询一个还是多个、是数量还是键值对,最终都会调用 rangeKeys 方法:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L112-165
func (tr storeTxnRead) rangeKeys(key, end []byte, curRev int64, ro RangeOptions) (RangeResult, error) {
rev := ro.Rev
revpairs := tr.s.kvindex.Revisions(key, end, rev)
if len(revpairs) == 0 {
return &RangeResult{KVs: nil, Count: 0, Rev: curRev}, nil
}
kvs := make([]mvccpb.KeyValue, int(ro.Limit))
revBytes := newRevBytes()
for i, revpair := range revpairs[:len(kvs)] {
revToBytes(revpair, revBytes)
_, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0)
kvs.Unmarshal(vs)
}
return &RangeResult{KVs: kvs, Count: len(revpairs), Rev: curRev}, nil
}
为了获取一个范围内的所有键值对,我们首先需要通过 Revisions 函数从 btree 中获取范围内所有的 keyIndex :
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L106-120
func (ti treeIndex) Revisions(key, end []byte, atRev int64) (revs []revision) {
if end == nil {
rev, , , err := ti.Get(key, atRev)
if err != nil {
return nil
}
return []revision{rev}
}
ti.visit(key, end, func(ki keyIndex) {
if rev, , , err := ki.get(ti.lg, atRev); err == nil {
revs = append(revs, rev)
}
})
return revs
}
如果只需要获取一个 Key 对应的版本,就是直接使用 treeIndex 的方法,但是当上述方法会从 btree 索引中获取一个连续多个 revision 值时,就会调用 keyIndex.get 来遍历整颗树并选取合适的版本:
func (ki keyIndex) get(lg zap.Logger, atRev int64) (modified, created revision, ver int64, err error) {
g := ki.findGeneration(atRev)
if g.isEmpty() {
return revision{}, revision{}, 0, ErrRevisionNotFound
}
n := g.walk(func(rev revision) bool { return rev.main > atRev })
if n != -1 {
return g.revs, g.created, g.ver - int64(len(g.revs)-n-1), nil
}
return revision{}, revision{}, 0, ErrRevisionNotFound
}
因为每一个 Key 的 keyIndex 中其实都存储着多个 generation ,我们需要根据传入的参数返回合适的 generation 并从其中返回主版本大于 atRev 的 revision 结构。
对于上层的键值存储来说,它会利用这里返回的 revision 从真正存储数据的 BoltDB 中查询当前 Key 对应 revision 的结果。
当我们向 etcd 中插入数据时,会使用传入的 key 构建一个 keyIndex 结构体并从树中获取相关版本等信息:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/index.go#L53-66
func (ti *treeIndex) Put(key []byte, rev revision) {
keyi := &keyIndex{key: key}
item := ti.tree.Get(keyi)
if item == nil {
keyi.put(ti.lg, rev.main, rev.sub)
ti.tree.ReplaceOrInsert(keyi)
return
}
okeyi := item.(*keyIndex)
okeyi.put(ti.lg, rev.main, rev.sub)
}
treeIndex.Put 在获取内存中的 keyIndex 结构之后会通过 keyIndex.put 其中加入新的 revision :
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/key_index.go#L77-104
func (ki keyIndex) put(lg zap.Logger, main int64, sub int64) {
rev := revision{main: main, sub: sub}
if len(ki.generations) == 0 { ki.generations = append(ki.generations, generation{})
}
g := &ki.generations
if len(g.revs) == 0 {
g.created = rev
}
g.revs = append(g.revs, rev)
g.ver++
ki.modified = rev
}
每一个新 revision 结构体写入 keyIndex 时,都会改变当前 generation 的 created 和 ver 等参数,从这个方法中我们就可以了解到 generation 中的各个成员都是如何被写入的。
写入的操作除了增加之外,删除某一个 Key 的函数也会经常被调用:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore_txn.go#L252-309
func (tw *storeTxnWrite) delete(key []byte) {
ibytes := newRevBytes()
idxRev := revision{main: tw.beginRev + 1, sub: int64(len(tw.changes))}
revToBytes(idxRev, ibytes)
ibytes = appendMarkTombstone(tw.storeTxnRead.s.lg, ibytes)
kv := mvccpb.KeyValue{Key: key}
d, _ := kv.Marshal()
tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d)
tw.s.kvindex.Tombstone(key,>
tw.changes = append(tw.changes, kv)
}
正如我们在文章前面所介绍的,删除操作会向结构体中的 generation 追加一个新的 tombstone 标记,用于标识当前的 Key 已经被删除;除此之外,上述方法还会将每一个更新操作的 revision 存到单独的 keyBucketName 中。
索引的恢复
因为在 etcd 中,所有的 keyIndex 都是在内存的 btree 中存储的,所以在启动服务时需要从 BoltDB 中将所有的数据都加载到内存中,在这里就会初始化一个新的 btree 索引,然后调用 restore 方法开始恢复索引:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/kvstore.go#L321-433
func (s *store) restore() error {
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)
tx := s.b.BatchTx()
rkvc, revc := restoreIntoIndex(s.lg, s.kvindex)
for {
keys, vals := tx.UnsafeRange(keyBucketName, min, max, int64(restoreChunkKeys))
if len(keys) == 0 {
break
}
restoreChunk(s.lg, rkvc, keys, vals, keyToLease)
newMin := bytesToRev(keys[:revBytesLen])
newMin.sub++
revToBytes(newMin, min)
}
close(rkvc)
s.currentRev =
}
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/mvcc/watchable_store.go#L111-142
func (s watchableStore) watch(key, end []byte, startRev int64,>watcher, cancelFunc) {
wa := &watcher{
key: key,
end: end,
minRev: startRev,
id: > ch: ch,
fcs: fcs,
}
synced := startRev > s.store.currentRev || startRev == 0
if synced {
s.synced.add(wa)
} else {
s.unsynced.add(wa)
}
return wa, func() { s.cancelWatcher(wa) }
}
当 etcd 服务启动时,会在服务端运行一个用于处理监听事件的 watchServer gRPC 服务,客户端的 Watch 请求最终都会被转发到这个服务的 Watch 函数中:
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L136-206
func (ws watchServer) Watch(stream pb.Watch_WatchServer) (err error) {
sws := serverWatchStream{
// ...
gRPCStream:stream,
watchStream: ws.watchable.NewWatchStream(),
ctrlStream: make(chan pb.WatchResponse, ctrlStreamBufLen),
}
sws.wg.Add(1)
go func() {
sws.sendLoop()
sws.wg.Done()
}()
go func() {
sws.recvLoop()
}()
sws.wg.Wait()
return err
}
当客户端想要通过 Watch 结果监听某一个 Key 或者一个范围的变动,在每一次客户端调用服务端上述方式都会创建两个 Goroutine,这两个协程一个会负责向监听者发送数据变动的事件,另一个协程会负责处理客户端发来的事件。
// https://sourcegraph.com/github.com/etcd-io/etcd@1cab49e/-/blob/etcdserver/api/v3rpc/watch.go#L220-334
func (sws *serverWatchStream) recvLoop() error {
for {
req, err := sws.gRPCStream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
switch uv := req.RequestUnion.(type) { case *pb.WatchRequest_CreateRequest:
creq := uv.CreateRequest
filters := FiltersFromRequest(creq)
wsrev := sws.watchStream.Rev()
rev := creq.StartRevision
id, _ := sws.watchStream.Watch(mvcc.WatchID(creq.WatchId), creq.Key, creq.RangeEnd, rev, filters...)
wr := &pb.WatchResponse{
Header: sws.newResponseHeader(wsrev),
WatchId:int64(id),
Created:true,
Canceled: err != nil,
}
select {
case sws.ctrlStream
页:
[1]