Flink Example: Processing an IoT Event Stream

TiAmoZhang, posted 2023/06/06 08:47:53

01. Flink Example: Processing an IoT Event Stream

Suppose sensors are installed on a machine. We want to collect the data they produce and compute the average temperature of each sensor every 5 minutes. The architecture is shown in Figure 1.

■ Figure 1: IoT event processing architecture

In this scenario, assume the sensors send their readings to a Kafka topic named temp, in the format (sensor id, timestamp, temperature). The events are received from the Kafka topic as strings; a sample of the data looks like this:

sensor_1,1629943899014,51.087254019871054
sensor_9,1629943899014,70.44743245583899
sensor_7,1629943899014,65.53215956486392
sensor_0,1629943899014,53.210570822216546
sensor_8,1629943899014,93.12876931817556
sensor_3,1629943899014,57.55153052162809
sensor_2,1629943899014,107.61249366604993
sensor_5,1629943899014,92.02083744773739
sensor_4,1629943899014,95.7688424087137
sensor_6,1629943899014,95.04398353316257
......
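For testing, a file of such readings (the sensortemp.csv replayed by the producer script later in this article) can be produced with a small generator. The following is a minimal Java sketch; it is a hypothetical helper, not part of the original example, and the sensor count and temperature distribution are assumptions chosen only to match the sample's shape:

import java.util.Random;

public class SensorDataGenerator {
    public static void main(String[] args) {
        Random random = new Random();
        // 100 batches of readings, one reading per sensor per batch;
        // redirect stdout to sensortemp.csv to build the test file
        for (int batch = 0; batch < 100; batch++) {
            long timestamp = System.currentTimeMillis();
            for (int i = 0; i < 10; i++) {
                // Assumed distribution: mean 75, std dev 20 (illustration only)
                double temperature = 75 + random.nextGaussian() * 20;
                System.out.println("sensor_" + i + "," + timestamp + "," + temperature);
            }
        }
    }
}

Running it as `java SensorDataGenerator > sensortemp.csv` yields lines in the same (sensor id, timestamp, temperature) format as the sample above.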

We now need a Flink streaming program that reads this data from the Kafka topic temp and transforms it. Kafka acts as the data source, and the Flink streaming program acts as a Kafka consumer.

One thing to consider: since each event carries a timestamp assigned by the sensor, we can use event time for the time-based computation. This means out-of-order sensor data can be handled correctly.
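As a rough illustration of the principle behind WatermarkStrategy.forBoundedOutOfOrderness (a simplified sketch, not Flink's actual implementation): the watermark trails the highest event timestamp seen so far by the configured bound, so events arriving up to that much late are still assigned to the correct window.

public class BoundedWatermarkSketch {

    private final long maxOutOfOrdernessMs; // tolerated lateness, e.g. 1000 ms
    private long maxTimestamp;              // highest event timestamp seen so far

    public BoundedWatermarkSketch(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low, offset so that currentWatermark() cannot underflow
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs + 1;
    }

    // Called for every event, with the timestamp extracted from the CSV line
    public void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    // The watermark trails the highest timestamp by the out-of-orderness bound;
    // a window [start, end) fires only once the watermark reaches end - 1
    public long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMs - 1;
    }

    public static void main(String[] args) {
        BoundedWatermarkSketch wm = new BoundedWatermarkSketch(1000);
        wm.onEvent(1629943899014L); // from the sample data above
        System.out.println(wm.currentWatermark()); // 1629943898013
        wm.onEvent(1629943897000L); // a late, out-of-order event
        System.out.println(wm.currentWatermark()); // unchanged: 1629943898013
    }
}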

The Scala code is as follows:

import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object KafkaIotDemo {

  // Case class: the type of the stream's records
  case class SensorReading(id: String, timestamp: Long, temperature: Double)

  def main(args: Array[String]): Unit = {
    // Set up the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val source = KafkaSource.builder[String]
      .setBootstrapServers("localhost:9092")
      .setTopics("temp")
      .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build

    // Watermark strategy: tolerate up to 1 second of out-of-orderness
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[String]() {
        override def extractTimestamp(s: String, l: Long): Long =
          s.split(",")(1).toLong
      })

    env
      // Read from the Kafka source
      .fromSource(source, watermarkStrategy, "Sensor Temperature Source")
      // Parse each line into a SensorReading
      .map(s => {
        val fields: Array[String] = s.split(",")
        SensorReading(fields(0), fields(1).toLong, fields(2).toDouble)
      })
      // Partition the stream by sensor id
      .keyBy(sr => sr.id)
      // Tumbling event-time window of 5 minutes (set to 5 s here for testing)
      .window(TumblingEventTimeWindows.of(Time.seconds(5)))
      // Incremental aggregation combined with a window process function
      .aggregate(new AggAvgTemp(), new ProcessAvgTemp())
      // Print the results
      .print()

    // Trigger execution of the streaming program
    env.execute("Flink Sensor Temperature Demo")
  }

  // Incremental aggregate function
  class AggAvgTemp extends AggregateFunction[SensorReading, (Double, Long), Double] {
    // Create the initial accumulator: (sum, count)
    override def createAccumulator() = (0.0, 0L)

    // Accumulate each event of a sensor (per key)
    override def add(sr: SensorReading, acc: (Double, Long)) =
      (sr.temperature + acc._1, acc._2 + 1L)

    // Merge partial accumulators
    override def merge(acc1: (Double, Long), acc2: (Double, Long)) =
      (acc1._1 + acc2._1, acc1._2 + acc2._2)

    // Return the average temperature of the sensor
    override def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
  }

  // Window process function (be careful to import the Scala ProcessWindowFunction, not the Java one)
  class ProcessAvgTemp extends ProcessWindowFunction[Double, (String, Long, Double), String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[Double],
                         out: Collector[(String, Long, Double)]): Unit = {
      // Round the pre-aggregated average to two decimal places
      val average = Math.round(elements.iterator.next * 100) / 100.0
      // Emit (sensor id, window end, average) downstream
      out.collect((key, context.window.getEnd, average))
    }
  }
}

The Java code is as follows:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class KafkaIotDemo {

   // POJO class: the temperature record type
   public static class SensorReading {

      public String id;           // sensor id
      public long timestamp;      // timestamp of the reading
      public double temperature;  // temperature value

      public SensorReading() { }

      public SensorReading(String id, long timestamp, double temperature) {
         this.id = id;
         this.timestamp = timestamp;
         this.temperature = temperature;
      }

      public String toString() {
         return "(" + this.id + ", "
                    + this.timestamp + ", "
                    + this.temperature + ")";
      }
   }

   public static void main(String[] args) throws Exception {
      // Set up the stream execution environment
      final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

      // Kafka source
      KafkaSource<String> source = KafkaSource.<String>builder()
         .setBootstrapServers("localhost:9092")
         .setTopics("temp")
         .setGroupId("group-test")
         .setStartingOffsets(OffsetsInitializer.earliest())
         .setValueOnlyDeserializer(new SimpleStringSchema())
         .build();

      // Watermark strategy: tolerate up to 2 seconds of out-of-orderness
      WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
         .<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
         .withTimestampAssigner(new SerializableTimestampAssigner<String>() {
            @Override
            public long extractTimestamp(String s, long l) {
               return Long.parseLong(s.split(",")[1]);
            }
         });

      env
         // Read from the Kafka source
         .fromSource(source, watermarkStrategy, "Sensor Temperature Source")
         // Parse each line into a DataStream<SensorReading>
         .map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
               String[] fields = s.split(",");
               return new SensorReading(fields[0], Long.parseLong(fields[1]),
                     Double.parseDouble(fields[2]));
            }
         })
         // Convert to a KeyedStream, partitioned by sensor id
         .keyBy(new KeySelector<SensorReading, String>() {
            @Override
            public String getKey(SensorReading sensorReading) throws Exception {
               return sensorReading.id;
            }
         })
         // Tumbling event-time window of 5 minutes (set to 5 s here for testing)
         .window(TumblingEventTimeWindows.of(Time.seconds(5)))
         // Incremental aggregation combined with a window process function
         .aggregate(new AggAvgTemp(), new ProcessAvgTemp())
         .print();

      // Trigger execution of the streaming program
      env.execute("Flink Sensor Temperature Demo");
   }

   // Incremental aggregate function
   public static class AggAvgTemp implements AggregateFunction<
            SensorReading,        // input
            Tuple2<Double, Long>, // accumulator: <sum, count>
            Double> {             // output: average

      // Create the initial accumulator
      @Override
      public Tuple2<Double, Long> createAccumulator() {
         return new Tuple2<>(0.0, 0L);
      }

      // Accumulate each event of a sensor (per key)
      @Override
      public Tuple2<Double, Long> add(SensorReading sr, Tuple2<Double, Long> acc) {
         return new Tuple2<>(sr.temperature + acc.f0, acc.f1 + 1);
      }

      // Merge partial accumulators
      @Override
      public Tuple2<Double, Long> merge(
            Tuple2<Double, Long> acc1,
            Tuple2<Double, Long> acc2) {
         return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
      }

      // Return the average temperature of the sensor
      @Override
      public Double getResult(Tuple2<Double, Long> t2) {
         return t2.f0 / t2.f1;
      }
   }

   // Window process function
   public static class ProcessAvgTemp extends ProcessWindowFunction<
            Double,                       // input type
            Tuple3<String, Long, Double>, // output type
            String,                       // key type
            TimeWindow> {                 // window type

      @Override
      public void process(String id, // key
            Context context,
            Iterable<Double> events,
            Collector<Tuple3<String, Long, Double>> out) {
         // Round the pre-aggregated average to two decimal places
         double average = Math.round(events.iterator().next() * 100) / 100.0;
         out.collect(new Tuple3<>(id, context.window().getEnd(), average));
      }
   }
}
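The accumulator logic can be sanity-checked without a cluster by calling the AggregateFunction methods directly. The following is a minimal, hypothetical driver (not part of the original example; it assumes the KafkaIotDemo class above and the Flink API jars are on the classpath):

import org.apache.flink.api.java.tuple.Tuple2;

public class AggAvgTempCheck {
    public static void main(String[] args) {
        KafkaIotDemo.AggAvgTemp agg = new KafkaIotDemo.AggAvgTemp();
        Tuple2<Double, Long> acc = agg.createAccumulator();
        // Two readings for sensor_1: (50.0 + 54.0) / 2 = 52.0
        acc = agg.add(new KafkaIotDemo.SensorReading("sensor_1", 1629943899014L, 50.0), acc);
        acc = agg.add(new KafkaIotDemo.SensorReading("sensor_1", 1629943899514L, 54.0), acc);
        System.out.println(agg.getResult(acc)); // prints 52.0
    }
}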

Package the above program as a jar (the project's pom.xml must declare the Flink streaming API and flink-connector-kafka dependencies used by the imports above). From the command line, run:

$ mvn clean package


To run the job, first start the ZooKeeper and Kafka services and create the topic temp. Proceed as follows:

(1) Start the ZooKeeper and Kafka services.

Open a terminal window and start ZooKeeper (leave it running):

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties


Open another terminal window and start the Kafka service (leave it running):

$ ./bin/kafka-server-start.sh config/server.properties


(2) In Kafka, create a topic named temp:

$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic temp


To list the topics that have been created, run:

$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092


To submit the job to the Flink cluster, proceed as follows:

(1) Write a shell script, streamiot.sh, that reads the IoT data line by line and sends it to the Kafka topic temp:

#!/bin/bash
# Broker address; defaults to localhost:9092 if not passed as $1
BROKER=$1
if [ -z "$1" ]; then
        BROKER="localhost:9092"
fi

# Replay the CSV one line every 100 ms into the Kafka topic temp
cat sensortemp.csv | while read line; do
        echo "$line"
        sleep 0.1
done | ~/bigdata/kafka_2.12-2.4.1/bin/kafka-console-producer.sh --broker-list $BROKER --topic temp

Note: the streamiot.sh script must have execute permission. If it does not, add it with the following command:

$ chmod a+x streamiot.sh

(2) Then run the streamiot.sh script:

$ ./streamiot.sh localhost:9092


(3) Submit the program jar to run on the cluster; it consumes the messages from the Kafka topic temp and prints the results to the console:

$ cd ~/bigdata/flink-1.13.2/
$ ./bin/flink run --class com.xueai8.java.ch03.KafkaIotDemo ~/flinkdemos/FlinkJavaDemo-1.0-SNAPSHOT.jar


When a job is submitted to a Flink cluster, its standard output actually goes to the flink-hduser-taskexecutor-0-localhost.out file, so you must look in that file to see the results. You can verify the contents of the Kafka topic with the console consumer script:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temp --consumer-property group.id=test
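To follow the averages printed by the job itself as they are produced, you can tail the TaskManager's .out file in Flink's log directory (a sketch; the exact file name depends on your user and host names):

$ tail -f log/flink-*-taskexecutor-*.out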


The observed output is as follows:

2> (sensor_2,1629943900000,107.4)
7> (sensor_7,1629943900000,66.11)
1> (sensor_0,1629943900000,52.86)
8> (sensor_9,1629943900000,71.53)
3> (sensor_8,1629943900000,93.09)
1> (sensor_3,1629943900000,57.93)
7> (sensor_4,1629943900000,96.48)
5> (sensor_1,1629943900000,51.64)
6> (sensor_5,1629943900000,92.0)
1> (sensor_0,1629943905000,50.59)
3> (sensor_8,1629943905000,91.96)
8> (sensor_9,1629943905000,71.15)
2> (sensor_2,1629943905000,107.48)
3> (sensor_8,1629943910000,91.13)
1> (sensor_3,1629943905000,60.1)
6> (sensor_6,1629943900000,96.08)
7> (sensor_7,1629943905000,64.93)
5> (sensor_1,1629943905000,53.84)
2> (sensor_2,1629943910000,109.88)
3> (sensor_8,1629943915000,94.77)
1> (sensor_3,1629943910000,62.18)
5> (sensor_1,1629943910000,59.87)
7> (sensor_4,1629943905000,97.99)
5> (sensor_1,1629943915000,63.32)
6> (sensor_6,1629943905000,95.86)
8> (sensor_9,1629943910000,71.55)
7> (sensor_7,1629943910000,67.59)
1> (sensor_0,1629943910000,50.96)
2> (sensor_2,1629943915000,105.35)
7> (sensor_4,1629943910000,100.87)
8> (sensor_9,1629943915000,75.48)
6> (sensor_5,1629943905000,91.47)
7> (sensor_7,1629943915000,69.73)
1> (sensor_0,1629943915000,51.91)
...