Esper剖析
最近在看论文,发现文中有些语言自己未曾见过,经过一番搜索,才发觉是自己接触到了新知识。
官网:
Esper为一款开源的实时分析引擎。是一个强大的支持ESP(Event Stream Process)和CEP(Complex Event Process)分析的引擎,其EPL解析语言可以通过简单的写一条类似SQL一样的语句完成统计。
说到Esper,不得不说一下CEP。CEP即Complex Event Process,中文意思就是“复杂事件处理”。听起来好像很复杂,实际上就是基于事件流进行数据处理,把要分析的数据抽象成事件,然后将数据发送到CEP引擎,引擎就会根据事件的输入和最初注册的处理模型,得到事件处理结果。
CEP是一种标准,Esper只是对这个标准的一种开源实现。除了Esper,很多大公司也有类似的商业软件,比如IBM,Sybase等等,听说巨贵无比。CEP的一个重要特点就是他是一个内存计算工具和类SQL语句。内存计算可以说是一把双刃剑。好处自不必说,一个字:快!坏处也显而易见,数据有丢失的风险,而且还有容量的限制(实时计算其实并不受制于内存大小,而是得看如何对实时进行定义,也就是具体的业务来决定了)。所以如果业务不能容忍数据丢失,那么高可用方案就必须做好,不过Esper的高可用很不好做。
CEP的类SQL语句,可以理解为处理模型的定义与描述。这是运行在CEP引擎中的特殊语句,之所以叫他类SQL,是因为它和SQL确实很像,除了select,insert,delete,update,而且也有avg,count等函数。所以对于会SQL的人来说,他的语法结构大致还是能猜出一二的。在Esper中,这个句子叫做EPL,即Event Process Language。作为Esper的核心内容,对于它的讲解有三四百页的英文文档。
相比之下,storm,twitter做的,大公司大品牌,完全开源,看上去的确不错。但是看了一下功能,相当于是一个实时hadoop,只能帮你计算,但是怎么算还得你自己写程序,离我想要的功能上有一些距离。目前想使用实时分析的产品一大堆,大部分产品数据量都不大,每个都给他们写段分析代码,加上调试,能累死人,维护代价高。
首先介绍一下ESPER的大体结构,esper从内容上分为两块,esper的核心esper-4.x.x.jar和esper-io。
(1)esper的核心包包含了EPL语法解析引擎,事件监听机制,事件处理等核心模块。
(2)esper的io包含从各种数据源读取数据以及将输出结果写入各种数据源,包括excel,database,JMS,http,socket,XML。
1. Event对象:ESPER处理的事件的最小单位,一个任意的JavaBean对象,属性支持简单的java类型、数组、map、以及嵌套JavaBean,很灵活,下面是一个简单的Event对象:
public class OrderEvent {
private String itemName;
private double price;
public OrderEvent(String itemName, double price) {
this.itemName = itemName;
this.price = price;
}
public String getItemName() {
return itemName;
}
public double getPrice() {
return price;
}
}
2.EPL:EPL是ESPER的核心,它类似于SQL,但是和SQL的执行方式不同。SQL是数据在那里,你每次执行SQL就会触发一次查询;而EPL是查询在这里,数据输入达到一定条件即可触发查询。
这个条件可以有多种:
a).每个event对象来就触发一次查询,并只处理当前对象
select * from OrderEvent
这个EPL语句会在每个OrderEvent对象到达后,并将该event交给后续的Listener进行处理。但是这种用法不多见,意义不大。
b).窗口处理模式:
EPL最大的特色就是这个窗口处理模式,有两种窗口,时间窗口和长度窗口。
时间窗口:大家想一下,如果有一个场景,要获取最近3秒内OrderEvent的price的平均值,那该怎么做呢?一般的做法需要做个后台线程来做3秒的时间统计,时间到了再做后续处理,虽然不复杂,但是也挺繁琐的。
来看看EPL是怎么做的:
select avg(price) from test.OrderEvent.win:time(3 sec)
win:time(3sec)就是定义了3秒的时间窗口,avg(price)就是统计了3秒内的OrderEvent对象的price的平均值。
长度窗口:长度窗口和时间窗口比较类似
select avg(price) from test.OrderEvent.win:length(100)
win:length(10)就是定义了10个Event的,avg(price)就是统计了最近10个的OrderEvent对象的price的平均值。
EPL语法
EPL,全称Event Processing Language,是一种类似SQL的语言,包含了SELECT, FROM, WHERE, GROUP BY, HAVING 和 ORDER BY子句,同时用事件流代替了table作为数据源,并且能像SQL那样join,filtering和aggregation。所以如果各位有SQL基础的话,简单的EPL很容易掌握。除了select,EPL也有insert into,update,delete,不过含义和SQL并不是很接近。另外还有pattern和output子句,这两个是SQL所没有的。EPL还定义了一个叫view的东西,类似SQL的table,来决定哪些数据是可用的,Esper提供了十多个view,并且保证这些view可以被重复使用。而且用户还可以扩展view成为自定义view来满足需求。在view的基础上,EPL还提供了named window的定义,作用和view类似,但是更加灵活。。。
Select Clause和From Clause。这个两个可以说是写EPL必备,要想得到事件流的处理结果,基本上就靠他们俩了(Pattern除外)。
Select Clause
1.查询事件流的所有属性及特定属性
EPL的select和SQL的select很相近,SQL用*表示查询表的所有字段,而EPL用*表示查询事件流的所有属性值。SQL查询某个字段名,直接在select后跟字段名就ok,EPL也是将要查询的属性名放在select之后。若查多个属性值,则用逗号分割。和SQL一样,EPL查询属性也可以设置别名。示例如下:
// EPL:查询完整的User对象
select * from User
// 获取User对象
User u = newEvent.getUnderlying();
// EPL:查询User的name和id,id别名为i
select name, id as i from User
// 获取name和id
String name = (String)newEvent.get("name");
int id = (Integer)newEvent.get("i");
这里要注意,如果查询的是一个完整对象,需要调用getUnderlying()方法,而get方法是针对确定的属性名或者别名。另外*是不能设置别名的。
2.insert和remove事件流
Esper对于事件流分输入和移出两种,分别对应监听器的两个参数newEvents和oldEvents,关于监听器的内容可参看《Esper学习之三:进程模型》。newEvents通常对应事件的计算结果,oldEvents可以理解过上一次计算结果。默认情况下,只有newEvents有值,oldEvents为null。如果需要查看oldEvents,则需要使用一个参数。例如:
select rstream * from User
如果使用了该参数,则会将上一次计算结果放入newEvents内,而不是oldEvents。并且无法获得当前的计算结果。
select irstream * from User
如果使用了该参数,则会将当前的计算结果放入newEvents内,上一次的计算结果放入oldEvents内。
3.Aggregation
和SQL一样,EPL也有Aggregation,即聚合函数。语法如下:
aggregate_function([all|distinct] expression)
aggregate_function就是聚合函数的名字,比如avg,sum等。expression通常是事件流的某个属性,也可以是不同事件流的多个属性,或者是属性和常量、函数之间的运算。举例如下。
// 查询最新5秒的Apple的平均价格
select avg(price) as aPrice from Apple.win:time(5 sec)
// 查询最新10个Apple的价格总和的两倍
select sum(price*2) as sPrice from Apple.win:length(10)
// 查询最新10个Apple的价格,并用函数计算后再算平均值
select avg(Compute.getResult(price)) from Apple.win:length(10)
以上就是聚合函数的使用方法,除此之外需要注意以下几点:
1.聚合函数能用于Select和Having,但是不能用于Where
2.sum,avg,media,stddev,avedev只能计算数值,至于media,stddev和avedev代表什么意思,请自行百度。
3.Esper会忽略expression为null不让参与聚合运算,但是count函数除外,即使是null也认为是一个事件。如果事件流集合中没有包含任何事件,或者包含的事件中用于聚合计算的expression都是null(比如收集5秒内进入的事件即为一个事件流集合),则所有聚合函数都返回null。
4.Insert into
4.1 简单用法
EPL的Insert into和SQL的有比较大的区别。SQL是往一张表里插入数据,而EPL是把一个事件流的计算结果放入另一个事件流,然后可以对这个事件流进行别的计算。所以Insert into的一个好处就是可以将是事件流的计算结果不断级联,对于那种需要将上一个业务的结果数据放到下一个业务处理的场景再适合不过了。除此之外,Insert into还有合并多个计算结果的作用。到这里相信大家已经对他越来越好奇了,不急,咱们先来看看语法:
insert [istream | irstream | rstream] into event_stream_name [ (property_name [, property_name] ) ]
event_stream_name定义了事件流的名称,在执行完insert的定义之后,我们可以使用select对这个事件流进行别的计算。
istream | irstream | rstream表示该事件流允许另一个事件的输入/输入数据和输出/输出数据能够进入(解释好像很绕。。一会儿看例子就能明白了)
property_name表示该事件流里包含的属性名称,多个属性名之间用逗号分割,并且用小括号括起来。
上面的说明可能不是很好理解,咱们先看个例子:
//将新进入的Asus事件传递到Computer,且Asus的id,size和Computer的cid,csize对应
insert into Computer(cid,csize) select id,size from Asus
// 第二种写法
insert into Computer select id as cid, size as csize Asus
从例子中可以看到,insert into需要配合select进行使用,以表明前一个事件流有哪些计算结果将进入insert into定义的事件流。并且在select中的字段要和insert里的事件流的属性要对应(这里指的对应是数据类型对应,而且属性数量也必须一样)。如果说insert定义的事件流名称在之前已经定义过(insert into中定义的除外),重名是不允许的。
我个人推荐第二种写法,通过as设置的别名即为insert定义的事件流的属性,这样可以避免属性的个数不一致的错误。
参考博文:
- 点赞
- 收藏
- 关注作者
评论(0)