Cassandra Gossip
.....
gossiper工作流程。在Cassandra启动的时候启动gossiper线程,具体如下:
在db启动的主线程中启动gossiper。通过
preparTojoin()
方法调用start方法中构造本节点的EndpointState信息并放入到endpointStateMap中。然后启动GossipTask定时任务,每1s钟执行一次
GossipTask运行时首先更新心跳小版本信息,即自增+1.
makeRandomGossipDigest
构造Digest(ep,generation<大版本>, maxVersion<所有VersionedValue中最大的version>)列表,用于和其他节点比较消息的新旧使用构造消息并发送
消息包含clusterName,partitionerName,Digests。消息类型:GOSSIP_DIGEST_SYN
随机找一个活的节点发送该消息
如果上步发给的节点不是seed,则有概率(可能发,可能不发)的尝试给某个不可达(之前由于种种原因被gossip标记为unreachable,网络,宕机等)的节点发一次
如果上上步发给的节点不是seed,或者活的节点数<seeds数。则有概率(可能发,可能不发)的尝试给某个seed再发一次
6. doStatusCheck
重点说第5点消息的发送流程:
首先本地LOCAL构造了GossipDigestSyn消息,通过GOSSIP_DIGEST_SYN进行发送
远端REMOTE收到消息,通过GossipDigestSynVerbHandler进行处理
首先判断收到的clusterName和partitionerName与自身不同则不进行该消息的处理
然后对Digests的对象进行排序。排序规则为:对每个ep的EndpointState中所有VersionedValue求出其小版本version的最大值与接收到的Digest.maxVersion求差值在绝对值,以及Digest.ep和Digest.generation生成新的Digest对象,对该对象排序,优先generation大版本,然后version。将收到的Digests采用该排序规则的逆序进行排序。也就是generation大的排前,generation相等的情况下,Digest.version和自身的VersionedValue中最大的version差值的绝对值大的排前(abs(max([VersionedValue1.version,VersionedValue2.version...])- Digest.version))。
examineGossiper
计算收到的消息和自身的新旧。自身新的则需要发给对方,自身旧的则需要从对方拉取。deltaGossipDigestList表示对方新,需要从对方拉取新的元数据。deltaEpStateMap表示自身新,需要发送给对方。新旧的判断标准就是通过节点维护的每个ep的版本信息(generation,version),大的则是新的。gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap), GossipDigestAck.serializer);
发送计算后的消息,包含需要从对方拉取的新数据列表,以及需要发送给对方的新数据
本地LOCAL收到该消息,通过GossipDigestAckVerbHandler进行处理
以schema类似数据,且generation相同,version不同为例,
applyNewStates()
执行该逻辑,调用onchange()
方法。其中StorageService的onchange中,走schema的逻辑分支
更新其ep对应的peers表中scheam_Version信息,并触发从该ep拉取最新的schema数据。进入schema的更新逻辑
Gossiper.instance.applyStateLocally(epStateMap);
将远端发过来的deltaEpStateMap数据更新到自己本地。在更新这些数据期间,不通类型数据触发不通的处理逻辑。Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());
计算远端需要的消息数据MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2(deltaEpStateMap), GossipDigestAck2.serializer);
将远端需要的数据构造对应的消息并发送远端REMOTE收到消息,通过GossipDigestAck2VerbHandler处理
调用
Gossiper.instance.applyStateLocally(remoteEpStateMap);
- 点赞
- 收藏
- 关注作者
评论(0)