A Simple Storm Programming Example
Apache Storm is a free, open-source distributed realtime computation system. It makes it easy to reliably process unbounded streams of data. This article walks through a simple programming example that shows how to use Apache Storm.
1. Environment Setup
Before you begin, make sure the following tools are installed in your development environment:
- Java JDK 8 or later
- Maven 3.0 or later
- Apache Storm 2.2.0 or later (the code below uses the Storm 2.x `Map<String, Object>` API signatures)
Installation steps
- Install Java and Maven: follow their official installation documentation.
- Download and extract Storm: download a release from the Apache Storm website and extract it to a directory of your choice.
- Configure environment variables: add Storm's bin directory to your PATH.
- Start the Storm cluster: with ZooKeeper running, run the storm nimbus and storm supervisor commands to start the Nimbus and Supervisor processes.
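For a single-machine test setup, a minimal conf/storm.yaml might look like the following (illustrative values only; adjust hosts, paths, and ports to your environment):

```yaml
storm.zookeeper.servers:
  - "localhost"
nimbus.seeds: ["localhost"]
storm.local.dir: "/var/storm"
supervisor.slots.ports:
  - 6700
  - 6701
```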
2. Creating a Simple Storm Topology
We will create a simple topology that emits a stream of sentences, splits each sentence into words, and counts how many times each word appears: SentenceSpout -> SplitBolt -> CountBolt.
2.1 Create a Maven Project
First, create a new Java project with Maven and add the Storm dependency to the pom.xml file:
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>2.2.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
2.2 Writing the Spout and Bolts
2.2.1 SentenceSpout
SentenceSpout is the data source: it emits a stream of sentences.
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private String[] sentences = new String[]{
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
    };
    private Random rand = new Random();

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Emit a randomly chosen sentence; sleep briefly so the spout does not busy-spin.
        String sentence = sentences[rand.nextInt(sentences.length)];
        collector.emit(new Values(sentence));
        Utils.sleep(100);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This spout emits one-field tuples named "sentence".
        declarer.declare(new Fields("sentence"));
    }
}
2.2.2 SplitBolt
SplitBolt splits each sentence into individual words.
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Split the incoming sentence on spaces and emit one tuple per word.
        String sentence = tuple.getStringByField("sentence");
        String[] words = sentence.split(" ");
        for (String word : words) {
            collector.emit(new Values(word));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
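Note that split(" ") keeps empty tokens when a sentence contains consecutive spaces; splitting on the regex \s+ is more robust. A quick standalone check in plain Java (independent of Storm) shows the difference:

```java
public class SplitCheck {
    public static void main(String[] args) {
        String sentence = "the  cow jumped"; // note the double space

        String[] naive = sentence.split(" ");
        String[] robust = sentence.trim().split("\\s+");

        // split(" ") produces an empty token for the double space...
        System.out.println(naive.length);  // 4
        // ...while \s+ collapses runs of whitespace.
        System.out.println(robust.length); // 3
    }
}
```

The sample sentences above are single-spaced, so plain split(" ") is sufficient here, but real input usually warrants the regex form.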
2.2.3 CountBolt
CountBolt keeps a running count of how many times each word has been seen.
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {
    // In-memory, per-task counts; they are lost if the worker restarts.
    private Map<String, Integer> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Increment the count for this word and emit the updated total.
        String word = tuple.getStringByField("word");
        int count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
2.3 Building the Topology
Finally, we wire the components together and submit the topology.
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        // The numbers are parallelism hints: 5 spout executors, 8 split executors, 12 count executors.
        builder.setSpout("spout", new SentenceSpout(), 5);
        // shuffleGrouping distributes sentences randomly across SplitBolt tasks.
        builder.setBolt("split", new SplitBolt(), 8).shuffleGrouping("spout");
        // fieldsGrouping routes every occurrence of the same word to the same CountBolt task.
        builder.setBolt("count", new CountBolt(), 12).fieldsGrouping("split", new Fields("word"));

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            // Submit to a remote cluster; the topology name is taken from the first argument.
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            // Run in local mode for ten seconds, then shut down.
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
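fieldsGrouping on "word" is what makes the per-task HashMap in CountBolt correct: every tuple carrying the same word is routed to the same task, so each task sees the complete stream for the words it owns. The routing idea can be sketched as hashing the grouping field modulo the task count (an illustration of the concept, not Storm's exact internal hashing):

```java
import java.util.Arrays;

public class FieldsGroupingSketch {
    // Pick a target task based on the value of the grouping field.
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 12; // matches the CountBolt parallelism hint above
        // The same word always maps to the same task.
        System.out.println(taskFor("the", numTasks) == taskFor("the", numTasks)); // true
        for (String w : Arrays.asList("the", "cow", "moon")) {
            System.out.println(w + " -> task " + taskFor(w, numTasks));
        }
    }
}
```

By contrast, shuffleGrouping would scatter occurrences of the same word across tasks, and each task's count would be only a partial total.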
3. Running the Topology
3.1 Local Testing
To test locally, simply run the WordCountTopology class. If everything works, you will see each word and its running count printed to the console.
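The expected counts for the five sample sentences can be checked offline with plain Java, mirroring the split-then-count pipeline without a running cluster:

```java
import java.util.HashMap;
import java.util.Map;

public class OfflineWordCount {
    public static void main(String[] args) {
        String[] sentences = {
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
        };
        // Same logic as SplitBolt + CountBolt, run once over each sentence.
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                counts.put(word, counts.getOrDefault(word, 0) + 1);
            }
        }
        System.out.println("the -> " + counts.get("the"));     // 4
        System.out.println("seven -> " + counts.get("seven")); // 2
        System.out.println("and -> " + counts.get("and"));     // 2
    }
}
```

These are counts for a single pass over each sentence; the spout in the topology emits sentences repeatedly, so the live counts keep growing.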
3.2 Submitting to a Cluster
To submit the topology to a Storm cluster, package the project (for example with mvn clean package) and run:
storm jar target/your-jar-file.jar com.yourpackage.WordCountTopology your-topology-name
Storm's strength lies in its ability to process data streams at scale and in the rich API it provides for building complex real-time processing applications.