实时大数据技术Flink之CEP
Flink的复杂事件处理CEP
复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了Flink CEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。
CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。
1. CEP相关概念
1) 配置依赖
在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-cep_2.11</artifactId>
<version>1.13.6</version>
</dependency>
2) 事件定义
简单事件:简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。
复杂事件:相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。
2. Pattern API
Flink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:
1、输入事件流的创建
2、Pattern的定义
3、Pattern应用在事件流上检测
4、选取结果
1) 模式定义
定义Pattern可以是单次执行模式,也可以是循环执行模式。单次执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的。下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。
Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {
@Override
public boolean filter(LoginEvent value) throws Exception {
return value.getType().equals("fail");
}
})
1.设置循环次数
对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。
- times:可以通过times指定固定的循环执行次数。
//指定循环触发4次
start.times(4);
//可以执行触发次数范围,让循环执行次数在该范围之内
start.times(2, 4); - oneOrMore:可以通过oneOrMore方法指定触发一次或多次。
// 触发一次或者多次
start.oneOrMore(); - timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。
// 触发两次或者多次
start.timesOrMore(2);
2.定义条件
每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。
-
简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。
// 把通话成功的事件挑选出来
start.where(_.getCallType == "success") -
组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。
// 把通话成功,或者通话时长大于10秒的事件挑选出来
val start = Pattern.begin[StationLog]("start_pattern")
.where(_.callType=="success")
.or(_.duration>10) -
终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。
pattern.oneOrMore.until(_.callOut.startsWith("186"))
3.模式序列
将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。
- 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。
- 宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为OR的逻辑关系。
2) 模式检测
调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream
//cep 做模式检测
PatternStream<LoginEvent> patternStream = CEP.pattern(ds1.keyBy(value -> value.getUserId()), pattern);
3) 选择结果
得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。
1.通过Select Funciton抽取正常事件
可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, Iterable[IN]],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。
def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {
//获取pattern中的startEvent
val startEvent = pattern.get("start_pattern").get.next
//获取Pattern中middleEvent
val middleEvent = pattern.get("middle").get.next
//返回结果
OUT(startEvent, middleEvent)
}
2.通过Flat Select Funciton抽取正常事件
Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。
def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = { //获取pattern中startEvent
val startEvent = pattern.get("start_pattern").get.next
//获取Pattern中middleEvent
val middleEvent = pattern.get("middle").get.next
//并根据startEvent的Value数量进行返回
for (i <- 0 to startEvent.getValue) {
collector.collect(OUT(startEvent, middleEvent))
}
}
3.通过Select Funciton抽取超时事件
如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。
// 通过CEP.pattern方法创建
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern) //创建OutputTag,并命名为timeout-output
val timeoutTag = OutputTag[String]("timeout-output")
//调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] = patternStream.select(timeoutTag){
//超时事件获取
(pattern: Map[String, Iterable[Event]], timestamp: Long) =>
TimeoutEvent()//返回异常事件
} {
//正常事件获取
pattern: Map[String, Iterable[Event]] =>
NormalEvent()
//返回正常事件
}
//调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag)
总结
flink-cep
依赖(如 Scala 2.11 版本对应flink-cep_2.11
);核心概念分两类事件:简单事件(单一、可直接处理)、复杂事件(多事件按关系组成,需模式匹配)。- 定义 Pattern:通过
Pattern API
描述规则,支持单次 / 循环模式(循环可设次数如times(2,4)
、oneOrMore()
),并通过where()
/or()
/until()
设匹配条件; - 模式检测:调用
CEP.pattern()
将 Pattern 应用到事件流(可按 Key 分区),生成PatternStream
; - 结果提取:用
select()
(单次返回 1 条结果)或flatSelect()
(返回多条)取正常事件;若设within(time)
,通过OutputTag
从侧输出流取超时事件。
- 点赞
- 收藏
- 关注作者
评论(0)