MRS：Flink 的 ProcessFunction 编程 demo
关键词:MRS Flink yarn KeyedProcessFunction 状态编程 定时器
摘要：本文利用 Flink 的 ProcessFunction API 与状态编程（State）及定时器（Timer），实现传感器温度持续上升时发出告警的功能。
前期准备
MRS1.9.2(非安全)、Flink1.7.0
场景说明
假定用户有10组传感器，编号为0-9，每个传感器每3秒上报一条数据。当某个传感器的温度上升且当前没有待触发的定时器时，程序为其注册一个10秒后触发的处理时间定时器；如果这10秒内温度没有出现下降，定时器触发并发出告警；一旦温度下降，则删除该定时器。
代码实现
package com.huawei.flink.example.alarm
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.Random
object ProcessFunctionTest {

  /** One sensor reading: sensor id, the source's round counter `time`, and temperature. */
  case class Sensor(id: Int, time: Long, temp: Double)

  /**
   * Simulated data source.
   *
   * Every 3 seconds it emits one line per sensor 0-9 in the form "id time temp",
   * where `time` grows by 1000 each round and `temp` is a random value in
   * [0, 40) formatted with two decimals. The loop runs until `cancel()` is called.
   */
  class ReadFrom extends RichSourceFunction[String] {

    // Written by cancel() and read by the run() loop, which may run on different threads.
    @volatile private var running: Boolean = true

    override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
      var time = 0L
      val random: Random = new Random()
      while (running) {
        time = time + 1000L
        for (id <- 0 until 10) {
          // Locale.ROOT guarantees '.' as the decimal separator, which the
          // downstream `toDouble` parse requires regardless of the JVM locale.
          val temp = "%.2f".formatLocal(java.util.Locale.ROOT, random.nextDouble() * 40)
          sourceContext.collect(s"$id $time $temp")
        }
        Thread.sleep(3000)
      }
    }

    /** Signals run() to leave its loop so the job can be cancelled cleanly. */
    override def cancel(): Unit = {
      running = false
    }
  }

  /**
   * Per-sensor alarm logic, keyed by sensor id.
   *
   * When the temperature rises and no timer is pending, a processing-time timer is
   * registered `alarmDelayMs` in the future. A later drop in temperature deletes the
   * pending timer. If the timer fires, an alarm string is emitted.
   *
   * @param alarmDelayMs how long (ms) the temperature must go without dropping before
   *                     an alarm is raised; defaults to 10 s
   */
  class MyProcess(alarmDelayMs: Long = 10000L) extends KeyedProcessFunction[Int, Sensor, String] {

    // Last temperature seen for the current key; 40.0 before the first reading.
    // NOTE(review): the ValueStateDescriptor constructor taking a default value is
    // deprecated in Flink; kept to preserve the demo's behavior.
    lazy val lastTempState: ValueState[Double] = getRuntimeContext.
      getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double], 40.0))

    // Firing time of the pending timer for the current key; reads as 0 when no timer is set.
    lazy val timerTimeState: ValueState[Long] = getRuntimeContext.
      getState(new ValueStateDescriptor[Long]("timeTimer", classOf[Long]))

    override def processElement(i: Sensor, context: KeyedProcessFunction[Int, Sensor, String]#Context,
                                collector: Collector[String]): Unit = {
      val lastTemp: Double = lastTempState.value()
      val timerTime: Long = timerTimeState.value()
      // Remember this reading for the next comparison.
      lastTempState.update(i.temp)

      if (i.temp > lastTemp && timerTime == 0L) {
        // Temperature went up and nothing is pending: arm a timer.
        val setTimer: Long = context.timerService().currentProcessingTime() + alarmDelayMs
        context.timerService().registerProcessingTimeTimer(setTimer)
        timerTimeState.update(setTimer)
      } else if (i.temp < lastTemp && timerTime != 0L) {
        // Temperature dropped while a timer is pending: cancel it. The guard avoids
        // deleting a timer that was never registered.
        context.timerService().deleteProcessingTimeTimer(timerTime)
        timerTimeState.clear()
      }
    }

    /** The temperature did not drop within the delay: emit an alarm and reset the timer state. */
    override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, Sensor, String]#OnTimerContext,
                         out: Collector[String]): Unit = {
      out.collect("sensor " + ctx.getCurrentKey + "=>" + " temperature go up!!!")
      timerTimeState.clear()
    }
  }

  /** Builds and runs the job: simulated source -> parse -> key by sensor id -> alarm logic -> print. */
  def main(args: Array[String]): Unit = {
    val senv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    senv.setParallelism(1)
    // Simulated sensor lines, also printed for inspection.
    val line: DataStream[String] = senv.addSource(new ReadFrom())
    line.print("line info")
    // Parse "id time temp" into Sensor records.
    val sensor: DataStream[Sensor] = line.map(data => {
      val words: Array[String] = data.split(" ")
      Sensor(words(0).toInt, words(1).toLong, words(2).toDouble)
    })
    val keyedSensor: KeyedStream[Sensor, Int] = sensor.keyBy(_.id)
    val alarm: DataStream[String] = keyedSensor.process(new MyProcess())
    // Print the alarms.
    alarm.print("alarm coming... ")
    senv.execute("ALARM")
  }
}
调测程序
1. 登录MRS集群的Master节点，执行 `source /opt/client/bigdata_env`。
2. 使用 Flink Scala Shell 测试程序：执行 `start-scala-shell.sh yarn -n 2`（以 YARN 模式启动，`-n 2` 表示申请2个TaskManager容器）。
3. 将以下代码粘贴到 Shell 中执行。Shell 已预置流处理执行环境变量 `senv`，因此这里不需要 `object` 和 `main` 方法：
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
import scala.util.Random
/** A single sensor reading: sensor id, the source's round counter, and temperature. */
case class Sensor(
  id: Int,
  time: Long,
  temp: Double
)
/**
 * Simulated data source: every 3 seconds it emits one line per sensor 0-9 in the
 * form "id time temp" (`time` grows by 1000 per round, `temp` is random in [0, 40)
 * with two decimals). The loop runs until `cancel()` is called.
 */
class ReadFrom extends RichSourceFunction[String] {
  // Written by cancel() and read by the run() loop, which may run on different threads.
  @volatile private var running: Boolean = true

  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
    var time = 0L
    val random: Random = new Random()
    while (running) {
      time = time + 1000L
      for (id <- 0 until 10) {
        // Locale.ROOT guarantees '.' as the decimal separator, which the
        // downstream `toDouble` parse requires regardless of the JVM locale.
        val temp = "%.2f".formatLocal(java.util.Locale.ROOT, random.nextDouble() * 40)
        sourceContext.collect(s"$id $time $temp")
      }
      Thread.sleep(3000)
    }
  }

  /** Signals run() to leave its loop so the job can be cancelled cleanly. */
  override def cancel(): Unit = {
    running = false
  }
}
/**
 * Per-sensor alarm logic, keyed by sensor id.
 *
 * When the temperature rises and no timer is pending, a processing-time timer is
 * registered `alarmDelayMs` in the future; a later drop deletes it. If the timer
 * fires, an alarm string is emitted.
 *
 * @param alarmDelayMs how long (ms) the temperature must go without dropping before
 *                     an alarm is raised; defaults to 10 s
 */
class MyProcess(alarmDelayMs: Long = 10000L) extends KeyedProcessFunction[Int, Sensor, String] {
  // Last temperature seen for the current key; 40.0 before the first reading.
  // NOTE(review): this default-value ValueStateDescriptor constructor is deprecated in Flink.
  lazy val lastTempState: ValueState[Double] = getRuntimeContext.
    getState(new ValueStateDescriptor[Double]("lastTemp", classOf[Double], 40.0))
  // Firing time of the pending timer for the current key; reads as 0 when no timer is set.
  lazy val timerTimeState: ValueState[Long] = getRuntimeContext.
    getState(new ValueStateDescriptor[Long]("timeTimer", classOf[Long]))

  override def processElement(i: Sensor, context: KeyedProcessFunction[Int, Sensor, String]#Context,
                              collector: Collector[String]): Unit = {
    val lastTemp: Double = lastTempState.value()
    val timerTime: Long = timerTimeState.value()
    lastTempState.update(i.temp)
    if (i.temp > lastTemp && timerTime == 0L) {
      // Temperature went up and nothing is pending: arm a timer.
      val setTimer: Long = context.timerService().currentProcessingTime() + alarmDelayMs
      context.timerService().registerProcessingTimeTimer(setTimer)
      timerTimeState.update(setTimer)
    } else if (i.temp < lastTemp && timerTime != 0L) {
      // Temperature dropped while a timer is pending: cancel it (never delete an unregistered timer).
      context.timerService().deleteProcessingTimeTimer(timerTime)
      timerTimeState.clear()
    }
  }

  /** The temperature did not drop within the delay: emit an alarm and reset the timer state. */
  override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[Int, Sensor, String]#OnTimerContext, out: Collector[String]): Unit = {
    out.collect("sensor " + ctx.getCurrentKey + "=>" + " temperature go up!!!")
    timerTimeState.clear()
  }
}
// A single parallel instance keeps the output of both print sinks easy to follow.
senv.setParallelism(1)
// Raw "id time temp" lines from the simulated sensors, echoed for inspection.
val rawLines: DataStream[String] = senv.addSource(new ReadFrom())
rawLines.print("line info")
// Parse each line into a Sensor, partition by sensor id and apply the per-sensor
// alarm logic. Trailing dots keep the REPL reading the continuation lines.
val alarms: DataStream[String] = rawLines.
  map { record =>
    val fields: Array[String] = record.split(" ")
    Sensor(fields(0).toInt, fields(1).toLong, fields(2).toDouble)
  }.
  keyBy(_.id).
  process(new MyProcess())
alarms.print("alarm coming... ")
senv.execute("ALARM")
4. 查看结果：前缀为 “line info” 的输出是传感器原始数据，前缀为 “alarm coming... ” 的输出是告警信息。
- 点赞
- 收藏
- 关注作者
评论(0)