Distributed Systems in Practice: Implementing Multi-Paxos in Python (Part 2)
Leader/Scout/Commander
The Leader's primary task is to take Propose messages requesting new ballots and produce decisions. A Leader is "active" once it has successfully carried out the Prepare/Promise portion of the protocol. An active Leader can immediately send an Accept message in response to a Propose.
In keeping with the class-per-role model, the Leader delegates to the Scout and Commander roles to carry out each portion of the protocol.
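The code below relies on a handful of definitions from part 1 of this series: the Role and Node base classes, the protocol constants, and the message types. As a reminder of what the Leader, Scout, and Commander code assumes, a minimal sketch of those pieces might look like the following; the field names are taken from the keyword arguments used below, while the namedtuple representation and the specific timeout values are only illustrative:

# Sketch of supporting definitions assumed from part 1 (illustrative only).
from collections import namedtuple

# A ballot is ordered first by round number n, then by the proposing leader's
# address; namedtuple comparison provides exactly that lexicographic ordering.
Ballot = namedtuple('Ballot', ['n', 'leader'])

# Messages are simple named records; a Node dispatches a received message M
# to the do_M method of each of its roles.
Active = namedtuple('Active', [])
Prepare = namedtuple('Prepare', ['ballot_num'])
Promise = namedtuple('Promise', ['ballot_num', 'accepted_proposals'])
Accept = namedtuple('Accept', ['slot', 'ballot_num', 'proposal'])
Accepted = namedtuple('Accepted', ['slot', 'ballot_num'])
Adopted = namedtuple('Adopted', ['ballot_num', 'accepted_proposals'])
Preempted = namedtuple('Preempted', ['slot', 'preempted_by'])
Decision = namedtuple('Decision', ['slot', 'proposal'])
Decided = namedtuple('Decided', ['slot'])
# ... plus Propose, Join, Welcome, Invoke, Invoked, and so on, defined the same way.

# Retransmission and timeout intervals, in simulated seconds (illustrative
# values; the real constants live in part 1).
LEADER_TIMEOUT = 1.0
PREPARE_RETRANSMIT = 1.0
ACCEPT_RETRANSMIT = 1.0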
class Leader(Role):

    def __init__(self, node, peers, commander_cls=Commander, scout_cls=Scout):
        super(Leader, self).__init__(node)
        self.ballot_num = Ballot(0, node.address)
        self.active = False
        self.proposals = {}
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls
        self.scouting = False
        self.peers = peers

    def start(self):
        # remind others we're active before LEADER_TIMEOUT expires
        def active():
            if self.active:
                self.node.send(self.peers, Active())
            self.set_timer(LEADER_TIMEOUT / 2.0, active)
        active()

    def spawn_scout(self):
        assert not self.scouting
        self.scouting = True
        self.scout_cls(self.node, self.ballot_num, self.peers).start()

    def do_Adopted(self, sender, ballot_num, accepted_proposals):
        self.scouting = False
        self.proposals.update(accepted_proposals)
        # note that we don't re-spawn commanders here; if there are undecided
        # proposals, the replicas will re-propose
        self.logger.info("leader becoming active")
        self.active = True

    def spawn_commander(self, ballot_num, slot):
        proposal = self.proposals[slot]
        self.commander_cls(self.node, ballot_num, slot, proposal, self.peers).start()

    def do_Preempted(self, sender, slot, preempted_by):
        if not slot:  # from the scout
            self.scouting = False
        self.logger.info("leader preempted by %s", preempted_by.leader)
        self.active = False
        self.ballot_num = Ballot((preempted_by or self.ballot_num).n + 1,
                                 self.ballot_num.leader)

    def do_Propose(self, sender, slot, proposal):
        if slot not in self.proposals:
            if self.active:
                self.proposals[slot] = proposal
                self.logger.info("spawning commander for slot %d" % (slot,))
                self.spawn_commander(self.ballot_num, slot)
            else:
                if not self.scouting:
                    self.logger.info("got PROPOSE when not active - scouting")
                    self.spawn_scout()
                else:
                    self.logger.info("got PROPOSE while scouting; ignored")
        else:
            self.logger.info("got PROPOSE for a slot already being proposed")
When it wants to become active, in response to receiving a Propose while it is inactive, the Leader creates a Scout role (diagram below). The Scout sends (and re-sends, if necessary) a Prepare message and collects Promise responses until it has heard from a majority of its peers or until it has been preempted. It then reports back to the Leader with an Adopted or Preempted message.
Leader Scout Acceptor Acceptor Acceptor
| | | | |
| X--------->| | | Prepare
| |<---------X | | Promise
| X---------------------->| | Prepare
| |<----------------------X | Promise
| X---------------------------------->| Prepare
| |<----------------------------------X Promise
|<---------X | | | Adopted
class Scout(Role):

    def __init__(self, node, ballot_num, peers):
        super(Scout, self).__init__(node)
        self.ballot_num = ballot_num
        self.accepted_proposals = {}
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1
        self.retransmit_timer = None

    def start(self):
        self.logger.info("scout starting")
        self.send_prepare()

    def send_prepare(self):
        self.node.send(self.peers, Prepare(ballot_num=self.ballot_num))
        self.retransmit_timer = self.set_timer(PREPARE_RETRANSMIT, self.send_prepare)

    def update_accepted(self, accepted_proposals):
        acc = self.accepted_proposals
        for slot, (ballot_num, proposal) in accepted_proposals.iteritems():
            if slot not in acc or acc[slot][0] < ballot_num:
                acc[slot] = (ballot_num, proposal)

    def do_Promise(self, sender, ballot_num, accepted_proposals):
        if ballot_num == self.ballot_num:
            self.logger.info("got matching promise; need %d" % self.quorum)
            self.update_accepted(accepted_proposals)
            self.acceptors.add(sender)
            if len(self.acceptors) >= self.quorum:
                # strip the ballot numbers from self.accepted_proposals, now that it
                # represents a majority
                accepted_proposals = \
                    dict((s, p) for s, (b, p) in self.accepted_proposals.iteritems())
                # We're adopted; note that this does *not* mean that no other
                # leader is active.  Any such conflicts will be handled by the
                # commanders.
                self.node.send([self.node.address],
                               Adopted(ballot_num=ballot_num,
                                       accepted_proposals=accepted_proposals))
                self.stop()
        else:
            # this acceptor has promised another leader a higher ballot number,
            # so we've lost
            self.node.send([self.node.address],
                           Preempted(slot=None, preempted_by=ballot_num))
            self.stop()
The Leader creates a Commander role for each slot where it has an active proposal (diagram below). Like the Scout, a Commander sends and re-sends Accept messages and waits for a majority of acceptors to reply with Accepted, or for news that it has been preempted. Once the proposal is accepted, the Commander broadcasts a Decision message to all nodes. It responds to the Leader with Decided or Preempted.
Leader Commander Acceptor Acceptor Acceptor
| | | | |
| X--------->| | | Accept
| |<---------X | | Accepted
| X---------------------->| | Accept
| |<----------------------X | Accepted
| X---------------------------------->| Accept
| |<----------------------------------X Accepted
|<---------X | | | Decided
class Commander(Role):

    def __init__(self, node, ballot_num, slot, proposal, peers):
        super(Commander, self).__init__(node)
        self.ballot_num = ballot_num
        self.slot = slot
        self.proposal = proposal
        self.acceptors = set([])
        self.peers = peers
        self.quorum = len(peers) / 2 + 1

    def start(self):
        self.node.send(set(self.peers) - self.acceptors, Accept(
            slot=self.slot, ballot_num=self.ballot_num, proposal=self.proposal))
        self.set_timer(ACCEPT_RETRANSMIT, self.start)

    def finished(self, ballot_num, preempted):
        if preempted:
            self.node.send([self.node.address],
                           Preempted(slot=self.slot, preempted_by=ballot_num))
        else:
            self.node.send([self.node.address],
                           Decided(slot=self.slot))
        self.stop()

    def do_Accepted(self, sender, slot, ballot_num):
        if slot != self.slot:
            return
        if ballot_num == self.ballot_num:
            self.acceptors.add(sender)
            if len(self.acceptors) < self.quorum:
                return
            self.node.send(self.peers, Decision(
                slot=self.slot, proposal=self.proposal))
            self.finished(ballot_num, False)
        else:
            self.finished(ballot_num, True)
One wrinkle remains: the network simulator introduced below injects packet loss even for messages a node sends to itself. If all of the Decision messages for a slot are lost, the protocol cannot proceed. The Replica keeps retransmitting Propose messages, but the Leader ignores them because it has already made a proposal for that slot, and since no Replica has received a Decision, the Replica's catch-up process cannot find the result either. The solution is to do what a real network stack does and guarantee that local messages are always delivered; the special case for local delivery in Network.send below takes care of this.
Bootstrap
When a node joins the cluster, it must acquire the current cluster state. The Bootstrap role cycles through the peer nodes, sending a Join message to each in turn until it receives a Welcome.
Implementing the startup process in each role (Replica, Leader, Acceptor) and having each wait for the Welcome message would scatter the initialization logic across every role and make it tedious to test. Instead, we add a Bootstrap role: once startup is complete, it adds each of the other roles to the node and passes the initial state to their constructors.
class Bootstrap(Role):

    def __init__(self, node, peers, execute_fn,
                 replica_cls=Replica, acceptor_cls=Acceptor, leader_cls=Leader,
                 commander_cls=Commander, scout_cls=Scout):
        super(Bootstrap, self).__init__(node)
        self.execute_fn = execute_fn
        self.peers = peers
        self.peers_cycle = itertools.cycle(peers)
        self.replica_cls = replica_cls
        self.acceptor_cls = acceptor_cls
        self.leader_cls = leader_cls
        self.commander_cls = commander_cls
        self.scout_cls = scout_cls

    def start(self):
        self.join()

    def join(self):
        self.node.send([next(self.peers_cycle)], Join())
        self.set_timer(JOIN_RETRANSMIT, self.join)

    def do_Welcome(self, sender, state, slot, decisions):
        self.acceptor_cls(self.node)
        self.replica_cls(self.node, execute_fn=self.execute_fn, peers=self.peers,
                         state=state, slot=slot, decisions=decisions)
        self.leader_cls(self.node, peers=self.peers, commander_cls=self.commander_cls,
                        scout_cls=self.scout_cls).start()
        self.stop()
Seed
When a node joins the cluster, it needs at least one running node in the cluster to respond to its Join message. So how does the cluster get started in the first place?
One option would be for the Bootstrap role to decide after trying to contact every other node. But this has two problems: first, in a large cluster it means waiting for many Join messages to time out; second, and more importantly, after a network partition a new node might be unable to contact any other node and would start a brand-new cluster.
Network partitions are the most challenging failure case for clustered applications. During a partition, all of the cluster members may remain alive yet be unable to communicate with some of the other nodes. For example, if the network link between a cluster's Berlin and Beijing nodes fails, reconciling the two halves when the network recovers is difficult. In the Multi-Paxos case, the healed network would contain two clusters, with different decisions for the same slot numbers.
To avoid this outcome, creating a new cluster is a user-specified operation. Exactly one node in the cluster runs the Seed role; the other nodes start up as usual. The Seed waits until it has received Join messages from a majority of its peers, then sends a Welcome message containing the initial state of the state machine and an empty set of decisions. The Seed role then stops itself and starts a Bootstrap role to join the newly seeded cluster.
The Seed emulates the Join/Welcome portion of the Bootstrap/Replica interaction, so its communication diagram is the same as that of the Replica role.
class Seed(Role):

    def __init__(self, node, initial_state, execute_fn, peers,
                 bootstrap_cls=Bootstrap):
        super(Seed, self).__init__(node)
        self.initial_state = initial_state
        self.execute_fn = execute_fn
        self.peers = peers
        self.bootstrap_cls = bootstrap_cls
        self.seen_peers = set([])
        self.exit_timer = None

    def do_Join(self, sender):
        self.seen_peers.add(sender)
        if len(self.seen_peers) <= len(self.peers) / 2:
            return

        # cluster is ready - welcome everyone
        self.node.send(list(self.seen_peers), Welcome(
            state=self.initial_state, slot=1, decisions={}))

        # stick around for long enough that we don't hear any new JOINs from
        # the newly formed cluster
        if self.exit_timer:
            self.exit_timer.cancel()
        self.exit_timer = self.set_timer(JOIN_RETRANSMIT * 2, self.finish)

    def finish(self):
        # bootstrap this node into the cluster we just seeded
        bs = self.bootstrap_cls(self.node,
                                peers=self.peers, execute_fn=self.execute_fn)
        bs.start()
        self.stop()
Requester
The Requester role manages a request to the distributed state machine. It simply sends Invoke messages to the local replica until it receives the corresponding Invoked reply.
Its communication diagram is the same as the one shown for the Replica.
class Requester(Role):

    client_ids = itertools.count(start=100000)

    def __init__(self, node, n, callback):
        super(Requester, self).__init__(node)
        self.client_id = self.client_ids.next()
        self.n = n
        self.output = None
        self.callback = callback

    def start(self):
        self.node.send([self.node.address],
                       Invoke(caller=self.node.address,
                              client_id=self.client_id, input_value=self.n))
        self.invoke_timer = self.set_timer(INVOKE_RETRANSMIT, self.start)

    def do_Invoked(self, sender, client_id, output):
        if client_id != self.client_id:
            return
        self.logger.debug("received output %r" % (output,))
        self.invoke_timer.cancel()
        self.callback(output)
        self.stop()
Summary
To recap, the cluster's roles are:
- Acceptor – make promises and accept proposals
- Replica – manage the distributed state machine: submitting proposals, committing decisions, and responding to requesters
- Leader – lead rounds of the Multi-Paxos algorithm
- Scout – perform the Prepare/Promise portion of Multi-Paxos for a leader
- Commander – perform the Accept/Accepted portion of Multi-Paxos for a leader
- Bootstrap – introduce a new node to an existing cluster
- Seed – create a new cluster
- Requester – request a distributed state machine operation
Only one more piece is needed to make the cluster run: the Network class through which all of the nodes communicate.
Network
Any network protocol needs the ability to send and receive messages and a means of calling functions at some time in the future. The Network class provides a simple simulated network with these capabilities, and also simulates packet loss and message propagation delays.
Timers are handled using Python's heapq module (a min-heap), so selecting the next event is efficient. Setting a timer pushes a Timer object onto the heap. Since removing items from a heap is inefficient, cancelled timers are left in place and merely marked as cancelled.
Message transmission uses the timer functionality to schedule a later delivery of the message at each destination node, with a random simulated delay. We again use functools.partial (partial application, which binds some arguments of an existing function) to set up a deferred call to the destination node's receive method with the appropriate arguments.
Running the simulation just involves popping timers from the heap and executing them, as long as they have not been cancelled and the destination node is still active.
class Timer(object):

    def __init__(self, expires, address, callback):
        self.expires = expires
        self.address = address
        self.callback = callback
        self.cancelled = False

    def __cmp__(self, other):
        return cmp(self.expires, other.expires)

    def cancel(self):
        self.cancelled = True


class Network(object):
    PROP_DELAY = 0.03
    PROP_JITTER = 0.02
    DROP_PROB = 0.05

    def __init__(self, seed):
        self.nodes = {}
        self.rnd = random.Random(seed)
        self.timers = []
        self.now = 1000.0

    def new_node(self, address=None):
        node = Node(self, address=address)
        self.nodes[node.address] = node
        return node

    def run(self):
        while self.timers:
            next_timer = self.timers[0]
            if next_timer.expires > self.now:
                self.now = next_timer.expires
            heapq.heappop(self.timers)
            if next_timer.cancelled:
                continue
            if not next_timer.address or next_timer.address in self.nodes:
                next_timer.callback()

    def stop(self):
        self.timers = []

    def set_timer(self, address, seconds, callback):
        timer = Timer(self.now + seconds, address, callback)
        heapq.heappush(self.timers, timer)
        return timer

    def send(self, sender, destinations, message):
        sender.logger.debug("sending %s to %s", message, destinations)
        # avoid aliasing by making a closure containing distinct deep copy of
        # message for each dest
        def sendto(dest, message):
            if dest == sender.address:
                # reliably deliver local messages with no delay
                self.set_timer(sender.address, 0,
                               lambda: sender.receive(sender.address, message))
            elif self.rnd.uniform(0, 1.0) > self.DROP_PROB:
                delay = self.PROP_DELAY + self.rnd.uniform(-self.PROP_JITTER,
                                                           self.PROP_JITTER)
                self.set_timer(dest, delay,
                               functools.partial(self.nodes[dest].receive,
                                                 sender.address, message))
        for dest in (d for d in destinations if d in self.nodes):
            sendto(dest, copy.deepcopy(message))
Although this implementation does not include a real-world network, the component model allows a real network to be swapped in: the components could communicate between actual servers over a real network without any change to the other components.
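To show how the roles fit together, here is a rough sketch of driving the simulated cluster end to end, assuming all of the classes above (plus the Node class and constants from part 1) live in one module. The key-value state machine, the node addresses, the ('set', 'x', 42) command format, and the 5-second start-up delay are all illustrative choices, not part of the implementation above:

def key_value_state_machine(state, input_value):
    # An execute_fn takes (state, input) and returns (new_state, output);
    # here the state is a dict and inputs are ('get', key) / ('set', key, value).
    if input_value[0] == 'get':
        return state, state.get(input_value[1])
    elif input_value[0] == 'set':
        state[input_value[1]] = input_value[2]
        return state, input_value[2]

def main():
    network = Network(seed=1)  # a fixed seed makes the simulation repeatable
    peers = ['N%d' % i for i in range(3)]
    nodes = [network.new_node(address=p) for p in peers]

    # exactly one node seeds the new cluster; the others bootstrap into it
    Seed(nodes[0], initial_state={}, peers=peers,
         execute_fn=key_value_state_machine)
    for node in nodes[1:]:
        Bootstrap(node, peers=peers,
                  execute_fn=key_value_state_machine).start()

    # after (hopefully) enough simulated time for the cluster to form,
    # ask the state machine to execute a command
    def got_output(output):
        print('cluster decided: %r' % (output,))
        network.stop()

    network.set_timer(None, 5.0, lambda: Requester(
        nodes[1], ('set', 'x', 42), got_output).start())

    network.run()

if __name__ == '__main__':
    main()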