如何使用elasticsearch实现千万级分组的精确聚合

举报
发表于 2020/06/30 16:18:57 2020/06/30
【摘要】 Elasticsearch是一个分布式的搜索引擎,数据放在一个个索引里面,索引由一个个分片组成,这里我们把分片叫做shard,每一个分片还可以设置多个副,副分片叫做replica。shard上的数据是整个索引的一部分,ES的查询为了返回正确的数据,需要分发到各个shard,每个shard返回数据,然后在汇总节点汇总返回结果。

首先如下逻辑结构图,index由三个shard组成。

   

正常场景下这种模式返回的数据都是精确的,比如我们经常使用的关键字查找,排序打分。因为求的是某一个文档的先后顺序,shard中文档在整个index中是独一份,不存在shard之间的歧义。但是在聚合(group by)场景下,这个模式会有一些问题,比如,我们要统计index中排名前三的国家及人口数量,按照es的分布式原理,会在每个shard中求出前3,然后再汇总节点根据每个shard的返回再汇总求top3,这个时候问题来了,如果数据是如下分布的

                                                

汇总节点拿到的数据是shard1/shard2/shard3返回的 A B C E 四个国家做最终汇总,但是很明显,国家D其实可以排入前三,其实ES在官网特意指出了这个问题。

https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-terms-aggregation.html 

       

    大概意思就是讲如果group by最终的分组的个数(国家个数也叫term的个数)大于返回的top size,那么有可能出现应该返回的数据没有返回,类似上面的D国。

    如何解决呢?其实也有如下几个方法

         1、  每个shard返回更多的结果,每个节点返回top100,减少误差

         2、  将相同的term发送到同一个shard,比如A国都在shard1(route方法)

         3、  最暴力的方法,直接不分片,一个shard

    问题真的解决了吗?并没有,上面的方法缺陷都很明显

        方法1:到底需要返回多少冗余数据才会准确,并且返回冗余数据的计算代价(CPU 内存 磁盘IO等)也是昂贵的。

        方法2:如果term有成千上万甚至上亿,每一个index是TB级别呢,这个时候每个shard的会巨大,并且这个还需要业务感知,需要在入库的时候指定route

        方法3:单个shard容量是有上限的,大小一般建议值在30~50GB,并且单shard无法体会分布式的魅力,集群能力用不起来

HadoopMR过程

我们想象一下,如果能够将每个shard的数据(精确到每个term)发送到一个指定的地方计算,那么能够得到这个值得精确结果,其实在上文中的方法2也提到了,但是这个不经过处理的中间数据会巨大,很多场景不适用。我们这里回忆一下HadoopMR,用过hadoop的人肯定对Map-Reduce过程都不陌生,下图大概介绍了MR中如何操作一个Map-Reduce任务。

      

每个map任务产生了 1/2/3三份数据(这个三份数据可以有很多理解,比如表面上的三个国家,hash之后的三个取模值,甚至三个区间等等),这里有一个很特别的shuffle过程,shuffler过程会把每一个map的同一份数据送到一个reduce里面,对于这个reduce来说,对这一份数据进行统计就能够得到这一份数据的精确结果。其他的数据也类似,这样整个过程就能够得到精确值。

为什么ES不这样做?ES是一个商业公司,肯定很早就意识到这个问题了,为什么不做?更多的是为了性能考虑,现在很多场景ES已经把准实时搜索引擎写成实时搜索引擎了(大数据场景下,s级快速的返回99%准确性的数据比远比耗费大量时间在计算一个精确值上重要),除了这个之外,为了实时性,ES也舍弃了其他方面的东西,这里就不展开了。

当然我们下面介绍的方法跟这个还有点出入,不过核心思想是有一个调度中心能够讲相同的数据发送到一起并且能够满足可行性。

预聚合过程

我们并不把没有处理过的文档数据直接通过route发送到指定shard。我们会先使用预聚合来加速,什么是预聚合加速?

基本原理:用户在制作报表时,主要是对时序数据进行聚合,跨越不同粒度的时间生成报表时,大部分的数据是重复计算的。只要找到用户报表的最小时间粒度,根据其语句内容找到涉及的字段,自动进行聚合成粗粒度的数据,后续就仅仅需要在粗粒度上进行聚合即可

就是使用预先计算,缓存中间结果,达到加速的目的。我们看下面一张表举例,每一个时间内有源源不断地数据流入

   聚合后的中间数据形式,这里是三层聚合(时间/年龄段/教育程度),统计指标有最大薪水,平均薪水,数量。

聚合任务会把一个时间粒度内的某一个字段(这里是一分钟学历、年龄这两个字段)计算统计指标存入中间索引。同样,中间索引会根据term使用route将数据存入指定shard。具体我们可以看下图:

                                                                                                                  

这样做可以带来什么好处,假设一分钟原始数据有10w条,按照1分钟的粒度中间索引只需要(聚合的term粒度 * 统计指标)条数,大大减少了预聚合任务分发到中间索引的数据(这样使用route分发后大大降低膨胀的概率)。

我们曾经有过一些数据可以参考(100GB的原始数据,通过预聚合计算生成的中间索引大概在10MB

那么可能问题又来了,图中预聚合任务查询原始数据(过程1)不也面临数据不准确的问题吗?相对于直接查询大批量数据,因为预聚合任务的时间是有粒度的,任务会每个粒度周期执行一次,比如一分钟,这样分钟内的数据量相对可控,结果也相对准确。为了进一步提升预聚合任务的精确性,在预聚合任务时使用composite aggs翻页查找,得出精确值,关于composite aggs 可以参考:

htps://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations-bucket-composite-aggregation.html

查询过程

  有了中间索引之后,对于已经建立好中间索引的查询,会优先使用中间索引加速,我们会自动识别,本应该查询源数据100GB的请求,直接会查询10MB中间索引,时延大幅降低。ES公司在他的商业插件X-pack中也提供了一个预聚合插件,叫做rollup功能,x-pack生成中间索引后,为了使用加速功能,需要修改原始的查询语句,这样用户感知并且无法兼容kibanasiren等可视化插件。

 

  巧的是,这个能力已经在华为云云搜索服务中实现了,云搜索服务是基于开源的elasticsearch的一款云服务,我们做了很多增强,官网链接华为云CSS服务

 

  关于云搜索服务的其他功能介绍可以参考:https://support.huaweicloud.com/usermanual-css/css_01_0007.html        

  更多关于云搜索服务的增强版功能使用,欢迎提工单进行咨询。

 

 


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。