ES写入内核流程

举报
xxlla 发表于 2020/11/07 14:41:25 2020/11/07
【摘要】 1 概述 在ES中,写入单个文档的请求称为Index请求,批量写入的请求称为Bulk请求。它们都使用相同的处理逻辑,被统一封装为BulkRequest。以下从源码角度分析ES的bulk请求写入流程。2 源码分析 2.1 Rest层请求转化为Transport层请求2.1.1 RestController.java1.ES会提前将处理各种http请求(...


1     概述

  ES中,写入单个文档的请求称为Index请求,批量写入的请求称为Bulk请求。它们都使用相同的处理逻辑,被统一封装为BulkRequest。以下从源码角度分析ESbulk请求写入流程。

2     源码分析

2.1      Rest层请求转化为Transport层请求

2.1.1        RestController.java

1.ES会提前将处理各种http请求GETPUTPOSTDELETE等)handler注册到一个handler列表中,RestController# registerHandler

image.png

2.http请求发送到ES后,RestController中进行实际的分发过程。首先根据http请求找到对应的handler,再调用handlerhandleRequest方法处理请求:

image.png

image.png

3handlerRequest调用prepareRequest

image.png

2.1.1        RestBulkAction.java

1.对于bulk操作,其请求对应的handlerRestBulkAction该类会在其构造函数中将其注册到RestController,代码如下:

image.png

2. RestBulkAction会将RestRequest解析并转化为BulkRequest,然后再对BulkRequest做处理,这块的逻辑在prepareRequest方法中,代码如下:

image.png

3.上图最后一行是NodeClientbulk请求的处理

2.1.1        NodeClient.java

NodeClient在处理BulkRequest请求时,会将请求的action转化为对应Transport层的action,然后再由Transport层的action来处理BulkRequestaction转化的代码如下:

image.png

image.png

TransportAction会调用一个请求过滤链来处理请求:

image.png

对于Bulk请求,TransportAction的具体实现类为TransportBulkAction,其doExecute方法继续执行写入逻辑。至此转化完成。

2.2      协调节点处理并转发请求

2.2.1        TransportBulkAction.java

TransportBulkAction#doExecute先判断bulk请求中的索引是否存在,不存在则调用自动创建流程:

image.png

image.png

可见,逻辑为先遍历bulk中的索引,如果开启了自动创建索引则放到autoCreateIndices集合中,最后通过createIndex方法创建。

 

创建完index后,index的各shard已在数据节点上创建完成,协调节点将转发写入请求到文档对应的primary shard。协调节点转发的入口为TransportBulkAction #executeBulk方法:

image.png

执行逻辑在BulkOperationdoRun方法中:

1)首先遍历bulkRequest请求,然后根据请求的操作类型执行相应逻辑。对于index请求,会先根据IndexMetaData信息为每条写入请求生成路由信息,如果用户没有指定doc id,则会在process方法中生成:

    image.png

    

2)然后根据每个IndexRequest请求的路由信息(默认为doc id)得到所要写入的目标shard id,再将DocWriteRequest封装为BulkItemRequest并添加到请求列表:

    image.png

    

3)然后将请求按shard分组封装为BulkShardRequest并交由TransportShardBulkAction来处理:

    image.png

    

4)执行逻辑最终进入TransportReplicationAction#doRun方法

    

2.2.2        TransportReplicationAction.java

TransportReplicationAction#doRun会通过ClusterState获取到primary shard的路由信息,然后得到primary shard所在的node,如果node为当前协调节点则直接将请求发往本地,否则发往远端:

    image.png

    

2.3      主分片再副本分片节点执行写入

2.3.1        ReplicationOperation.java

ReplicationOperation#execute方法执行主分片节点写入:

    image.png

    

primary.perform执行主分片写入,主分片写入完成调用handlerPrimaryResult方法,发送写副本分片的请求:

    image.png

2.3.2        TransportShardBulkAction.java

着重看写主分片的逻辑,在TransportShardBulkAction#shardOperationOnPrimary方法中:

    image.png

    

再调用InternalEngine.Index将数据写入lucene,再写入translog:

    image.png

lucenetranslog的整体流程如下:

    image.png

    

(1)数据写入buffer缓冲和translog日志文件

(2)每隔一秒钟,buffer中的数据被写入新的segment file,并进入os cache,此时segment被打开并供search使用

(3)buffer被清空

(4)重复1~3,新的segment不断添加,buffer不断被清空,而translog中的数据不断累加

(5)translog长度达到一定程度的时候,commit操作发生:

  5-1buffer中的所有数据写入一个新的segment,并写入os cache,打开供使用

  5-2buffer被清空

  5-3)一个commit ponit被写入磁盘,标明了所有的index segment

  5-4filesystem cache中的所有index segment file缓存数据,被fsync强行刷到磁盘上

  5-5)现有的translog被清空,创建一个新的translog

以上写lucenetranslog对应ES中几个关键概念:

(1)fresh: 内存缓冲区被清空写到段中,段被打开可进行搜索

(2)commit point: 记录当前所有可用的segement

(3)flush: 内存缓冲区被清空写到段中,一个提交点被写入硬盘,文件系统缓存通过 fsync 被刷新,老的 translog 被删除

(4)fsync: /_flush/sync命令,逻辑是flush translog并且将sync_id同步到各个分片,可以实现快速恢复


【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。