Storm原理和架构
Storm系统架构
Storm集群的架构图
1. 主节点: 运行Nimbus的节点是系统的Master节点,即主控节点;Nimbus进程作为Storm系统的中心,负责接收用户提交的作业(Storm中即为以jar包形式保存的topology代码),向工作节点分配处理任务(进程级和线程级)和传输作业副本; 并依赖协调节点的服务监控集群运行状态,提供状态获取接口。Nimbus目前是单独部署的。
2. 从节点:运行supervisor的节点是系统的从节点,即工作节点;Supervisor监听所在节点,根据nimbus的委派,启动、暂停、撤销或者关闭任务的工作进程。工作节点是实时数据处理作业运行的节点。其中,计算在节点上的物理单元是worker,也即工作进程;计算的逻辑单元是executor,也即计算线程。而计算的作业逻辑单元室topology,也即拓扑;计算的任务逻辑单元室task,也即任务。每个worker执行特定topology的executor子集, 每个executor执行一个或多个task。一个topology主要有两类组件:spout和bolt,分别是流式数据在topology中的起始单元和处理单元。组件可以被并行配置,并行的每一份称为一个task,在一个executor中运行。
3. Web节点:运行Storm UI后台服务的节点;Storm UI在指定端口提供网页服务。用户可以通过浏览器访问web页面,通过web页面提交、暂停和撤销作业,也可以以只读的形式获取系统配置、作业及各个组件的运行时状态。Web节点在逻辑上是独立的,可以被安装在系统的任意节点实现监控;但是如果需要实现作业的管理,Storm UI必须和Storm nimbus部署在同一机器上,这是因为Storm UI进程会检查本机是否存在nimbus的连接,若不存在可导致UI部分功能无法正常工作。
4. 协调节点:运行zookeeper进程的节点;Zookeeper并不是Storm专用的,可以作为一类通用的分布式状态协调服务。Nimbus和supervisor之间的所有协调,包括分布式状态维护和分布式配置管理,都是通过该协调节点实现的。为了实现服务的高可用性,Zookeeper往往是以集群形式提供服务的,也即在Storm系统中可以存在多个协调节点。
Storm的工作流
Storm工作流图解
1. Topology:storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。
2. Spout: 在一个topology中产生源数据流的组件。通常情况下spout会从外部数据源中读取数据,然后转换为topology内部的源数据。Spout是一个主动的角色,其接口中有个nextTuple()函数,storm框架会不停地调用此函数,用户只要在其中生成源数据即可。
3. Bolt:在一个topology中接受数据然后执行处理的组件。Bolt可以执行过滤、函数操作、合并、写数据库等任何操作。Bolt是一个被动的角色,其接口中有个execute(Tuple input)函数,在接受到消息后会调用此函数,用户可以在其中执行自己想要的操作。
4. Tuple:一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.
5. Stream:源源不断传递的tuple就组成了stream。
Storm的并发机制
1. 服务器(Nodes):Storm集群可以包含多台服务器,即可以扩展为多个Nodes。
2. JVM虚拟机(Worker):每台Storm服务器可以起多个JVM虚拟机,即可以扩展为多个Worker。
3. 线程(Executor):每个worker可以运行一个或多个executor。每个executor可以运行同一个component(spout/Bolt)的一个或多个task。
4. Spout/Bolt实例(Task): task就是一个spout/bolt实例,是真正执行数据处理的地方。
一个正在运行的拓扑由很多worker进程组成,这些worker进程在Storm集群的多台机器上运行。一个worker进程属于一个特定的拓扑并且执行这个拓扑的一个或多个component(spout或者bolt)的一个或多个executor。一个worker进程就是一个Java虚拟机(JVM),它执行一个拓扑的一个子集。
一个executor是由一个worker进程产生的一个线程,它运行在worker的Java虚拟机里。一个executor为同一个component(spout或bolt)运行一个或多个任务。一个executor总会有一个线程来运行executor所有的task,这说明task在executor内部是串行执行的。
真正的数据处理逻辑是在task里执行的,在父executor线程执行过程中会运行task。在代码中实现的每个spout或bolt是在全集群中以很多task的形式运行的。一个component的task数量在这个拓扑的生命周期中是固定不变的,但是一个component的executor(线程)数量会随着时间推移发生变化。这说明以下条件一直成立:threads数量 <= task数量。默认情况下task数量被设置成跟executor的数量是一样的,即Storm会在每个线程上执行一个任务(这通常是你想要的)。
同时请注意:executor线程的数量在拓扑已经启动后,可以发生变化。而拓扑的task数量是固定的
Storm的数据流
Storm 的核心抽象概念是 “流” (stream),一个 stream 相当于一个无限的元组(tuple) 序列。Storm提供基用来做流转换的基件是 “spout” 和 “bolts” ,spout 和 bolt 提供了接口,可以实现这些接口来处理的你自己的应用程序相关的逻辑。
spout 是流的来源, 例如 spout 可以从一个 Kestrel 队列来读 tuple 并且发射(emit)他们形成一个流,或者 spout 可以连接到 Twitter api,来发射一个推文的流。
一个 bolt 消费任意数量的流, 做一些处理,然后可能会发射出新的流,复杂的流转换,例如从一个推文的流计算出一个热门话题的流,需要多个步骤,多个 bolt 。bolt可以通过运行函数(functions)来做任何事,例如过滤元组,做流聚合,做流连接,跟数据库交互等等。
Storm 使用 tuple 做数据模型,一个 tuple 是被命名过的值列表,一个 tuple 中的字段可以是任何类型的对象。它是开箱即用的,Storm 支持所有的简单数据类型,如字符串,字节数组作为 tuple 的字段值。如果要使用另一种类型的对象,你只需要为这个类型实现一个 serializer。topology 中的每一个节点都应该为它要发射的元组声明输出字段。
为拓扑中的每个 Bolt 的确定输入数据流是定义一个拓扑的重要环节。数据流分组定义了在 Bolt 的不同任务(tasks)中划分数据流的方式。在 Storm 中有八种内置的数据流分组方式,而且你还可以通过CustomStreamGrouping 接口实现自定义的数据流分组模型。这八种分组分时分别为:
1. 随机分组(Shuffle grouping):这种方式下元组会被尽可能随机地分配到 Bolt 的不同任务(tasks)中,使得每个任务所处理元组数量能够能够保持基本一致,以确保集群的负载均衡。
2. 域分组(Fields grouping):这种方式下数据流根据定义的“域”来进行分组。例如,如果某个数据流是基于一个名为“user-id”的域进行分组的,那么所有包含相同的“user-id”的元组都会被分配到同一个任务中,这样就可以确保消息处理的一致性。
3. 部分关键字分组(Partial Key grouping):这种方式与域分组很相似,根据定义的域来对数据流进行分组,不同的是,这种方式会考虑下游 Bolt 数据处理的均衡性问题,在输入数据源关键字不平衡时会有更好的性能。
4. 完全分组(All grouping):这种方式下数据流会被同时发送到 Bolt 的所有任务中(也就是说同一个元组会被复制多份然后被所有的任务处理),使用这种分组方式要特别小心。
5. 全局分组(Global grouping):这种方式下所有的数据流都会被发送到 Bolt 的同一个任务中,也就是 id 最小的那个任务。
6. 非分组(None grouping):使用这种方式说明你不关心数据流如何分组。目前这种方式的结果与随机分组完全等效,不过未来 Storm 社区可能会考虑通过非分组方式来让 Bolt 和它所订阅的 Spout 或 Bolt 在同一个线程中执行。
7. 直接分组(Direct grouping):这是一种特殊的分组方式。使用这种方式意味着元组的发送者可以指定下游的哪个任务可以接收这个元组。只有在数据流被声明为直接数据流时才能够使用直接分组方式。使用直接数据流发送元组需要使用 OutputCollector 的其中一个 emitDirect 方法。Bolt 可以通过 TopologyContext 来获取它的下游消费者的任务 id,也可以通过跟踪 OutputCollector 的 emit 方法(该方法会返回它所发送元组的目标任务的 id)的数据来获取任务 id。
8. 本地或随机分组(Local or shuffle grouping):如果在源组件的 worker 进程里目标 Bolt 有一个或更多的任务线程,元组会被随机分配到那些同进程的任务中。换句话说,这与随机分组的方式具有相似的效果。
其中
1. Kafka Spout 负责从kafka队列中提取日志;提取后的日志没有区别,所以通过Shuffle Grouping分发给ParseAndFilter Bolt
2. ParseAndFilter Bolt 负责从所有日志中提取出需要处理的日志(这里提取的日志类型是读信、发信和推荐日志);根据日志类型的不同,分发给ExtractReadmsg、ExtractRecommendmsg和ExtractSendmsg,采用Filed Grouping,其中field即为日志类型。
3. ExtractReadmsg Bolt 负责从读信日志中提取出时间、性别、平台、展位、用户类别、用户uid;因为同一性别、平台、展位、用户类别的日志要发送到同一个Bolt,才能进行准确统计,这里根据性别、平台、展位、用户类别这些field进行Field Grouping发送到AnalysisReadmsg Bolt。
4. ExtractRecommendmsg Bolt 负责从推荐日志中提取出时间、性别、平台、展位、用户类别;因为同一性别、平台、展位、用户类别的日志要发送到同一个Bolt,才能进行准确统计,这里根据性别、平台、展位、用户类别这些field进行Field Grouping发送到AnalysisRecommendmsg Bolt。
5. ExtractSendmsg Bolt 负责从发信日志中提取出时间、发信用户性别、平台、展位、发信用户类别、发信用户uid、收信用户类别、收信用户uid;因为同一性别、平台、展位、用户类别的日志要发送到同一个Bolt,才能进行准确统计,这里根据性别、平台、展位、用户类别这些field进行Field Grouping发送到AnalysisSendmsg Bolt。
6. AnalysisReadmsg Bolt负责对提取出的读信信息进行统计,并存储到数据库;
7. AnalysisRecommendmsg Bolt负责对提取出的推荐信息进行统计,并存储到数据库
8. AnalysisSendmsg Bolt负责对提取出的收信和发信信息进行统计,并存储到数据库
9. StoreSignal Spout 每隔半个小时发送存储信号,当AnalysisReadmsg, AnalysisSendmsg和AnalysisRecommendmsg收到信号时,会将统计信息存储到数据库;因为这三个Bolt都需要接受该信号,所以采用All Grouping。
Storm的保障机制
1. 功能性保障:多粒度的并行化
2. 非功能性保障:多级别的可靠性
- 点赞
- 收藏
- 关注作者
评论(0)