spring boot项目集成Elasticsearch
【摘要】 一、添加elasticsearch 依赖
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <arti...
一、添加elasticsearch 依赖
-
-
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
二、在application.yml配置elasticsearch 连接信息
-
elasticsearch:
  clusterNodes:
    - "10.2.140.31:30869"
三、编写RestHighLevelClient 配置类
-
package com.hikvision.smbg.content.core.config;
-
-
import lombok.Data;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.http.HttpHost;
-
import org.elasticsearch.client.RestClient;
-
import org.elasticsearch.client.RestClientBuilder;
-
import org.elasticsearch.client.RestHighLevelClient;
-
import org.springframework.beans.factory.DisposableBean;
-
import org.springframework.beans.factory.FactoryBean;
-
import org.springframework.boot.context.properties.ConfigurationProperties;
-
import org.springframework.boot.context.properties.EnableConfigurationProperties;
-
import org.springframework.context.annotation.Bean;
-
import org.springframework.context.annotation.Configuration;
-
import org.springframework.util.CollectionUtils;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
-
/**
-
* @author yiyuxin
-
* @since 2020-05-13
-
*/
-
@Slf4j
-
@Configuration
-
@EnableConfigurationProperties(ElasticsearchConfig.ElasticsearchProperties.class)
-
public class ElasticsearchConfig {
-
-
private final ElasticsearchProperties elasticsearchProperties;
-
-
public ElasticsearchConfig(ElasticsearchProperties elasticsearchProperties) {
-
this.elasticsearchProperties = elasticsearchProperties;
-
}
-
-
@Bean("highLevelClient")
-
public RestHighLevelClientFactoryBean highLevelClient() {
-
return new RestHighLevelClientFactoryBean(elasticsearchProperties);
-
}
-
-
/**
-
* 实现FactoryBean,自定义获取RestHighLevelClient的bean实例,
-
* 默认单例(FactoryBean接口的isSingleton方法默认返回true)
-
* 实现Disposable接口,bean实例销毁的时候关闭RestHighLevelClient连接
-
*/
-
public static class RestHighLevelClientFactoryBean implements FactoryBean<RestHighLevelClient>, DisposableBean {
-
-
private RestHighLevelClient highLevelClient;
-
-
private ElasticsearchProperties properties;
-
-
public RestHighLevelClientFactoryBean(ElasticsearchProperties elasticsearchProperties) {
-
this.properties = elasticsearchProperties;
-
}
-
-
@Override
-
public RestHighLevelClient getObject() {
-
List clusterNodes = properties.getClusterNodes();
-
-
List httpHosts = new ArrayList<>(4);
-
if (!CollectionUtils.isEmpty(clusterNodes)) {
-
for (String clusterNode : clusterNodes) {
-
//以:分隔获取到ip 和 端口
-
String[] hostPort = clusterNode.split(":");
-
httpHosts.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1])));
-
}
-
} else {
-
//如果没有配置ip和端口,默认为localhost:9200
-
httpHosts.add(new HttpHost("localhost", 9200));
-
}
-
-
RestClientBuilder clientBuilder = RestClient.builder(httpHosts.toArray(new HttpHost[0])).setRequestConfigCallback(requestConfigBuilder -> {
-
requestConfigBuilder.setConnectionRequestTimeout(10000); // 从连接池获取连接超时时间
-
requestConfigBuilder.setConnectTimeout(5000); // 连接服务端超时时间
-
requestConfigBuilder.setSocketTimeout(5000 * 60); // 读取超时
-
return requestConfigBuilder;
-
});
-
clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
-
httpClientBuilder.setMaxConnTotal(100); //最大连接数
-
httpClientBuilder.setKeepAliveStrategy((response, context) -> 10 * 60 * 1000); //保活时间
-
httpClientBuilder.setMaxConnPerRoute(100); //针对一个域名同时间正在使用的最多的连接数
-
return httpClientBuilder;
-
});
-
highLevelClient = new RestHighLevelClient(clientBuilder);
-
-
return highLevelClient;
-
}
-
-
@Override
-
public Class getObjectType() {
-
return RestHighLevelClient.class;
-
}
-
-
@Override
-
public void destroy() {
-
try {
-
log.info("Closing elasticSearch client");
-
if (highLevelClient != null) {
-
highLevelClient.close();
-
}
-
} catch (final Exception e) {
-
log.error("Error closing ElasticSearch client: ", e);
-
}
-
}
-
}
-
-
@Data
-
@ConfigurationProperties(prefix = "elasticsearch")
-
public static class ElasticsearchProperties {
-
private List clusterNodes;
-
}
-
}
四、使用
-
package com.hikvision.smbg.content.core.rpc.impl;
-
-
import com.hikvision.smbg.content.common.dto.document.ContentDocument;
-
import com.hikvision.smbg.content.common.dto.document.Document;
-
import com.hikvision.smbg.content.common.exception.ContentCenterException;
-
import com.hikvision.smbg.content.common.exception.EnumErrorCode;
-
import com.hikvision.smbg.content.common.util.JsonUtils;
-
import com.hikvision.smbg.content.core.rpc.EsSyncService;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.collections4.CollectionUtils;
-
import org.elasticsearch.action.bulk.BulkItemResponse;
-
import org.elasticsearch.action.bulk.BulkRequest;
-
import org.elasticsearch.action.bulk.BulkResponse;
-
import org.elasticsearch.action.index.IndexRequest;
-
import org.elasticsearch.action.index.IndexResponse;
-
import org.elasticsearch.action.support.WriteRequest;
-
import org.elasticsearch.action.update.UpdateRequest;
-
import org.elasticsearch.client.RequestOptions;
-
import org.elasticsearch.client.RestHighLevelClient;
-
import org.elasticsearch.common.xcontent.XContentType;
-
import org.elasticsearch.index.query.QueryBuilder;
-
import org.elasticsearch.index.query.QueryBuilders;
-
import org.elasticsearch.index.reindex.BulkByScrollResponse;
-
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
-
import org.elasticsearch.rest.RestStatus;
-
import org.elasticsearch.script.Script;
-
import org.elasticsearch.script.ScriptType;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
-
import org.springframework.stereotype.Service;
-
-
import java.io.IOException;
-
import java.util.HashMap;
-
import java.util.List;
-
import java.util.Map;
-
import java.util.Objects;
-
-
/**
-
* {@link RestClientAutoConfiguration}
-
*
-
* @author yiyuxin
-
* @since 2020-10-30
-
*/
-
@Slf4j
-
@Service
-
public class EsSyncServiceImpl implements EsSyncService {
-
/**
-
* 内容索引
-
*/
-
private static String index = ContentDocument.class.getAnnotation(Document.class).index();
-
/**
-
* 客户端
-
*/
-
@Autowired
-
private RestHighLevelClient highLevelClient;
-
-
@Override
-
public void index(ContentDocument document) {
-
// 构建索引请求
-
String id = String.valueOf(document.getId());
-
String source = Objects.requireNonNull(JsonUtils.toJson(document));
-
IndexRequest request = new IndexRequest(index).id(id).source(source, XContentType.JSON);
-
-
// 索引内容文档
-
IndexResponse response;
-
try {
-
response = highLevelClient.index(request, RequestOptions.DEFAULT);
-
} catch (Exception e) {
-
log.error("内容索引失败,document={}", source);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
-
}
-
-
// 请求结果解析
-
RestStatus status = response.status();
-
if (!RestStatus.OK.equals(status) && !RestStatus.CREATED.equals(status)) {
-
log.error("内容索引失败,document={}, status={}", source, status);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
-
}
-
if (log.isDebugEnabled()) {
-
log.debug("内容索引成功,document={}", source);
-
}
-
}
-
-
@Override
-
public void enableOrDisable(List contentIds, Integer contentStatus) {
-
UpdateByQueryRequest request = new UpdateByQueryRequest(index);
-
String[] ids = contentIds.stream().map(String::valueOf).toArray(String[]::new);
-
QueryBuilder query = QueryBuilders.idsQuery().addIds(ids);
-
request.setQuery(query);
-
-
Map<String, Object> params = new HashMap<>(contentIds.size());
-
for (String id : ids) {
-
params.put(id, contentStatus);
-
}
-
String code = "if(ctx._source.content_status==params[ctx._id]){ctx.op='noop'}"
-
+ "else{ctx._source.content_status=params[ctx._id]}";
-
Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, code, params);
-
-
request.setScript(script);
-
request.setRefresh(true);
-
-
BulkByScrollResponse response;
-
try {
-
response = highLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
-
} catch (IOException e) {
-
log.error("内容启用或停用失败,contentIds={},contentStatus={}", contentIds, contentStatus);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
-
}
-
-
List failures = response.getBulkFailures();
-
if (CollectionUtils.isNotEmpty(failures)) {
-
log.error("内容启用或停用失败,contentIds={},contentStatus={}, failures={}", contentIds, contentStatus, failures);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
-
}
-
-
if (log.isDebugEnabled()) {
-
log.debug("内容启用或停用成功,contentIds={},contentStatus={}", contentIds, contentStatus);
-
}
-
}
-
-
@Override
-
public void syncStatistics(List documents) {
-
// 构建批量请求
-
BulkRequest bulkRequest = new BulkRequest(index);
-
bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
-
for (ContentDocument document : documents) {
-
UpdateRequest request = new UpdateRequest();
-
request.id(String.valueOf(document.getId()));
-
request.doc(JsonUtils.toJson(document), XContentType.JSON);
-
-
bulkRequest.add(request);
-
}
-
-
// 批量请求响应处理
-
BulkResponse bulkResponse;
-
try {
-
bulkResponse = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
-
} catch (IOException e) {
-
log.error("内容统计数据同步失败,documents={}", JsonUtils.toJson(documents));
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
-
}
-
if (bulkResponse.hasFailures()) {
-
log.error("内容统计数据同步失败,documents={}, failures={}", JsonUtils.toJson(documents), bulkResponse.buildFailureMessage());
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
-
}
-
-
if (log.isDebugEnabled()) {
-
log.debug("内容统计数据同步成功,documents={}", JsonUtils.toJson(documents));
-
}
-
}
-
}
-
package com.hikvision.smbg.content.core.rpc.impl;
-
-
import com.baomidou.mybatisplus.core.metadata.IPage;
-
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-
import com.hikvision.smbg.content.common.dto.document.ContentDocument;
-
import com.hikvision.smbg.content.common.dto.document.Document;
-
import com.hikvision.smbg.content.common.dto.request.ContentSearchRequest;
-
import com.hikvision.smbg.content.common.dto.request.ContentSearchSuggestRequest;
-
import com.hikvision.smbg.content.common.dto.response.ContentSearchResponse;
-
import com.hikvision.smbg.content.common.dto.response.ContentSearchSuggestResponse;
-
import com.hikvision.smbg.content.common.enums.ContentFromEnum;
-
import com.hikvision.smbg.content.common.enums.ContentStatusEnum;
-
import com.hikvision.smbg.content.common.exception.ContentCenterException;
-
import com.hikvision.smbg.content.common.exception.EnumErrorCode;
-
import com.hikvision.smbg.content.common.util.AppPermissionBitComputeUtils;
-
import com.hikvision.smbg.content.common.util.JsonUtils;
-
import com.hikvision.smbg.content.core.rpc.EsSearchService;
-
import lombok.extern.slf4j.Slf4j;
-
import org.apache.commons.lang3.StringUtils;
-
import org.elasticsearch.action.search.SearchRequest;
-
import org.elasticsearch.action.search.SearchResponse;
-
import org.elasticsearch.client.RequestOptions;
-
import org.elasticsearch.client.RestHighLevelClient;
-
import org.elasticsearch.index.query.BoolQueryBuilder;
-
import org.elasticsearch.index.query.QueryBuilders;
-
import org.elasticsearch.rest.RestStatus;
-
import org.elasticsearch.search.SearchHit;
-
import org.elasticsearch.search.SearchHits;
-
import org.elasticsearch.search.aggregations.AggregationBuilders;
-
import org.elasticsearch.search.aggregations.Aggregations;
-
import org.elasticsearch.search.aggregations.BucketOrder;
-
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
-
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
-
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
-
import org.elasticsearch.search.builder.SearchSourceBuilder;
-
import org.elasticsearch.search.sort.SortOrder;
-
import org.springframework.beans.BeanUtils;
-
import org.springframework.beans.factory.annotation.Autowired;
-
import org.springframework.stereotype.Service;
-
-
import java.util.ArrayList;
-
import java.util.List;
-
import java.util.Objects;
-
import java.util.Set;
-
-
/**
-
* @author yiyuxin
-
* @since 2020-10-30
-
*/
-
@Slf4j
-
@Service
-
public class EsSearchServiceImpl implements EsSearchService {
-
/**
-
* 内容索引
-
*/
-
private static String index = ContentDocument.class.getAnnotation(Document.class).index();
-
/**
-
* 客户端
-
*/
-
@Autowired
-
private RestHighLevelClient highLevelClient;
-
-
@Override
-
public IPage search(ContentSearchRequest request) {
-
IPage page = new Page<>();
-
-
// 1. 查询请求
-
SearchRequest searchRequest = new SearchRequest(index);
-
SearchSourceBuilder source = new SearchSourceBuilder();
-
searchRequest.source(source);
-
source.trackTotalHits(true);
-
// 2. 分页
-
source.from((request.getPage() - 1) * request.getSize());
-
source.size(request.getSize());
-
-
// 3. 排序
-
if (StringUtils.isNotBlank(request.getSort())
-
&& StringUtils.isNotBlank(request.getOrder())) {
-
source.sort(request.getSort(), SortOrder.fromString(request.getOrder()));
-
}
-
source.sort("gmt_publish", SortOrder.DESC);
-
source.sort("id", SortOrder.DESC);
-
-
// 4. 查询条件
-
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
-
source.query(boolQuery);
-
// 内容状态
-
boolQuery.filter().add(QueryBuilders.termQuery("content_status", ContentStatusEnum.ENABLED.getStatus()));
-
// 内容类型
-
if (Objects.nonNull(request.getContentType())) {
-
boolQuery.filter().add(QueryBuilders.termQuery("content_type", request.getContentType()));
-
}
-
// 标签 ID
-
if (Objects.nonNull(request.getLabelId())) {
-
boolQuery.filter().add(QueryBuilders.termQuery("label_ids", request.getLabelId()));
-
}
-
// 一级行业 ID
-
if (Objects.nonNull(request.getOneIndustryId())) {
-
boolQuery.filter().add(QueryBuilders.termQuery("one_industry_id", request.getOneIndustryId()));
-
}
-
// 二级行业 ID
-
if (Objects.nonNull(request.getTwoIndustryId())) {
-
boolQuery.filter().add(QueryBuilders.termQuery("two_industry_id", request.getTwoIndustryId()));
-
}
-
// 应用权限
-
if (Objects.nonNull(request.getApp())) {
-
Set scopes = AppPermissionBitComputeUtils.getAllPermissionValueSetByAppPermissionBit(request.getApp());
-
scopes.add(0);
-
boolQuery.filter().add(QueryBuilders.termsQuery("scope", scopes));
-
}
-
// 关键词
-
if (StringUtils.isNotBlank(request.getKeyword())) {
-
BoolQueryBuilder keywordBoolQuery = QueryBuilders.boolQuery();
-
boolQuery.must().add(keywordBoolQuery);
-
// 内容标题
-
keywordBoolQuery.should().add(QueryBuilders.wildcardQuery("title", StringUtils.join("*", request.getKeyword(), "*")));
-
// 内容简介
-
keywordBoolQuery.should().add(QueryBuilders.termQuery("profile", request.getKeyword()));
-
}
-
// 创建人
-
if (StringUtils.isNotBlank(request.getCreator())) {
-
BoolQueryBuilder creatorBoolQuery = QueryBuilders.boolQuery();
-
boolQuery.must().add(creatorBoolQuery);
-
// 平台
-
creatorBoolQuery.should().add(QueryBuilders.termQuery("from", ContentFromEnum.PLATFORM.getCode()));
-
// 商家
-
BoolQueryBuilder mCreatorBoolQuery = QueryBuilders.boolQuery();
-
creatorBoolQuery.should().add(mCreatorBoolQuery);
-
mCreatorBoolQuery.must(QueryBuilders.termQuery("user_id", request.getCreator()));
-
mCreatorBoolQuery.must(QueryBuilders.termQuery("from", ContentFromEnum.MERCHANT.getCode()));
-
}
-
-
if (log.isDebugEnabled()) {
-
log.debug("内容搜索,query={}", source);
-
}
-
-
// 5. 内容查询
-
SearchResponse searchResponse;
-
try {
-
searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
-
} catch (Exception e) {
-
log.error("内容搜索失败,request={}", JsonUtils.toJson(request));
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
-
}
-
// 6. 搜索结果处理
-
RestStatus status = searchResponse.status();
-
if (!status.equals(RestStatus.OK)) {
-
log.error("内容搜索失败,request={}, status={}", JsonUtils.toJson(request), status);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
-
}
-
// 总数量
-
SearchHits hits = searchResponse.getHits();
-
long total = hits.getTotalHits().value;
-
if (total == 0) {
-
return page;
-
}
-
page.setTotal(total);
-
-
// 数据记录
-
List records = new ArrayList<>(request.getSize());
-
page.setRecords(records);
-
SearchHit[] hitArr = hits.getHits();
-
for (SearchHit hit : hitArr) {
-
ContentDocument document = JsonUtils.fromJson(hit.getSourceAsString(), ContentDocument.class);
-
if (Objects.isNull(document)) continue;
-
-
ContentSearchResponse response = new ContentSearchResponse();
-
BeanUtils.copyProperties(document, response);
-
-
page.getRecords().add(response);
-
}
-
-
return page;
-
}
-
-
@Override
-
public ContentSearchSuggestResponse suggest(ContentSearchSuggestRequest request) {
-
// 1. 查询请求
-
SearchRequest searchRequest = new SearchRequest(index);
-
SearchSourceBuilder source = new SearchSourceBuilder();
-
searchRequest.source(source);
-
source.size(0);
-
// 2. 查询条件
-
BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
-
source.query(boolQuery);
-
// 内容状态
-
boolQuery.filter().add(QueryBuilders.termQuery("content_status", ContentStatusEnum.ENABLED.getStatus()));
-
-
// 关键字
-
BoolQueryBuilder keywordBoolQuery = QueryBuilders.boolQuery();
-
boolQuery.must().add(keywordBoolQuery);
-
// 内容标题
-
keywordBoolQuery.should().add(QueryBuilders.wildcardQuery("title", StringUtils.join("*", request.getKeyword(), "*")));
-
// 内容简介
-
keywordBoolQuery.should().add(QueryBuilders.termQuery("profile", request.getKeyword()));
-
-
// 内容类型
-
boolQuery.filter().add(QueryBuilders.termQuery("content_type", request.getContentType()));
-
-
// 应用权限
-
if (Objects.nonNull(request.getApp())) {
-
Set scopes = AppPermissionBitComputeUtils.getAllPermissionValueSetByAppPermissionBit(request.getApp());
-
scopes.add(0);
-
boolQuery.filter().add(QueryBuilders.termsQuery("scope", scopes));
-
}
-
// 内容发布人
-
if (StringUtils.isNotBlank(request.getCreator())) {
-
BoolQueryBuilder creatorBoolQuery = QueryBuilders.boolQuery();
-
boolQuery.must().add(creatorBoolQuery);
-
// 平台
-
creatorBoolQuery.should().add(QueryBuilders.termQuery("from", ContentFromEnum.PLATFORM.getCode()));
-
// 商家
-
BoolQueryBuilder mCreatorBoolQuery = QueryBuilders.boolQuery();
-
creatorBoolQuery.should().add(mCreatorBoolQuery);
-
mCreatorBoolQuery.must(QueryBuilders.termQuery("user_id", request.getCreator()));
-
mCreatorBoolQuery.must(QueryBuilders.termQuery("from", ContentFromEnum.MERCHANT.getCode()));
-
}
-
-
// 聚合
-
TermsAggregationBuilder aggs = AggregationBuilders.terms("title-terms")
-
.field("title").size(10).order(BucketOrder.compound(BucketOrder.count(false), BucketOrder.key(false)));
-
source.aggregation(aggs);
-
-
if (log.isDebugEnabled()) {
-
log.debug("内容搜索提示,query={}", source);
-
}
-
-
// 3. 内容查询
-
SearchResponse searchResponse;
-
try {
-
searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
-
} catch (Exception e) {
-
log.error("内容搜索提示查询失败,request={}", request);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
-
}
-
// 4. 搜索结果处理
-
RestStatus status = searchResponse.status();
-
if (!status.equals(RestStatus.OK)) {
-
log.error("内容搜索提示查询失败,request={}, status={}", request, status);
-
throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
-
}
-
-
ContentSearchSuggestResponse response = new ContentSearchSuggestResponse(request.getContentType(),
-
new ArrayList<>(10));
-
-
Aggregations aggregations = searchResponse.getAggregations();
-
ParsedStringTerms terms = aggregations.get("title-terms");
-
List buckets = terms.getBuckets();
-
-
for (Terms.Bucket bucket : buckets) {
-
response.getSuggests().add(bucket.getKeyAsString());
-
}
-
-
return response;
-
}
-
}
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/109814141
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)