第一个Storm 程序
我的第一个Storm WordCount Topology
Storm WordCount执行流程分析
● 分布式单词计数的流程
1)首先它需要有数据源,我们在RandomSentenceSpout中定义了一个字符串数组sentences来模拟数据源。
2)这里我们将字符串数组中的每句话作为一个tuple发射。
3)然后,SplitSentenceBlot接收RandomSentenceSpout发射的tuple,它将每句话分割成每个单词,并将每个单词作为tuple发射。
4)接着,WordCountBolt接收SplitSentenceBlot发送的tuple,它将接收到的每一个单词统计计数,并将 作为tuple发射。
5)最后,ReportBolt接收WordCountBolt发送的tuple,将统计的结果存入HashMap中,并打印出结果。
● Topology的组成类
ISpout、IComponent、IBolt三个接口定义了一些最基本的方法,BaseRichSpout、BaseRichBolt是接口的实现类,自定义的Spout与Bolt通过继承实现类来完成工作。
Storm WordCount具体代码分析
● 在RandomSentenceSpout中定义了一个字符串数组sentences来模拟数据源。字符串数组中的每句话作为一个tuple发射。
RandomSentenceSpout.java 代码如下所示:
package storm.wordcount;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
● SplitSentenceBlot接收RandomSentenceSpout发射的tuple,它将每句话分割成每个单词,并将每个单词作为tuple发射。
SplitSentenceBlot.java 代码如下所示:
● WordCountBolt接收SplitSentenceBlot发送的tuple,它将接收到的每一个单词统计计数并将 <单词:出现次数> 作为tuple发射。
WordCountBolt.java 代码如下所示:
● ReportBolt接收WordCountBolt发送的tuple,将统计的结果存入HashMap中,并打印出结果。
ReportBolt.java 代码如下所示:
● 构建Wordcount Topology
WordCountTopology.java 代码如下所示:
我们可以构建一个maven 项目,将上述storm Wordcount代码复制到maven项目中。 运行Wordcount需要我们在pom.xml文件中引入storm核心包storm-core。
本地测试运行结果仅展示部分如下:
通过对Wordcount执行流程的分析,相信大家对storm数据处理流程有了进一步理解。
- 点赞
- 收藏
- 关注作者
评论(0)