elasticsearch 实现亿级数据查询 (图文加Java代码教程)
前言
这是我在这个网站整理的笔记,有错误的地方请指出,关注我,接下来还会持续更新。
作者:神的孩子都在歌唱
一. 问题及解决方案
如果一个索引存储超过亿级数据,es分页排序查询的时候会很慢,往往需要几十秒,并且如果你elasticsearch中的from size进行分页查询,它是不支持深分页的,只能查询到1000条。那么我们如何能够进行深分页 并且 做到毫秒级查询呢?
问题一: 如何深分页
解决: es中推荐使用search_after进行滚动分页,它不受条数的限制,页数很大也不会影响性能,唯一的缺点就是不能跨页查询,只能一页页查询
问题二: 如何做到毫秒级查询
解决: 我们可以将一个索引分成多个索引(可以通过定义策略,索引模版和别名去管理这些索引亿级数据变成了查询千万数据,这样子就能够很快查询出来。如下图
问题三: 有人就会问了,如果要查询全部的索引呢,全部索引加起来的数据也有亿级了,这不就又要一次性查询亿级数据了么?
其实道理都是一样的,你可以先查询第一个,第一个查询完后在查询第二个,这样子就能解决要一次性查询亿级数据的问题,看下图
一般项目中的查询都是会加上时间范围,所以我们也没必要全部索引都查一遍,可以先根据时间范围和索引的创建时间过滤出来要查询那些索引,这样子就可以根据指定的时间范围查询对应的索引,在对索引排序,在根据时间范围从单个索引里面检索出对应的数据。
二. java代码实现
温馨提示:代码写的有点丑陋,以下只是提供思路,代码并不能直接运行
获取要查询的索引
/**
* 获取要查询索引
*/
private List<String> getIndexList(EsQueryDTO dto) throws IOException {
// 获取索引名
List<String> indexNameList = new ArrayList<>();
// 如果是第一次查询就获取第一个索引返回,并写入到缓存中
if (ObjectUtil.isNotNull(dto.getIsFirst()) && dto.getIsFirst()) {
List<String> stringList = new ArrayList<>();
// 查询别名下的所有索引名称
Iterator<Map.Entry<String, IndexState>> iterator = elasticsearchClient.indices().getSettings(g -> g.index(LamsConfig.getIndexName() + "*")).result().entrySet().iterator();
long beginTime = DateTimeUtils.toDate(dto.getBeginTime(), DateTimeUtils.y4M2d2H2m2s2).getTime();
long endTime = DateTimeUtils.toDate(dto.getEndTime(), DateTimeUtils.y4M2d2H2m2s2).getTime();
// 获取当前写入的索引
String writeIndex = null;
while (iterator.hasNext()) {
Map.Entry<String, IndexState> next = iterator.next();
IndexSettings settings = next.getValue().settings();
if (settings != null && settings.index() != null) {
if (settings.index().lifecycle() != null && ObjectUtil.isNull(settings.index().lifecycle().indexingComplete())) {
writeIndex = next.getKey();
}
// 获取创建时间
Long aLong = settings.index().creationDate();
// 比较是否在这个时间范围内,在的话就写入到列表中
if (aLong != null && beginTime <= aLong && endTime >= aLong) {
stringList.add(next.getKey());
}
}
}
if (stringList.isEmpty()) {
stringList.add(writeIndex);
}
// 通过索引名降序排序
stringList.sort(Collections.reverseOrder());
// 将查询出来的索引列表 和当前查询的索引 写入缓存
RedisUtils.setCacheListToJson(ElasticSearchConstant.SEARCH_INDEX + SecurityUtils.getUserId(), stringList);
RedisUtils.setCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId(), stringList.get(0));
indexNameList.add(stringList.get(0));
return indexNameList;
}
List<String> cacheList = RedisUtils.getCacheJsonToList(ElasticSearchConstant.SEARCH_INDEX + SecurityUtils.getUserId(), String.class);
String currentIndex = RedisUtils.getCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId());
// 要查询当前索引就直接返回
if (dto.getCurrentIndex()) {
indexNameList.add(currentIndex);
return indexNameList;
}
if (cacheList != null && !cacheList.isEmpty()) {
// 否则获取下一个索引
int indexNum = cacheList.indexOf(currentIndex);
// 如果是向上排序就获取上一个索引
if ("0".equals(dto.getPageUpOrDown())) {
if (indexNum > 0) {
indexNameList.add(cacheList.get(indexNum - 1));
RedisUtils.setCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId(), cacheList.get(indexNum - 1));
}
} else {
// 否则就获取下一个索引
if (indexNum != -1 && indexNum < cacheList.size() - 1) {
indexNameList.add(cacheList.get(indexNum + 1));
// 更新当前查询的索引
RedisUtils.setCacheObject(ElasticSearchConstant.CURRENT_SEARCH_INDEX + SecurityUtils.getUserId(), cacheList.get(indexNum + 1));
}
}
return indexNameList;
}
return indexNameList;
}
这里只会返回一个当前查询的索引
然后分页查询出来,并且后面判断如果查询的文档数不满足就查询下一个索引
/**
* 分页查询构造条件查询返回结果
*/
private <T> List<Hit<T>> searchData(EsQueryDTO dto, List<String> indexNameList, List<Hit<T>> result, Class<T> tDocumentClass) throws IOException {
try {
String time = "5m";
// 根据索引列表创建一个pit,这里是使用了search_after去查询的
String pit = pitService.createPit(new CreatePointRequest().setIndexList(indexNameList).setTime(time).setUserId(ElasticSearchConstant.SEARCH_PIT + SecurityUtils.getUserId()));
// 降序排列查询
SearchRequest.Builder timestamp = new SearchRequest.Builder().size(dto.getPageSize()).pit(p -> p.id(pit).keepAlive(k -> k.time(time)));
timestamp.sort(sort -> sort.field(f -> f.field("timestamp").order(SortOrder.Desc)));
// 如果查询下一页就添加上search_after游标
if (ObjectUtil.isNotNull(dto.getSearchAfter())) {
List<FieldValue> fieldValues = new ArrayList<>();
List<Long> sorts = FastJsonUtils.toList(dto.getSearchAfter(), Long.class);
for (Long sort : sorts) {
FieldValue build = new FieldValue.Builder().longValue(sort).build();
fieldValues.add(build);
}
timestamp.searchAfter(fieldValues);
}
SearchResponse<T> search = elasticsearchClient.search(timestamp.build(), tDocumentClass);
if (search.hits().total() != null) {
result.addAll(search.hits().hits());
int value = search.hits().hits().size();
// 重点这里,如果查询的数字小于给定的pageSize,就循环递归去查询,直到满足为止
if (value < dto.getPageSize()) {
dto.setCurrentIndex(false);
dto.setIsFirst(false);
dto.setPageSize(dto.getPageSize() - value);
List<String> indexList = this.getIndexList(dto);
if (indexList.isEmpty()) {
return result;
}
// 重新创建pit
pitService.createPit(new CreatePointRequest().setIndexList(indexList).setTime(time).setUserId(ElasticSearchConstant.SEARCH_PIT + SecurityUtils.getUserId()));
this.searchData(dto, indexList, result, tDocumentClass);
}
}
return result;
} catch (ElasticsearchException e) {
}
}
以上就是具体的实现思路,更多的细节需要自己补充
作者:神的孩子都在歌唱
本人博客:https://blog.csdn.net/weixin_46654114
- 点赞
- 收藏
- 关注作者
评论(0)