MRS : Flink的ProcessFunction编程demo

举报
剑指南天 发表于 2020/09/19 17:46:00 2020/09/19
【摘要】 本文主要利用Flink的ProcessFunction api、状态编程,实现传感器温度持续上升时发出告警的功能

MRS : Flink的ProcessFunction编程demo

关键词:MRS  Flink  yarn  KeyedProcessFunction  状态编程  定时器

摘要:本文主要利用Flink的ProcessFunction api、状态编程,实现传感器温度持续上升时发出告警的功能

 

前期准备

MRS 1.9.2(非安全),Flink 1.7.0

 

场景说明

假定用户有10组传感器,编号为0-9,传感器每三秒传递一条数据。当传感器温度上升时,程序注册定时器,定时为10s。如果10s内传感器温度持续上升,则发出告警;如果出现温度下降,则删除定时器。

 

代码实现

package com.huawei.flink.example.alarm

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.Random

object ProcessFunctionTest {
 
/**
 * One sensor reading.
 *
 * @param id   sensor number
 * @param time timestamp value carried by the record
 * @param temp temperature value carried by the record
 */
case class Sensor(
  id: Int,
  time: Long,
  temp: Double
)

 
/**
 * Demo source that simulates ten temperature sensors (ids 0 to 9).
 *
 * Each round emits one record per sensor in the form "<id> <time> <temp>". `time` is a
 * counter that grows by 1000 per round and `temp` is a random value below 40 printed with
 * two decimals. The source sleeps three seconds between rounds and stops once
 * [[cancel]] has been called.
 */
class ReadFrom extends RichSourceFunction[String] {

  // Written by cancel() and read by the loop in run(); volatile so the change is visible
  // across threads.
  @volatile private var isRunning = true

  /**
   * Emits sensor records until the source is cancelled.
   *
   * @param sourceContext context used to emit the generated records
   */
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random: Random = new Random()
    var time = 0L

    while (isRunning) {
      time += 1000L
      for (sensorId <- 0 to 9) {
        // Locale.ROOT pins '.' as the decimal separator, so the record can always be
        // parsed back with toDouble regardless of the JVM's default locale.
        val temp = "%.2f".formatLocal(java.util.Locale.ROOT, random.nextDouble() * 40)
        sourceContext.collect(s"$sensorId $time $temp")
      }
      Thread.sleep(3000)
    }
  }

  /** Asks the emit loop in [[run]] to finish after its current round. */
  override def cancel(): Unit = {
    isRunning = false
  }
}

 
/**
 * Emits an alarm for a sensor when its temperature rises and does not drop again within
 * `alarmDelayMs` of processing time.
 *
 * A rise registers a processing-time timer (unless one is already pending for the key);
 * a drop deletes the pending timer. If the timer fires, an alarm string is emitted.
 *
 * @param alarmDelayMs delay, in milliseconds, between the first observed rise and the
 *                     alarm; defaults to 10000 (10 s)
 */
class MyProcess(alarmDelayMs: Long = 10000L) extends KeyedProcessFunction[Int, Sensor, String] {

  // Previous temperature of the current key.
  // NOTE(review): the default of 40.0 presumably keeps the first reading of a key from
  // counting as a rise, since the demo source generates temperatures below 40 - confirm.
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("lastTemp", classOf[Double], 40.0))

  // Timestamp of the pending timer of the current key; the code treats 0 as "no timer".
  lazy val timerTimeState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("timeTimer", classOf[Long]))

  /**
   * Compares the reading with the previous one and registers or deletes the alarm timer.
   *
   * @param i         the incoming sensor reading
   * @param context   gives access to the timer service
   * @param collector unused here; alarms are emitted from [[onTimer]]
   */
  override def processElement(i: Sensor,
                              context: KeyedProcessFunction[Int, Sensor, String]#Context,
                              collector: Collector[String]): Unit = {
    val lastTemp: Double = lastTempState.value()
    val timerTime: Long = timerTimeState.value()
    val timerPending = timerTime != 0L

    // Remember the current temperature for the next comparison.
    lastTempState.update(i.temp)

    if (i.temp > lastTemp && !timerPending) {
      // Temperature went up and no timer is pending: register one and remember its timestamp.
      val fireAt: Long = context.timerService().currentProcessingTime() + alarmDelayMs
      context.timerService().registerProcessingTimeTimer(fireAt)
      timerTimeState.update(fireAt)
    } else if (i.temp < lastTemp && timerPending) {
      // Temperature dropped: delete the pending timer and forget its timestamp.
      // Guarded so that no deletion is attempted when there is no timer to delete.
      context.timerService().deleteProcessingTimeTimer(timerTime)
      timerTimeState.clear()
    }
  }

  /**
   * Called when the timer fires: emits the alarm and clears the stored timer timestamp so
   * that a later rise can register a new timer.
   *
   * @param timestamp timestamp of the firing timer
   * @param ctx       gives access to the key the timer belongs to
   * @param out       collector the alarm message is written to
   */
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Int, Sensor, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    out.collect(s"sensor ${ctx.getCurrentKey}=> temperature go up!!!")
    timerTimeState.clear()
  }
}

 
/**
 * Builds and runs the alarm job: source -> parse into [[Sensor]] -> key by sensor id ->
 * [[MyProcess]] -> print.
 *
 * @param args command-line arguments (not used)
 */
def main(args: Array[String]): Unit = {
  val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  // Raw text records from the demo source, echoed for inspection.
  val rawLines: DataStream[String] = env.addSource(new ReadFrom())
  rawLines.print("line info")

  // Each record is "<id> <time> <temp>" separated by single spaces.
  val readings: DataStream[Sensor] = rawLines.map { record =>
    val fields: Array[String] = record.split(" ")
    Sensor(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
  }

  // Key by sensor id and run the alarm logic per sensor.
  val alarms: DataStream[String] = readings.keyBy(_.id).process(new MyProcess())

  // Print the alarms and start the job.
  alarms.print("alarm coming...    ")
  env.execute("ALARM")
}
}

 

调测程序

1.       进入MRS集群master节点,执行“source /opt/client/bigdata_env”

2.       利用flink shell测试程序,执行“start-scala-shell.sh yarn -n 2”

3.       将程序的代码输入shell工具里面

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

import org.apache.flink.streaming.api.functions.KeyedProcessFunction

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

import org.apache.flink.streaming.api.scala._

import org.apache.flink.util.Collector

import scala.util.Random

// One sensor reading: sensor number, timestamp value and temperature value of the record.
case class Sensor(
  id: Int,
  time: Long,
  temp: Double
)

// Demo source simulating ten temperature sensors (ids 0 to 9). Each round emits one record
// per sensor in the form "<id> <time> <temp>", then sleeps three seconds. The loop ends
// once cancel() has been called.
class ReadFrom extends RichSourceFunction[String] {
  // Written by cancel() and read by the loop in run(); volatile so the change is visible
  // across threads.
  @volatile private var isRunning = true
  // Emits sensor records through sourceContext until the source is cancelled.
  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    val random: Random = new Random()
    var time = 0L
    while (isRunning) {
      time += 1000L
      for (sensorId <- 0 to 9) {
        // Locale.ROOT pins '.' as the decimal separator, so the record can always be
        // parsed back with toDouble regardless of the JVM's default locale.
        val temp = "%.2f".formatLocal(java.util.Locale.ROOT, random.nextDouble() * 40)
        sourceContext.collect(s"$sensorId $time $temp")
      }
      Thread.sleep(3000)
    }
  }
  // Asks the emit loop in run() to finish after its current round.
  override def cancel(): Unit = {
    isRunning = false
  }
}

 

// Emits an alarm for a sensor when its temperature rises and does not drop again within
// alarmDelayMs (default 10000 ms) of processing time. A rise registers a timer unless one
// is already pending for the key; a drop deletes the pending timer.
class MyProcess(alarmDelayMs: Long = 10000L) extends KeyedProcessFunction[Int, Sensor, String] {
  // Previous temperature of the current key.
  // NOTE(review): the default of 40.0 presumably keeps the first reading of a key from
  // counting as a rise, since the demo source generates temperatures below 40 - confirm.
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("lastTemp", classOf[Double], 40.0))
  // Timestamp of the pending timer of the current key; the code treats 0 as "no timer".
  lazy val timerTimeState: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("timeTimer", classOf[Long]))
  // Compares the reading with the previous one and registers or deletes the alarm timer.
  override def processElement(i: Sensor,
                              context: KeyedProcessFunction[Int, Sensor, String]#Context,
                              collector: Collector[String]): Unit = {
    val lastTemp: Double = lastTempState.value()
    val timerTime: Long = timerTimeState.value()
    val timerPending = timerTime != 0L
    // Remember the current temperature for the next comparison.
    lastTempState.update(i.temp)
    if (i.temp > lastTemp && !timerPending) {
      // Temperature went up and no timer is pending: register one and remember its timestamp.
      val fireAt: Long = context.timerService().currentProcessingTime() + alarmDelayMs
      context.timerService().registerProcessingTimeTimer(fireAt)
      timerTimeState.update(fireAt)
    } else if (i.temp < lastTemp && timerPending) {
      // Temperature dropped: delete the pending timer and forget its timestamp.
      // Guarded so that no deletion is attempted when there is no timer to delete.
      context.timerService().deleteProcessingTimeTimer(timerTime)
      timerTimeState.clear()
    }
  }
  // Called when the timer fires: emits the alarm and clears the stored timer timestamp so
  // that a later rise can register a new timer.
  override def onTimer(timestamp: Long,
                       ctx: KeyedProcessFunction[Int, Sensor, String]#OnTimerContext,
                       out: Collector[String]): Unit = {
    out.collect(s"sensor ${ctx.getCurrentKey}=> temperature go up!!!")
    timerTimeState.clear()
  }
}

// NOTE(review): `senv` is not defined in this snippet; presumably it is the streaming
// environment pre-bound by the Flink Scala shell - confirm.
senv.setParallelism(1)
// Raw text records from the demo source, echoed for inspection.
val rawLines: DataStream[String] = senv.addSource(new ReadFrom())
rawLines.print("line info")
// Each record is "<id> <time> <temp>" separated by single spaces.
val readings: DataStream[Sensor] = rawLines.map { record =>
  val fields: Array[String] = record.split(" ")
  Sensor(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
}
// Key by sensor id and run the alarm logic per sensor.
val alarms: DataStream[String] = readings.keyBy(_.id).process(new MyProcess())
// Print the alarms and start the job.
alarms.print("alarm coming...    ")
senv.execute("ALARM")

4.       查看结果


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。