Distributed Systems in Practice: Implementing Multi-Paxos in Python (Part 2)

Leo Xiao, posted 2021/03/26 19:40:25

Leader/Scout/Commander

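The leader's main job is to take Propose messages, which request a new ballot, and drive them to a decision. Once it has successfully completed the Prepare/Promise part of the protocol, the leader is "active". An active leader can send Accept messages immediately in response to a Propose.

In keeping with the class-per-role model, the leader delegates to the Scout and Commander roles to carry out each part of the protocol.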

class Leader(Role):

    def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
        super(Leader, self).__init__(node)
        self.ballot_num = Ballot(0, node.address)
        self.active = False
        self.proposals = {}
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls
        self.scouting = False
        self.peers = peers

    def start(self):
        # remind others we're active before LEADER_TIMEOUT expires
        def active():
            if self.active:
                self.node.send(self.peers, Active())
            self.set_timer(LEADER_TIMEOUT / 2.0, active)
        active()

    def spawn_scout(self):
        assert not self.scouting
        self.scouting = True
        self.scout_cls(self.node, self.ballot_num, self.peers).start()

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.scouting = False
        self.proposals.update(accepted_proposals)
        # note that we don't re-spawn commanders here; if there are undecided
        # proposals, the replicas will re-propose
        self.logger.info("leader becoming active")
        self.active = True

    def spawn_commander(self, ballot_num, slot):
        proposal = self.proposals[slot]
        self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()

    def do_Preempted(self, sender, slot, preempted_by):
        if not slot:  # from the scout
            self.scouting = False
        self.logger.info("leader preempted by %s", preempted_by.leader)
        self.active = False
        self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1, 
                                 self.ballot_num.leader)

    def do_Propose(self, sender, slot, proposal):
        if slot not in self.proposals:
            if self.active:
                self.proposals[slot] = proposal
                self.logger.info("spawning commander for slot %d" % (slot,))
                self.spawn_commander(self.ballot_num, slot)
            else:
                if not self.scouting:
                    self.logger.info("got PROPOSE when not active - scouting")
                    self.spawn_scout()
                else:
                    self.logger.info("got PROPOSE while scouting; ignored")
        else:
            self.logger.info("got PROPOSE for a slot already being proposed")
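
The preemption handling in do_Preempted depends on ballots being ordered. The Ballot type is not shown in this part, so the following is only a minimal sketch that assumes Ballot is a namedtuple with fields n and leader (tuples compare field by field). The helper next_ballot is a hypothetical name introduced here for illustration.

```python
from collections import namedtuple

# Assumption: Ballot is an (n, leader) namedtuple; the real definition
# lives outside this section.
Ballot = namedtuple("Ballot", ["n", "leader"])


def next_ballot(current, preempted_by):
    """Pick a ballot number above the one that preempted us,
    keeping our own address as the leader field."""
    base = preempted_by or current
    return Ballot(base.n + 1, current.leader)


mine = Ballot(0, "N1")
theirs = Ballot(3, "N2")
bumped = next_ballot(mine, theirs)
```

Because n is compared first, the bumped ballot outranks the preempting one regardless of how the leader addresses sort.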

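When the leader wants to become active, in response to receiving a Propose while it is inactive, it creates a Scout role (diagram below). The scout sends (and, if necessary, re-sends) Prepare messages and collects Promise responses until it has heard from a majority of its peers or until it has been preempted. It then reports back to the leader with Adopted or Preempted.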

Leader    Scout      Acceptor     Acceptor    Acceptor
   |          |          |            |           |   
   |          X--------->|            |           |    Prepare
   |          |<---------X            |           |    Promise
   |          X---------------------->|           |    Prepare
   |          |<----------------------X           |    Promise
   |          X---------------------------------->|    Prepare
   |          |<----------------------------------X    Promise
   |<---------X          |            |           |    Adopted

class Scout(Role):

    def __init__(self, node, ballot_num, peers):
        super(Scout, self).__init__(node)
        self.ballot_num = ballot_num
        self.accepted_proposals = {}
        self.acceptors = set([])
        self.peers = peers
        # floor division keeps the quorum an integer on Python 3 as well
        self.quorum = len(peers) // 2 + 1
        self.retransmit_timer = None

    def start(self):
        self.logger.info("scout starting")
        self.send_prepare()

    def send_prepare(self):
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)

    def update_accepted(self, accepted_proposals):
        acc = self.accepted_proposals
        for slot, (ballot_num, proposal) in accepted_proposals.items():
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

    def do_Promise(self, sender, ballot_num, accepted_proposals):
        if ballot_num == self.ballot_num:
            self.logger.info("got matching promise; need %d" % self.quorum)
            self.update_accepted(accepted_proposals)
            self.acceptors.add(sender)
            if len(self.acceptors) >= self.quorum:
                # strip the ballot numbers from self.accepted_proposals, now that it
                # represents a majority
                accepted_proposals = dict(
                    (s, p) for s, (b, p) in self.accepted_proposals.items())
                # We're adopted; note that this does *not* mean that no other
                # leader is active.  Any such conflicts will be handled by the
                # commanders.
                self.node.send([self.node.address],
                    Adopted(ballot_num=ballot_num, 
                            accepted_proposals=accepted_proposals))
                self.stop()
        else:
            # this acceptor has promised another leader a higher ballot number,
            # so we've lost
            self.node.send([self.node.address], 
                Preempted(slot=None, preempted_by=ballot_num))
            self.stop()
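
The scout's bookkeeping can be looked at in isolation: for every slot it keeps the proposal with the highest ballot seen so far, and once a majority has answered it strips the ballots. The sketch below is a standalone illustration, not the article's code. Ballots are plain tuples here, and the function names merge_accepted and strip_ballots are made up for this example.

```python
def merge_accepted(promises):
    """Merge per-acceptor {slot: (ballot, proposal)} maps,
    keeping the entry with the highest ballot for each slot."""
    merged = {}
    for accepted in promises:
        for slot, (ballot, proposal) in accepted.items():
            if slot not in merged or merged[slot][0] < ballot:
                merged[slot] = (ballot, proposal)
    return merged


def strip_ballots(merged):
    """Drop the ballot numbers, leaving {slot: proposal}."""
    return {slot: proposal for slot, (ballot, proposal) in merged.items()}


# Two acceptors report what they have accepted; slot 1 is contested.
promises = [
    {1: ((0, "N1"), "set x=1")},
    {1: ((2, "N2"), "set x=2"), 2: ((1, "N1"), "set y=9")},
]
result = strip_ballots(merge_accepted(promises))
```

For slot 1 the proposal accepted under ballot (2, "N2") wins, which is the value a newly adopted leader has to carry forward.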

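The leader creates a Commander role for each slot that has an active proposal (diagram below). Like a scout, a commander sends and re-sends Accept messages and waits for either Accepted replies from a majority of acceptors or news of its preemption. Once the proposal is accepted, the commander broadcasts a Decision message to all nodes. It responds to the leader with Decided or Preempted.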

Leader    Commander   Acceptor     Acceptor    Acceptor
   |          |          |            |           |   
   |          X--------->|            |           |    Accept
   |          |<---------X            |           |    Accepted
   |          X---------------------->|           |    Accept
   |          |<----------------------X           |    Accepted
   |          X---------------------------------->|    Accept
   |          |<----------------------------------X    Accepted
   |<---------X          |            |           |    Decided

class Commander(Role):

    def __init__(self, node, ballot_num, slot, proposal, peers):
        super(Commander, self).__init__(node)
        self.ballot_num = ballot_num
        self.slot = slot
        self.proposal = proposal
        self.acceptors = set([])
        self.peers = peers
        # floor division keeps the quorum an integer on Python 3 as well
        self.quorum = len(peers) // 2 + 1

    def start(self):
        self.node.send(set(self.peers) - self.acceptors, Accept(
            slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
        self.set_timer(ACCEPT_RETRANSMIT, self.start)

    def finished(self, ballot_num, preempted):
        if preempted:
            self.node.send([self.node.address], 
                           Preempted(slot=self.slot, preempted_by=ballot_num))
        else:
            self.node.send([self.node.address], 
                           Decided(slot=self.slot))
        self.stop()

    def do_Accepted(self, sender, slot, ballot_num):
        if slot != self.slot:
            return
        if ballot_num == self.ballot_num:
            self.acceptors.add(sender)
            if len(self.acceptors) < self.quorum:
                return
            self.node.send(self.peers, Decision(
                           slot=self.slot, proposal=self.proposal))
            self.finished(ballot_num, False)
        else:
            self.finished(ballot_num, True)
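
Both the scout and the commander count replies against a quorum, and because they retransmit, the same acceptor may answer more than once. A rough sketch of that counting, assuming Python 3 and using the hypothetical names quorum_size and AckCounter:

```python
def quorum_size(n_peers):
    """Smallest number of peers that forms a strict majority."""
    return n_peers // 2 + 1


class AckCounter:
    """Tracks which peers have replied; duplicates count once."""

    def __init__(self, peers):
        self.quorum = quorum_size(len(peers))
        self.acked = set()

    def ack(self, sender):
        # A set absorbs duplicate replies caused by retransmission.
        self.acked.add(sender)
        return len(self.acked) >= self.quorum


counter = AckCounter(["N1", "N2", "N3", "N4", "N5"])
first = counter.ack("N1")
duplicate = counter.ack("N1")   # retransmitted reply, still one voter
second = counter.ack("N2")
third = counter.ack("N3")       # third distinct voter reaches the quorum
```

With plain / on Python 3, five peers would give a quorum of 3.5 and the check would wait for a fourth reply, which is why floor division matters here.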

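One problem: the network simulator introduced later drops packets even on messages sent within a node. When every Decision message is lost, the protocol cannot make progress. The replica keeps retransmitting Propose messages, but the leader ignores them because it already has a proposal for that slot, and since no replica has received a Decision, the replica's catch-up process finds no result. The solution is to do what a real network stack does and ensure that local messages are always delivered.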

Bootstrap

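When a node joins the cluster, it must first obtain the current cluster state. The Bootstrap role cycles through the peers, sending a Join message to each in turn until it receives a Welcome.
Implementing the startup process in each role (replica, leader, acceptor) and having each one wait for the Welcome message would scatter the initialization logic across every role and make testing very cumbersome. In the end we decided to add a Bootstrap role: once startup completes, it adds each of the other roles to the node and passes the initial state to their constructors.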

class Bootstrap(Role):

    def __init__(self, node, peers, execute_fn,
                 replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
                 commander_cls=Commander, scout_cls=Scout):
        super(Bootstrap, self).__init__(node)
        self.execute_fn = execute_fn
        self.peers = peers
        self.peers_cycle = itertools.cycle(peers)
        self.replica_cls = replica_cls
        self.acceptor_cls = acceptor_cls
        self.leader_cls = leader_cls
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls

    def start(self):
        self.join()

    def join(self):
        self.node.send([next(self.peers_cycle)], Join())
        self.set_timer(JOIN_RETRANSMIT, self.join)

    def do_Welcome(self, sender, state, slot, decisions):
        self.acceptor_cls(self.node)
        self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
                         state=state, slot=slot, decisions=decisions)
        self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls,
                        scout_cls=self.scout_cls).start()
        self.stop()
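
The round-robin behaviour of join comes from itertools.cycle, which repeats the peer list indefinitely. As a small illustration (join_targets is a hypothetical helper, and the retransmit timer is replaced by a simple count of attempts):

```python
import itertools


def join_targets(peers, attempts):
    """Return the peers a bootstrapping node would contact,
    one per retransmit interval, wrapping around the list."""
    cycle = itertools.cycle(peers)
    return [next(cycle) for _ in range(attempts)]


targets = join_targets(["N1", "N2", "N3"], 5)
```

Contacting one peer per interval, rather than all at once, keeps the join traffic low while still reaching every peer eventually.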

Seed

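When a node joins a cluster, it needs to find at least one running node in the cluster to answer its Join message. But how does the cluster get started in the first place?
One option is to have the Bootstrap role decide to start a new cluster after it has tried to contact every other node. This has two problems: 1. In a large cluster, it means a long wait while each Join times out. 2. More importantly, during a network failure a new node might be unable to contact any other node and start a new cluster of its own.
Network failures are the most challenging reliability problem for clustered applications. In a network partition, all cluster members may be alive yet unable to communicate with some of the other nodes. For example, if the link between Berlin and Beijing fails in a cluster that spans both, re-joining some of the nodes once the link recovers is difficult. In the Multi-Paxos case, the healed network would host two clusters, with different decisions for the same slot numbers.
To avoid this outcome, creating a new cluster is made a user-specified operation. Exactly one node in the cluster runs the Seed role, and the other nodes start as usual. The seed waits until it has received Join messages from a majority of its peers, then sends a Welcome message containing the initial state of the state machine and an empty collection of decisions. The seed role then stops and starts a Bootstrap role to join the newly created cluster.
The seed emulates the Join/Welcome exchange of the bootstrap/replica interaction, so its communication diagram is the same as for the replica role.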

class Seed(Role):

    def __init__(self, node, initial_state, execute_fn, peers, 
                 bootstrap_cls=Bootstrap):
        super(Seed, self).__init__(node)
        self.initial_state = initial_state
        self.execute_fn = execute_fn
        self.peers = peers
        self.bootstrap_cls = bootstrap_cls
        self.seen_peers = set([])
        self.exit_timer = None

    def do_Join(self, sender):
        self.seen_peers.add(sender)
        if len(self.seen_peers) <= len(self.peers) / 2:
            return

        # cluster is ready - welcome everyone
        self.node.send(list(self.seen_peers), Welcome(
            state=self.initial_state, slot=1, decisions={}))

        # stick around for long enough that we don't hear any new JOINs from
        # the newly formed cluster
        if self.exit_timer:
            self.exit_timer.cancel()
        self.exit_timer = self.set_timer(JOIN_RETRANSMIT * 2, self.finish)

    def finish(self):
        # bootstrap this node into the cluster we just seeded
        bs = self.bootstrap_cls(self.node, 
                                peers=self.peers, execute_fn=self.execute_fn)
        bs.start()
        self.stop()
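
The test in do_Join amounts to a strict-majority check on the peers seen so far. The sketch below restates it as a standalone predicate (has_majority is a name introduced here, not part of the article's code) and applies it to the two sides of a hypothetical 2/3 partition of a five-node cluster:

```python
def has_majority(seen_peers, peers):
    """True once strictly more than half of the peers have been seen."""
    return len(seen_peers) > len(peers) // 2


peers = ["N1", "N2", "N3", "N4", "N5"]

# Hypothetical partition: the two sides cannot talk to each other.
side_a = {"N1", "N2"}
side_b = {"N3", "N4", "N5"}

a_ready = has_majority(side_a, peers)
b_ready = has_majority(side_b, peers)
```

Two disjoint groups of peers can never both be strict majorities, so a seed cut off with a minority of the nodes keeps waiting instead of welcoming them into a cluster.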

Requester

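The Requester role manages a request to the distributed state machine. It keeps sending Invoke messages to the replica until it receives an Invoked reply.
Its communication diagram is the same as the replica's.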

class Requester(Role):

    client_ids = itertools.count(start=100000)

    def __init__(self, node, n, callback):
        super(Requester, self).__init__(node)
        self.client_id = next(self.client_ids)
        self.n = n
        self.output = None
        self.callback = callback

    def start(self):
        self.node.send([self.node.address], 
                       Invoke(caller=self.node.address, 
                              client_id=self.client_id, input_value=self.n))
        self.invoke_timer = self.set_timer(INVOKE_RETRANSMIT, self.start)

    def do_Invoked(self, sender, client_id, output):
        if client_id != self.client_id:
            return
        self.logger.debug("received output %r" % (output,))
        self.invoke_timer.cancel()
        self.callback(output)
        self.stop()
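
The requester's pattern is "resend until a reply carrying my client_id arrives". The following is a simplified sketch under stated assumptions: the timer-driven retransmission is replaced by a plain loop, and RetryingRequest and flaky_transport are hypothetical names that stand in for the role and the lossy network.

```python
import itertools


class RetryingRequest:
    # Shared counter, so every request in the process gets a distinct id.
    _ids = itertools.count(start=100000)

    def __init__(self, value):
        self.client_id = next(self._ids)
        self.value = value
        self.output = None
        self.attempts = 0

    def run(self, transport, max_attempts=10):
        """Resend until a reply tagged with our client_id arrives."""
        while self.output is None and self.attempts < max_attempts:
            self.attempts += 1
            reply = transport(self.client_id, self.value)
            # Ignore lost messages and replies meant for other requests.
            if reply is not None and reply[0] == self.client_id:
                self.output = reply[1]
        return self.output


def flaky_transport(drop_first):
    """Hypothetical transport: loses the first `drop_first` requests,
    then answers by doubling the input value."""
    state = {"sent": 0}

    def transport(client_id, value):
        state["sent"] += 1
        if state["sent"] <= drop_first:
            return None
        return (client_id, value * 2)

    return transport


req = RetryingRequest(21)
result = req.run(flaky_transport(drop_first=2))
```

Tagging each request with a client_id is what lets the requester ignore replies meant for someone else, which matters when several requesters share one node.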

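Summary

To recap, the roles in the cluster are:

  • Acceptor – makes promises and accepts proposals
  • Replica – manages the distributed state machine: submits proposals, commits decisions, and responds to requesters
  • Leader – leads rounds of the Multi-Paxos algorithm
  • Scout – performs the Prepare/Promise portion of Multi-Paxos for a leader
  • Commander – performs the Accept/Accepted portion of Multi-Paxos for a leader
  • Bootstrap – introduces a new node to an existing cluster
  • Seed – creates a new cluster
  • Requester – requests a distributed state machine operation

One more component is needed to get the cluster running: the Network over which all the nodes communicate.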

Network

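Every network protocol needs the ability to send and receive messages, plus a way to call functions at some point in the future. The Network class provides a simple simulated network with these capabilities, and it also simulates packet loss and message delays.
Timers are handled with Python's heapq module (a min-heap), which makes selecting the next event efficient. Setting a timer pushes a Timer object onto the heap. Because removing an arbitrary item from a heap is inefficient, cancelled timers are left in place and only marked as cancelled.
Message delivery uses the timer machinery to schedule a delayed delivery at each node, with a random delay. functools.partial (partial application: binding some of a function's arguments ahead of time) sets up the deferred call to the destination node's receive method with the appropriate arguments.
Running the simulation just involves popping timers from the heap and executing them, provided the timer has not been cancelled and the destination node is still active.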
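
To see why the deferred call needs both functools.partial and a per-destination copy of the message, here is a minimal sketch. Inbox and schedule_deliveries are stand-ins invented for this example and are not the article's Node or Network classes.

```python
import copy
import functools


class Inbox:
    """Stand-in for a node: it only records what it receives."""

    def __init__(self, address):
        self.address = address
        self.received = []

    def receive(self, sender, message):
        self.received.append((sender, message))


def schedule_deliveries(sender, destinations, message):
    """Build one deferred call per destination, each bound to its
    own deep copy of the message so later mutation cannot leak."""
    return [functools.partial(dest.receive, sender, copy.deepcopy(message))
            for dest in destinations]


a, b = Inbox("N1"), Inbox("N2")
msg = {"slot": 1, "proposal": ["x"]}
calls = schedule_deliveries("N0", [a, b], msg)

# The sender mutates its message before the "timers" fire.
msg["proposal"].append("mutated later")

for call in calls:
    call()
```

Each partial object already carries its arguments, so the timer only needs to invoke it with no parameters when the delay expires.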

class Timer(object):

    def __init__(self, expires, address, callback):
        self.expires = expires
        self.address = address
        self.callback = callback
        self.cancelled = False

    def __lt__(self, other):
        # heapq orders timers by expiry; Python 3 dropped __cmp__ and cmp()
        return self.expires < other.expires

    def cancel(self):
        self.cancelled = True


class Network(object):
    PROP_DELAY = 0.03
    PROP_JITTER = 0.02
    DROP_PROB = 0.05

    def __init__(self, seed):
        self.nodes = {}
        self.rnd = random.Random(seed)
        self.timers = []
        self.now = 1000.0

    def new_node(self, address=None):
        node = Node(self, address=address)
        self.nodes[node.address] = node
        return node

    def run(self):
        while self.timers:
            next_timer = self.timers[0]
            if next_timer.expires > self.now:
                self.now = next_timer.expires
            heapq.heappop(self.timers)
            if next_timer.cancelled:
                continue
            if not next_timer.address or next_timer.address in self.nodes:
                next_timer.callback()

    def stop(self):
        self.timers = []

    def set_timer(self, address, seconds, callback):
        timer = Timer(self.now + seconds, address, callback)
        heapq.heappush(self.timers, timer)
        return timer

    def send(self, sender, destinations, message):
        sender.logger.debug("sending %s to %s", message, destinations)
        # avoid aliasing by making a closure containing distinct deep copy of
        # message for each dest
        def sendto(dest, message):
            if dest == sender.address:
                # reliably deliver local messages with no delay
                self.set_timer(sender.address, 0,  
                               lambda: sender.receive(sender.address, message))
            elif self.rnd.uniform(0, 1.0) > self.DROP_PROB:
                delay = self.PROP_DELAY + self.rnd.uniform(-self.PROP_JITTER, 
                                                           self.PROP_JITTER)
                self.set_timer(dest, delay, 
                               functools.partial(self.nodes[dest].receive, 
                                                 sender.address, message))
        for dest in (d for d in destinations if d in self.nodes):
            sendto(dest, copy.deepcopy(message))
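
The timer handling can be tried out on its own. The sketch below is a reduced version of the same idea for Python 3, without addresses, nodes, or packet loss: timers sit in a heapq min-heap ordered by expiry, and cancelling only sets a flag that is checked when the timer is popped. TimerQueue is a name introduced here for illustration.

```python
import heapq


class Timer:
    def __init__(self, expires, callback):
        self.expires = expires
        self.callback = callback
        self.cancelled = False

    def __lt__(self, other):
        # heapq only needs "<" to keep the earliest expiry on top.
        return self.expires < other.expires

    def cancel(self):
        # Lazy cancellation: the timer stays in the heap until popped.
        self.cancelled = True


class TimerQueue:
    def __init__(self):
        self.now = 0.0
        self.timers = []

    def set_timer(self, seconds, callback):
        timer = Timer(self.now + seconds, callback)
        heapq.heappush(self.timers, timer)
        return timer

    def run(self):
        while self.timers:
            timer = heapq.heappop(self.timers)
            # Simulated time jumps straight to the next event.
            self.now = max(self.now, timer.expires)
            if not timer.cancelled:
                timer.callback()


fired = []
q = TimerQueue()
q.set_timer(3.0, lambda: fired.append("c"))
doomed = q.set_timer(2.0, lambda: fired.append("b"))
q.set_timer(1.0, lambda: fired.append("a"))
doomed.cancel()
q.run()
```

Timers fire in expiry order regardless of insertion order, and the cancelled one is skipped when it reaches the top of the heap.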

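Although this implementation does not include a real network, the component model allows swapping one in, so that real servers can communicate over an actual network without any change to the other components.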
