Cassandra Gossip

举报
提桶 发表于 2020/04/04 16:04:26 2020/04/04
【摘要】 cassandra Gossiper主要用于集群中各种信息的同步。保持节点间各类元数据信息交互,进而达到一致

Gossiper主要用于集群中各种信息的同步。通过key value的格式进行信息传输(ApplicationState, VersionedValue).需要同步的信息都存放在ConcurrentMap<InetAddress, EndpointState> endpointStateMap中。

其中EndpointState里面则存储的是具体key value值(AtomicReference<Map<ApplicationState, VersionedValue>> applicationState)。

ApplicationState具体信息主要包含:

    STATUS(节点状态:shutdown,hibernate,removing,LEFT, NORMAL,LEAVING,MOVING等)

    TOKENS(集群中当前节点的token信息)、NET_VERSION(节点间messageService通讯的版本信息)

    HOST_ID(当前节点的hostid)

    RPC_ADDRESS(当前节点的rpc address)

    RELEASE_VERSION(当前节点运行的内核版本号)

    INTERNAL_IP(当前节点的listenAddress)

    SCHEMA(节点的表数据信息)

    HeartBeatState(generation<大版本>, version<小版本>)心跳信息。里面的版本大小表示集群中各种信息的新旧,版本号越大,表示信息越新,最后merge的时候会用新的信息覆盖旧信息。

    .....

gossiper工作流程。在Cassandra启动的时候启动gossiper线程,具体如下:

  1. 在db启动的主线程中启动gossiper。通过preparTojoin()方法调用image.png

  2. start方法中构造本节点的EndpointState信息并放入到endpointStateMap中。然后启动GossipTask定时任务,每1s钟执行一次

  3. GossipTask运行时首先更新心跳小版本信息,即自增+1.

  4. makeRandomGossipDigest构造Digest(ep,generation<大版本>, maxVersion<所有VersionedValue中最大的version>)列表,用于和其他节点比较消息的新旧使用

  5. 构造消息并发送

    image.png

    • 消息包含clusterName,partitionerName,Digests。消息类型:GOSSIP_DIGEST_SYN

    • 随机找一个活的节点发送该消息

    • 如果上步发给的节点不是seed,则有概率(可能发,可能不发)的尝试给某个不可达(之前由于种种原因被gossip标记为unreachable,网络,宕机等)的节点发一次

    • 如果上上步发给的节点不是seed,或者活的节点数<seeds数。则有概率(可能发,可能不发)的尝试给某个seed再发一次


    6. doStatusCheck


重点说第5点消息的发送流程:

  1. 首先本地LOCAL构造了GossipDigestSyn消息,通过GOSSIP_DIGEST_SYN进行发送

  2. 远端REMOTE收到消息,通过GossipDigestSynVerbHandler进行处理

    1. 首先判断收到的clusterName和partitionerName与自身不同则不进行该消息的处理

    2. 然后对Digests的对象进行排序。排序规则为:对每个ep的EndpointState中所有VersionedValue求出其小版本version的最大值与接收到的Digest.maxVersion求差值在绝对值,以及Digest.epDigest.generation生成新的Digest对象,对该对象排序,优先generation大版本,然后version。将收到的Digests采用该排序规则的逆序进行排序。也就是generation大的排前,generation相等的情况下,Digest.version和自身的VersionedValue中最大的version差值的绝对值大的排前(abs(max([VersionedValue1.version,VersionedValue2.version...])- Digest.version))。

    3. examineGossiper计算收到的消息和自身的新旧。自身新的则需要发给对方,自身旧的则需要从对方拉取。deltaGossipDigestList表示对方新,需要从对方拉取新的元数据。deltaEpStateMap表示自身新,需要发送给对方。新旧的判断标准就是通过节点维护的每个ep的版本信息(generation,version),大的则是新的。

    4. gDigestAckMessage = new MessageOut<GossipDigestAck>(MessagingService.Verb.GOSSIP_DIGEST_ACK, new GossipDigestAck(deltaGossipDigestList, deltaEpStateMap), GossipDigestAck.serializer);发送计算后的消息,包含需要从对方拉取的新数据列表,以及需要发送给对方的新数据

  3. 本地LOCAL收到该消息,通过GossipDigestAckVerbHandler进行处理

    1. 以schema类似数据,且generation相同,version不同为例,

    2. applyNewStates()执行该逻辑,调用onchange()方法。

    3. 其中StorageService的onchange中,走schema的逻辑分支

    4. 更新其ep对应的peers表中scheam_Version信息,并触发从该ep拉取最新的schema数据。进入schema的更新逻辑

    5. Gossiper.instance.applyStateLocally(epStateMap);将远端发过来的deltaEpStateMap数据更新到自己本地。在更新这些数据期间,不通类型数据触发不通的处理逻辑。

    6. Gossiper.instance.applyStateLocally(epStateMap);将远端发过来的deltaEpStateMap数据更新到自己本地。在更新这些数据期间,不通类型数据触发不通的处理逻辑。

    7. Gossiper.instance.getStateForVersionBiggerThan(addr, gDigest.getMaxVersion());计算远端需要的消息数据

    8. MessageOut<GossipDigestAck2> gDigestAck2Message = new MessageOut<GossipDigestAck2>(MessagingService.Verb.GOSSIP_DIGEST_ACK2, new GossipDigestAck2(deltaEpStateMap), GossipDigestAck2.serializer);将远端需要的数据构造对应的消息并发送

    9. 远端REMOTE收到消息,通过GossipDigestAck2VerbHandler处理

  4. 调用Gossiper.instance.applyStateLocally(remoteEpStateMap);方法。同第3.1的后续逻辑相同


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。