elasticsearch 实现亿级数据查询 (图文加Java代码教程)

举报
神的孩子在歌唱 发表于 2024/04/21 18:54:00 2024/04/21
【摘要】 elasticsearch 实现亿级数据查询 (图文加Java代码教程)前言这是我在这个网站整理的笔记,有错误的地方请指出,关注我,接下来还会持续更新。作者:神的孩子都在歌唱一. 问题及解决方案如果一个索引存储超过亿级数据,es分页排序查询的时候会很慢,往往需要几十秒,并且如果你elasticsearch中的from size进行分页查询,它是不支持深分页的,只能查询到1000条。那么我们如...

elasticsearch 实现亿级数据查询 (图文加Java代码教程)

前言

这是我在这个网站整理的笔记,有错误的地方请指出,关注我,接下来还会持续更新。

作者:神的孩子都在歌唱


一. 问题及解决方案

如果一个索引存储超过亿级数据,es分页排序查询的时候会很慢,往往需要几十秒,并且如果你elasticsearch中的from size进行分页查询,它是不支持深分页的,只能查询到1000条。那么我们如何能够进行深分页 并且 做到毫秒级查询呢?


问题一: 如何深分页

解决: es中推荐使用search_after进行滚动分页,它不受条数的限制,页数很大也不会影响性能,唯一的缺点就是不能跨页查询,只能一页页查询


问题二: 如何做到毫秒级查询

解决: 我们可以将一个索引分成多个索引(可以通过定义策略,索引模版和别名去管理这些索引),比如我的策略是每个索引只能存储一千万条数据,到达后就分索引。每次查询的时候只查询一个,这样子我们就从查询亿级数据变成了查询千万数据,这样子就能够很快查询出来。如下图


image-20240408171031179


问题三: 有人就会问了,如果要查询全部的索引呢,全部索引加起来的数据也有亿级了,这不就又要一次性查询亿级数据了么?


其实道理都是一样的,你可以先查询第一个,第一个查询完后在查询第二个,这样子就能解决要一次性查询亿级数据的问题,看下图

image-20240408171146881



一般项目中的查询都是会加上时间范围,所以我们也没必要全部索引都查一遍,可以先根据时间范围索引的创建时间过滤出来要查询那些索引,这样子就可以根据指定的时间范围查询对应的索引,在对索引排序,在根据时间范围单个索引里面检索出对应的数据。



二. java代码实现

温馨提示:代码写的有点丑陋,以下只是提供思路,代码并不能直接运行

获取要查询的索引

    /**
     * 获取要查询索引
     */
    private List<String> getIndexList(EsQueryDTO dto) throws IOException {
        // 获取索引名
        List<String> indexNameList = new ArrayList<>();
        // 如果是第一次查询就获取第一个索引返回,并写入到缓存中
        if (ObjectUtil.isNotNull(dto.getIsFirst()) && dto.getIsFirst()) {
            List<String> stringList = new ArrayList<>();
            // 查询别名下的所有索引名称
            Iterator<Map.Entry<String, IndexState>> iterator = elasticsearchClient.indices().getSettings(g -> g.index(LamsConfig.getIndexName() + "*")).result().entrySet().iterator();
            long beginTime = DateTimeUtils.toDate(dto.getBeginTime(), DateTimeUtils.y4M2d2H2m2s2).getTime();
            long endTime = DateTimeUtils.toDate(dto.getEndTime(), DateTimeUtils.y4M2d2H2m2s2).getTime();
            // 获取当前写入的索引
            String writeIndex = null;
            while (iterator.hasNext()) {
                Map.Entry<String, IndexState> next = iterator.next();
                IndexSettings settings = next.getValue().settings();
                if (settings != null && settings.index() != null) {
                    if (settings.index().lifecycle() != null && ObjectUtil.isNull(settings.index().lifecycle().indexingComplete())) {
                        writeIndex = next.getKey();
                    }
                    // 获取创建时间
                    Long aLong = settings.index().creationDate();
                    // 比较是否在这个时间范围内,在的话就写入到列表中
                    if (aLong != null && beginTime <= aLong && endTime >= aLong) {
                        stringList.add(next.getKey());
                    }
​
​
                }
            }
            if (stringList.isEmpty()) {
                stringList.add(writeIndex);
            }
            // 通过索引名降序排序
            stringList.sort(Collections.reverseOrder());
            // 将查询出来的索引列表 和当前查询的索引 写入缓存
            RedisUtils.setCacheListToJson(ElasticSearchConstant.SEARCH_INDEX + SecurityUtils.getUserId(), stringList);
            RedisUtils.setCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId(), stringList.get(0));
            indexNameList.add(stringList.get(0));
            return indexNameList;
        }
​
        List<String> cacheList = RedisUtils.getCacheJsonToList(ElasticSearchConstant.SEARCH_INDEX + SecurityUtils.getUserId(), String.class);
        String currentIndex = RedisUtils.getCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId());
        // 要查询当前索引就直接返回
        if (dto.getCurrentIndex()) {
            indexNameList.add(currentIndex);
            return indexNameList;
        }
        if (cacheList != null && !cacheList.isEmpty()) {
            // 否则获取下一个索引
            int indexNum = cacheList.indexOf(currentIndex);
            // 如果是向上排序就获取上一个索引
            if ("0".equals(dto.getPageUpOrDown())) {
                if (indexNum > 0) {
                    indexNameList.add(cacheList.get(indexNum - 1));
                    RedisUtils.setCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId(), cacheList.get(indexNum - 1));
​
                }
            } else {
                // 否则就获取下一个索引
                if (indexNum != -1 && indexNum < cacheList.size() - 1) {
                    indexNameList.add(cacheList.get(indexNum + 1));
                    // 更新当前查询的索引
                    RedisUtils.setCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId(), cacheList.get(indexNum + 1));
                }
            }
            return indexNameList;
        }
​
​
        return indexNameList;
    }


这里只会返回一个当前查询的索引


然后分页查询出来,并且后面判断如果查询的文档数不满足就查询下一个索引

 /**
     * 分页查询构造条件查询返回结果
     */
    private <T> List<Hit<T>> searchData(EsQueryDTO dto, List<String> indexNameList, List<Hit<T>> result, Class<T> tDocumentClass) throws IOException {
        try {
​
            String time =  "5m";
            // 根据索引列表创建一个pit,这里是使用了search_after去查询的
            String pit =  pitService.createPit(new CreatePointRequest().setIndexList(indexNameList).setTime(time).setUserId(ElasticSearchConstant.SEARCH_PIT + SecurityUtils.getUserId()));
​
​
            // 降序排列查询
            SearchRequest.Builder timestamp = new SearchRequest.Builder().size(dto.getPageSize()).pit(p -> p.id(pit).keepAlive(k -> k.time(time)));
            timestamp.sort(sort -> sort.field(f -> f.field("timestamp").order(SortOrder.Desc)));
            //  如果查询下一页就添加上search_after游标
            if (ObjectUtil.isNotNull(dto.getSearchAfter())) {
                List<FieldValue> fieldValues = new ArrayList<>();
                List<Long> sorts = FastJsonUtils.toList(dto.getSearchAfter(), Long.class);
                for (Long sort : sorts) {
                    FieldValue build = new FieldValue.Builder().longValue(sort).build();
                    fieldValues.add(build);
                }
                timestamp.searchAfter(fieldValues);
            }
            SearchResponse<T> search = elasticsearchClient.search(timestamp.build(), tDocumentClass);
​
            if (search.hits().total() != null) {
                result.addAll(search.hits().hits());
                int value = search.hits().hits().size();
                // 重点这里,如果查询的数字小于给定的pageSize,就循环递归去查询,直到满足为止
                if (value < dto.getPageSize()) {
                    dto.setCurrentIndex(false);
                    dto.setIsFirst(false);
                    dto.setPageSize(dto.getPageSize() - value);
                    List<String> indexList = this.getIndexList(dto);
                    if (indexList.isEmpty()) {
                        return result;
                    }
                    // 重新创建pit
                    pitService.createPit(new CreatePointRequest().setIndexList(indexList).setTime(time).setUserId(ElasticSearchConstant.SEARCH_PIT + SecurityUtils.getUserId()));
​
                    this.searchData(dto, indexList, result, tDocumentClass);
                }
            }
            return result;
​
        } catch (ElasticsearchException e) {
        }
    }


以上就是具体的实现思路,更多的细节需要自己补充



作者:神的孩子都在歌唱

本人博客:https://blog.csdn.net/weixin_46654114

转载说明:务必注明来源,附带本人博客连接。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。