[Explained] A Simple Storm Programming Example

Posted by 皮牙子抓饭 on 2025/04/15 19:29:56


Apache Storm is a free, open-source distributed real-time computation system. It makes processing large volumes of streaming data easy, and it can reliably process unbounded data streams. This article introduces Apache Storm through a simple programming example.

1. Environment Setup

Before you begin, make sure the following tools are installed in your development environment:

  • Java JDK 8 or later
  • Maven 3.0 or later
  • Apache Storm 1.2.3 or later

Installation Steps

  1. Install Java and Maven: follow the official documentation for each.
  2. Download and unpack Storm: download the latest release from the Apache Storm website and unpack it to a directory of your choice.
  3. Configure environment variables: add Storm's bin directory to your system PATH.
  4. Start the Storm cluster: run the storm nimbus and storm supervisor commands to start the Nimbus and Supervisor processes.
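
On a single development machine, the start-up in steps 3-4 might look like the session below. Note that Storm relies on ZooKeeper for cluster coordination, so a ZooKeeper instance must be running first; the exact script names and the single-machine layout are assumptions for illustration.

```shell
# start ZooKeeper first (Storm uses it for coordination)
zkServer.sh start

# start the Storm daemons in the background
storm nimbus &       # master daemon: distributes code and assigns tasks
storm supervisor &   # worker daemon: launches topology workers
storm ui &           # optional web UI, served on port 8080 by default
```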

2. Creating a Simple Storm Topology

We will create a simple Storm topology that turns sentences into a stream of words and counts how many times each word appears.

2.1 Create a Maven Project

First, create a new Java project with Maven and add the Storm dependency to the pom.xml file:

<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>1.2.3</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

2.2 Writing the Spout and Bolts

2.2.1 SentenceSpout

SentenceSpout is the data source; it generates sentences.

import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;

import java.util.Map;
import java.util.Random;

public class SentenceSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    // A fixed pool of sentences to emit at random.
    private String[] sentences = new String[]{
            "the cow jumped over the moon",
            "an apple a day keeps the doctor away",
            "four score and seven years ago",
            "snow white and the seven dwarfs",
            "i am at two with nature"
    };
    private Random rand = new Random();

    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        // Note: Storm 1.x declares this parameter as a raw Map;
        // Map<String, Object> would not compile against storm-core 1.2.3.
        this.collector = collector;
    }

    @Override
    public void nextTuple() {
        // Pause briefly so the spout does not busy-spin and flood the topology.
        Utils.sleep(100);
        String sentence = sentences[rand.nextInt(sentences.length)];
        collector.emit(new Values(sentence));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This spout emits single-field tuples named "sentence".
        declarer.declare(new Fields("sentence"));
    }
}
2.2.2 SplitBolt

SplitBolt splits each sentence into individual words.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SplitBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Split the incoming sentence and emit one tuple per word.
        String sentence = tuple.getStringByField("sentence");
        for (String word : sentence.split(" ")) {
            collector.emit(new Values(word));
        }
        // Ack the input tuple; a no-op for untracked tuples, but good practice.
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
2.2.3 CountBolt

CountBolt counts how many times each word has appeared.

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class CountBolt extends BaseRichBolt {
    private Map<String, Integer> counts = new HashMap<>();
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        // Increment this word's running total and emit the updated count.
        String word = tuple.getStringByField("word");
        int count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        collector.emit(new Values(word, count));
        collector.ack(tuple);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
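
The split-and-count logic the two bolts implement can be exercised outside Storm. The stand-alone sketch below (not part of the topology; the class name is ours) mirrors SplitBolt and CountBolt on a fixed input:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-alone sketch of the split-and-count pipeline, for illustration only.
public class WordCountSketch {
    // Mirrors SplitBolt + CountBolt: split each sentence, tally every word.
    public static Map<String, Integer> countWords(String[] sentences) {
        Map<String, Integer> counts = new HashMap<>();
        for (String sentence : sentences) {
            for (String word : sentence.split(" ")) {
                counts.merge(word, 1, Integer::sum); // same effect as getOrDefault + put
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = countWords(new String[]{
                "the cow jumped over the moon",
                "an apple a day keeps the doctor away"
        });
        System.out.println(counts.get("the")); // prints 3
    }
}
```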

2.3 Building the Topology

Finally, we build the topology and submit it.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        // 5 spout executors feed 8 splitter executors via shuffle (random) grouping;
        // fieldsGrouping on "word" routes each word to a fixed one of the 12 counter tasks.
        builder.setSpout("spout", new SentenceSpout(), 5);
        builder.setBolt("split", new SplitBolt(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new CountBolt(), 12).fieldsGrouping("split", new Fields("word"));

        Config config = new Config();
        config.setDebug(true);

        if (args != null && args.length > 0) {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", config, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
    }
}
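
The fieldsGrouping call guarantees that every tuple carrying the same "word" value reaches the same CountBolt task, which is what makes each task's private HashMap of counts correct. Conceptually this is hash partitioning on the field value; the modulo scheme below illustrates the idea and is not Storm's exact internal algorithm:

```java
// Illustration of fields grouping as hash partitioning (not Storm internals).
public class FieldsGroupingSketch {
    // Pick a target task for a field value: the same value always maps to the same task.
    public static int taskFor(String fieldValue, int numTasks) {
        return Math.abs(fieldValue.hashCode() % numTasks);
    }

    public static void main(String[] args) {
        int tasks = 12; // matches the CountBolt parallelism in the topology
        // Every occurrence of "moon" lands on the same task index.
        System.out.println(taskFor("moon", tasks) == taskFor("moon", tasks)); // prints true
    }
}
```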

3. Running the Topology

3.1 Local Testing

To test the topology locally, simply run the WordCountTopology class. If everything works, the console will show each word along with its running count.
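
One way to launch the local test is through the Maven exec plugin. Because storm-core is declared with provided scope, it is not on the default runtime classpath, so the classpath scope has to be widened; the main-class package below is a placeholder matching the submit command later in this article.

```shell
mvn compile exec:java \
  -Dexec.mainClass=com.yourpackage.WordCountTopology \
  -Dexec.classpathScope=compile
```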

3.2 Submitting to a Cluster

To submit the topology to a Storm cluster, use a command like the following:

storm jar target/your-jar-file.jar com.yourpackage.WordCountTopology your-topology-name

Storm's real strength lies in its ability to process data streams at scale while providing a rich API for building complex real-time applications. That completes a simple word-count application in Apache Storm: we defined a Spout and two Bolts and wired them together into a complete topology. Hopefully this helps you understand the basics of Storm.

[Disclaimer] This content is from a Huawei Cloud developer community blogger and does not represent the views or positions of Huawei Cloud or its developer community. When reposting, cite the source (Huawei Cloud community), the article link, and the author.