实时大数据技术Flink之CEP

举报
tea_year 发表于 2025/09/25 11:33:55 2025/09/25
【摘要】 Flink的复杂事件处理CEP复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStr...

Flink的复杂事件处理CEP

复杂事件处理(CEP)是一种基于流处理的技术,将系统数据看作不同类型的事件,通过分析事件之间的关系,建立不同的事件关系序列库,并利用过滤、关联、聚合等技术,最终由简单事件产生高级事件,并通过模式规则的方式对重要信息进行跟踪和分析,从实时数据中发掘有价值的信息。复杂事件处理主要应用于防范网络欺诈、设备故障检测、风险规避和智能营销等领域。Flink基于DataStrem API提供了Flink CEP组件栈,专门用于对复杂事件的处理,帮助用户从流式数据中发掘有价值的信息。

CEP(Complex Event Processing)就是在无界事件流中检测事件模式,让我们掌握数据中重要的部分。flink CEP是在flink中实现的复杂事件处理库。

image-20230716232008121.png


1. CEP相关概念

1) 配置依赖

在使用FlinkCEP组件之前,需要将FlinkCEP的依赖库引入项目工程中。

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-cep_2.11</artifactId>
   <version>1.13.6</version>
</dependency>


2) 事件定义

简单事件:简单事件存在于现实场景中,主要的特点为处理单一事件,事件的定义可以直接观察出来,处理过程中无须关注多个事件之间的关系,能够通过简单的数据处理手段将结果计算出来。

复杂事件:相对于简单事件,复杂事件处理的不仅是单一的事件,也处理由多个事件组成的复合事件。复杂事件处理监测分析事件流(Event Streaming),当特定事件发生时来触发某些动作。

2. Pattern API

Flink CEP中提供了Pattern API用于对输入流数据的复杂事件规则定义,并从事件流中抽取事件结果。包含四个步骤:

1、输入事件流的创建

2、Pattern的定义

3、Pattern应用在事件流上检测

4、选取结果

1) 模式定义

定义Pattern可以是单次执行模式,也可以是循环执行模式。单次执行模式一次只接受一个事件,循环执行模式可以接收一个或者多个事件。通常情况下,可以通过指定循环次数将单次执行模式变为循环执行模式。每种模式能够将多个条件组合应用到同一事件之上,条件组合可以通过where方法进行叠加。每个Pattern都是通过begin方法定义的。下一步通过Pattern.where()方法在Pattern上指定Condition,只有当Condition满足之后,当前的Pattern才会接受事件。

Pattern.<LoginEvent>begin("first").where(new SimpleCondition<LoginEvent>() {
   @Override
   public boolean filter(LoginEvent value) throws Exception {
       return value.getType().equals("fail");
  }
})


1.设置循环次数

对于已经创建好的Pattern,可以指定循环次数,形成循环执行的Pattern。

  • times:可以通过times指定固定的循环执行次数。
    //指定循环触发4次
    start.times(4);
    //可以执行触发次数范围,让循环执行次数在该范围之内
    start.times(2, 4);
  • oneOrMore:可以通过oneOrMore方法指定触发一次或多次。
    // 触发一次或者多次
    start.oneOrMore();
  • timesOrMore:通过timesOrMore方法可以指定触发固定次数以上,例如执行两次以上。
    // 触发两次或者多次
    start.timesOrMore(2);


2.定义条件

每个模式都需要指定触发条件,作为事件进入到该模式是否接受的判断依据,当事件中的数值满足了条件时,便进行下一步操作。在FlinkCFP中通过pattern.where()、pattern.or()及pattern.until()方法来为Pattern指定条件,且Pattern条件有Simple Conditions及Combining Conditions等类型。

  • 简单条件:Simple Condition继承于Iterative Condition类,其主要根据事件中的字段信息进行判断,决定是否接受该事件。

    // 把通话成功的事件挑选出来
    start.where(_.getCallType == "success")
  • 组合条件:组合条件是将简单条件进行合并,通常情况下也可以使用where方法进行条件的组合,默认每个条件通过AND逻辑相连。如果需要使用OR逻辑,直接使用or方法连接条件即可。

    // 把通话成功,或者通话时长大于10秒的事件挑选出来
    val start = Pattern.begin[StationLog]("start_pattern")
    .where(_.callType=="success")
    .or(_.duration>10)
  • 终止条件:如果程序中使用了oneOrMore或者oneOrMore().optional()方法,则必须指定终止条件,否则模式中的规则会一直循环下去,如下终止条件通过until()方法指定。

    pattern.oneOrMore.until(_.callOut.startsWith("186"))

3.模式序列

将相互独立的模式进行组合然后形成模式序列。模式序列基本的编写方式和独立模式一致,各个模式之间通过邻近条件进行连接即可,其中有严格邻近、宽松邻近、非确定宽松邻近三种邻近连接条件。


  • 严格邻近:严格邻近条件中,需要所有的事件都按照顺序满足模式条件,不允许忽略任意不满足的模式。

wps2.jpg

  • 宽松邻近:在宽松邻近条件下,会忽略没有成功匹配模式条件,并不会像严格邻近要求得那么高,可以简单理解为OR的逻辑关系。

wps3.jpg


2) 模式检测

调用 CEP.pattern(),给定输入流和模式,就能得到一个 PatternStream

//cep 做模式检测
PatternStream<LoginEvent> patternStream = CEP.pattern(ds1.keyBy(value -> value.getUserId()), pattern);


3) 选择结果

得到PatternStream类型的数据集后,接下来数据获取都基于PatternStream进行。该数据集中包含了所有的匹配事件。目前在FlinkCEP中提供select和flatSelect两种方法从PatternStream提取事件结果事件。

1.通过Select Funciton抽取正常事件

可以通过在PatternStream的Select方法中传入自定义Select Funciton完成对匹配事件的转换与输出。其中Select Funciton的输入参数为Map[String, Iterable[IN]],Map中的key为模式序列中的Pattern名称,Value为对应Pattern所接受的事件集合,格式为输入事件的数据类型。

def selectFunction(pattern : Map[String, Iterable[IN]]): OUT = {  
   //获取pattern中的startEvent  
   val startEvent = pattern.get("start_pattern").get.next    
   //获取Pattern中middleEvent    
   val middleEvent = pattern.get("middle").get.next    
   //返回结果    
   OUT(startEvent, middleEvent)
}



2.通过Flat Select Funciton抽取正常事件

Flat Select Funciton和Select Function相似,不过Flat Select Funciton在每次调用可以返回任意数量的结果。因为Flat Select Funciton使用Collector作为返回结果的容器,可以将需要输出的事件都放置在Collector中返回。

def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {    //获取pattern中startEvent  
   val startEvent = pattern.get("start_pattern").get.next    
   //获取Pattern中middleEvent  
   val middleEvent = pattern.get("middle").get.next    
   //并根据startEvent的Value数量进行返回  
   for (i <- 0 to startEvent.getValue) {    
       collector.collect(OUT(startEvent, middleEvent))  
  }
}


3.通过Select Funciton抽取超时事件

如果模式中有within(time),那么就很有可能有超时的数据存在,通过PatternStream. Select方法分别获取超时事件和正常事件。首先需要创建OutputTag来标记超时事件,然后在PatternStream.select方法中使用OutputTag,就可以将超时事件从PatternStream中抽取出来。

// 通过CEP.pattern方法创建
val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)  //创建OutputTag,并命名为timeout-output  
val timeoutTag = OutputTag[String]("timeout-output")  
//调用PatternStream select()并指定timeoutTag val result: SingleOutputStreamOperator[NormalEvent] =   patternStream.select(timeoutTag){  
//超时事件获取    
(pattern: Map[String, Iterable[Event]], timestamp: Long) =>      
TimeoutEvent()//返回异常事件  
} {
//正常事件获取
pattern: Map[String, Iterable[Event]] =>      
NormalEvent()
//返回正常事件  
}
//调用getSideOutput方法,并指定timeoutTag将超时事件输出val timeoutResult: DataStream[TimeoutEvent] = result.getSideOutput(timeoutTag)

总结

需引入flink-cep依赖(如 Scala 2.11 版本对应flink-cep_2.11);核心概念分两类事件:简单事件(单一、可直接处理)、复杂事件(多事件按关系组成,需模式匹配)。
  1. 定义 Pattern:通过Pattern API描述规则,支持单次 / 循环模式(循环可设次数如times(2,4)oneOrMore()),并通过where()/or()/until()设匹配条件;
  2. 模式检测:调用CEP.pattern()将 Pattern 应用到事件流(可按 Key 分区),生成PatternStream
  3. 结果提取:用select()(单次返回 1 条结果)或flatSelect()(返回多条)取正常事件;若设within(time),通过OutputTag从侧输出流取超时事件。
关键特性:支持严格 / 宽松邻近的模式序列,可灵活定义事件间关联规则,保障实时性与准确性。
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。