【详解】搭建Storm集群

举报
皮牙子抓饭 发表于 2025/02/11 22:39:18 2025/02/11
【摘要】 搭建Storm集群Apache Storm 是一个免费开源的分布式实时计算系统。它使得处理大规模数据流变得简单而可靠。本文将详细介绍如何搭建一个基本的Storm集群,包括环境准备、配置文件修改以及启动集群等步骤。环境准备硬件要求至少3台机器(1个Nimbus节点和2个Supervisor节点)每台机器至少4GB内存,推荐8GB或以上每台机器至少1个CPU核心,推荐2个或以上软件要求Java ...

搭建Storm集群

Apache Storm 是一个免费开源的分布式实时计算系统。它使得处理大规模数据流变得简单而可靠。本文将详细介绍如何搭建一个基本的Storm集群,包括环境准备、配置文件修改以及启动集群等步骤。

环境准备

硬件要求

  • 至少3台机器(1个Nimbus节点和2个Supervisor节点)
  • 每台机器至少4GB内存,推荐8GB或以上
  • 每台机器至少1个CPU核心,推荐2个或以上

软件要求

  • Java 8 或更高版本
  • Apache ZooKeeper 3.4.x 版本
  • Apache Storm 1.2.x 版本

安装Java

确保所有节点上都安装了Java,并且版本不低于Java 8。可以通过以下命令检查Java版本:

java -version

安装ZooKeeper

  1. 下载并解压ZooKeeper:
wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
tar -zxvf zookeeper-3.4.14.tar.gz
  1. 配置zoo.cfg文件:
cd zookeeper-3.4.14/conf
cp zoo_sample.cfg zoo.cfg

编辑​​zoo.cfg​​文件,添加所有ZooKeeper节点的地址:

tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=5
syncLimit=2
server.1=zookeeper1:2888:3888
server.2=zookeeper2:2888:3888
server.3=zookeeper3:2888:3888
  1. 在每台ZooKeeper节点的/var/lib/zookeeper目录下创建myid文件,并写入对应的ID(1, 2, 3):
echo "1" > /var/lib/zookeeper/myid
  1. 启动ZooKeeper:
bin/zkServer.sh start

安装Storm

  1. 下载并解压Storm:
wget https://archive.apache.org/dist/storm/apache-storm-1.2.3/apache-storm-1.2.3.tar.gz
tar -zxvf apache-storm-1.2.3.tar.gz
  1. 配置​​storm.yaml​​文件:
cd apache-storm-1.2.3/conf
cp storm.yaml.example storm.yaml

编辑​​storm.yaml​​文件,配置Nimbus和Supervisor节点:

storm.zookeeper.servers:
  - "zookeeper1"
  - "zookeeper2"
  - "zookeeper3"

nimbus.seeds: ["nimbus1"]

supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

storm.local.dir: "/var/lib/storm"

启动Storm集群

启动Nimbus节点

在Nimbus节点上执行以下命令:

bin/storm nimbus &

启动Supervisor节点

在每个Supervisor节点上执行以下命令:

bin/storm supervisor &

启动UI

在任意一台节点上启动Storm UI,用于监控集群状态:

bin/storm ui &

验证集群状态

打开浏览器,访问​​http://<nimbus-node>:8080​​,你应该能看到Storm UI界面,显示集群的状态和拓扑信息。

通过上述步骤,我们成功搭建了一个基本的Storm集群。接下来,你可以开始部署和运行你的实时计算任务。希望本文对你有所帮助!

Apache Storm 是一个免费开源的分布式实时计算系统,常用于处理大规模的流数据。下面我将通过一个简单的例子来展示如何搭建一个基本的 Storm 集群,并编写一个简单的拓扑(Topology)来处理流数据。

1. 环境准备

首先,确保你的机器上安装了 Java 和 Zookeeper。Storm 使用 Zookeeper 来管理集群中的节点和状态信息。

  • Java: 至少需要 JDK 8。
  • Zookeeper: 可以单独安装或使用 Storm 自带的 Zookeeper。

2. 下载并配置 Storm

  1. 下载 Storm:
wget https://dlcdn.apache.org/storm/apache-storm-2.3.0/apache-storm-2.3.0.tar.gz
tar -xzf apache-storm-2.3.0.tar.gz
cd apache-storm-2.3.0
  1. 配置 Storm: 编辑 ​​conf/storm.yaml​​ 文件,设置 Nimbus 和 Supervisor 的主机名:
nimbus.seeds: ["nimbus-host"]
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

3. 启动 Storm 集群

在 Nimbus 节点上启动 Nimbus 和 UI 服务:

bin/storm nimbus &
bin/storm ui &

在每个 Supervisor 节点上启动 Supervisor 服务:

bin/storm supervisor &

4. 编写一个简单的 Storm 拓扑

假设我们有一个简单的拓扑,用于读取日志文件并统计其中的单词出现次数。

  1. 创建 Maven 项目: 创建一个新的 Maven 项目,并在 ​​pom.xml​​ 中添加 Storm 依赖:
<dependencies>
    <dependency>
        <groupId>org.apache.storm</groupId>
        <artifactId>storm-core</artifactId>
        <version>2.3.0</version>
    </dependency>
</dependencies>
  1. 编写 Spout: 创建一个 Spout 用于读取日志文件并发出单词:
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Map;

public class LogFileSpout extends BaseRichSpout {
    private SpoutOutputCollector collector;
    private BufferedReader reader;

    @Override
    public void open(Map<String, Object> conf, TopologyContext context, SpoutOutputCollector collector) {
        this.collector = collector;
        try {
            reader = new BufferedReader(new FileReader("/path/to/logfile.log"));
        } catch (Exception e) {
            throw new RuntimeException("Error opening log file", e);
        }
    }

    @Override
    public void nextTuple() {
        String line;
        try {
            if ((line = reader.readLine()) != null) {
                String[] words = line.split("\\s+");
                for (String word : words) {
                    collector.emit(new Values(word));
                }
            }
        } catch (Exception e) {
            throw new RuntimeException("Error reading from log file", e);
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
  1. 编写 Bolt: 创建一个 Bolt 用于统计单词出现次数:
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.HashMap;
import java.util.Map;

public class WordCountBolt extends BaseRichBolt {
    private OutputCollector collector;
    private Map<String, Integer> counts;

    @Override
    public void prepare(Map<String, Object> conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
        this.counts = new HashMap<>();
    }

    @Override
    public void execute(Tuple tuple) {
        String word = tuple.getStringByField("word");
        int count = counts.getOrDefault(word, 0) + 1;
        counts.put(word, count);
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}
  1. 定义并提交拓扑: 创建一个主类来定义和提交拓扑:
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.tuple.Fields;

public class WordCountTopology {
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new LogFileSpout(), 1);
        builder.setBolt("count", new WordCountBolt(), 1).shuffleGrouping("spout");

        Config config = new Config();
        config.setDebug(true);

        if (args.length > 0) {
            config.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], config, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", config, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}

5. 运行拓扑

  1. 本地测试: 在本地运行拓扑进行测试:
mvn clean package
java -cp target/your-project.jar com.yourcompany.WordCountTopology
  1. 提交到集群: 将拓扑提交到 Storm 集群:
storm jar target/your-project.jar com.yourcompany.WordCountTopology word-count-topology

6. 监控和调试

你可以通过 Storm UI (​​http://nimbus-host:8080​​) 来监控拓扑的运行情况,查看各个组件的状态和性能指标。

以上是一个基本的 Storm 集群搭建和拓扑编写的示例。根据实际需求,你可能需要对配置和代码进行更多的调整和优化。Apache Storm 是一个免费的、开源的分布式实时计算系统。它使得处理流数据变得简单和可靠,可以用于实时分析、在线机器学习、持续计算、分布式远程过程调用(RPC)等场景。在搭建一个基本的 Storm 集群时,涉及到的主要组件包括 Nimbus、Supervisor 和 ZooKeeper。下面将详细介绍如何通过代码配置这些组件来搭建一个简单的 Storm 集群。

1. 环境准备

首先,确保你的环境中已经安装了 Java、ZooKeeper 和 Apache Storm。ZooKeeper 是一个分布式的协调服务,Storm 使用它来进行节点之间的协调和状态管理。

  • Java: Storm 运行需要 Java 环境,建议使用 JDK 8 或更高版本。
  • ZooKeeper: 安装并启动 ZooKeeper 服务。你可以从 ZooKeeper 的官方网站下载最新版本,并按照官方文档进行安装和配置。
  • Apache Storm: 从 Apache Storm 官方网站下载最新版本的 Storm 并解压到你的服务器上。

2. 配置 Storm

Storm 的配置文件主要位于 ​​conf/storm.yaml​​ 中。你需要根据自己的环境对这个文件进行适当的修改。以下是一些关键配置项:

  • nimbus.host: 指定 Nimbus 节点的主机名或 IP 地址。
  • storm.zookeeper.servers: 列出所有 ZooKeeper 服务器的地址。
  • storm.local.dir: 指定本地目录,用于存储 Storm 的临时文件。
  • supervisor.slots.ports: 指定 Supervisor 可以使用的端口列表,每个端口对应一个 worker 进程。

示例配置:

nimbus.host: "nimbus-hostname"
storm.zookeeper.servers:
  - "zookeeper1-hostname"
  - "zookeeper2-hostname"
storm.local.dir: "/path/to/local/directory"
supervisor.slots.ports:
  - 6700
  - 6701
  - 6702
  - 6703

3. 启动 Storm 组件

启动 Nimbus

Nimbus 是 Storm 的主节点,负责分配任务给各个 Supervisor,并监控它们的状态。在 Nimbus 节点上运行以下命令:

$ storm nimbus
启动 Supervisor

Supervisor 是工作节点,负责监听 Nimbus 分配的任务,并启动 worker 进程执行任务。在每个 Supervisor 节点上运行以下命令:

$ storm supervisor
启动 UI

Storm 提供了一个 Web 界面,可以用来监控集群的状态和拓扑的信息。在任意节点上运行以下命令:

$ storm ui

默认情况下,UI 服务会运行在 8080 端口,你可以在浏览器中访问 ​​http://<your-host>:8080​​ 来查看 UI 界面。

4. 测试集群

为了验证集群是否正确搭建,可以提交一个简单的 Topology 进行测试。例如,创建一个简单的 WordCount 拓扑,然后使用以下命令提交:

$ storm jar /path/to/your/topology.jar com.yourcompany.topology.WordCountTopology wordcount

这里假设你的拓扑类名为 ​​WordCountTopology​​​,并且位于包 ​​com.yourcompany.topology​​ 中。

5. 监控和维护

通过 Storm UI,你可以监控拓扑的运行状态,包括处理速率、错误日志等信息。此外,定期检查 Nimbus 和 Supervisor 的日志文件,可以帮助你及时发现并解决问题。

以上就是搭建一个基本的 Storm 集群的过程。希望这些信息对你有所帮助!如果有任何问题或需要进一步的帮助,请随时提问。

【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。