Spring Boot 项目集成 Elasticsearch
【摘要】 一、添加elasticsearch 依赖
<dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <arti...
一、添加elasticsearch 依赖
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
</dependency>
二、在application.yml配置elasticsearch 连接信息
elasticsearch:
  clusterNodes:
    - "10.2.140.31:30869"
三、编写RestHighLevelClient 配置类
package com.hikvision.smbg.content.core.config;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
/**
 * Spring configuration that publishes a {@link RestHighLevelClient} bean (named
 * {@code highLevelClient}) built from the {@code elasticsearch.clusterNodes} property.
 *
 * @author yiyuxin
 * @since 2020-05-13
 */
@Slf4j
@Configuration
@EnableConfigurationProperties(ElasticsearchConfig.ElasticsearchProperties.class)
public class ElasticsearchConfig {

    private final ElasticsearchProperties elasticsearchProperties;

    public ElasticsearchConfig(ElasticsearchProperties elasticsearchProperties) {
        this.elasticsearchProperties = elasticsearchProperties;
    }

    /**
     * Registers the factory bean; the container exposes the product of
     * {@link RestHighLevelClientFactoryBean#getObject()} under the name {@code highLevelClient}.
     */
    @Bean("highLevelClient")
    public RestHighLevelClientFactoryBean highLevelClient() {
        return new RestHighLevelClientFactoryBean(elasticsearchProperties);
    }

    /**
     * {@link FactoryBean} that creates the {@link RestHighLevelClient} and, through
     * {@link DisposableBean}, closes it when the bean is destroyed.
     *
     * <p>The original author notes that the product is a singleton because
     * {@code FactoryBean#isSingleton()} returns {@code true} by default. Independently of that,
     * this class builds the client at most once so a repeated {@link #getObject()} call cannot
     * leak a previously created client.
     */
    public static class RestHighLevelClientFactoryBean implements FactoryBean<RestHighLevelClient>, DisposableBean {

        private static final String DEFAULT_HOST = "localhost";
        private static final int DEFAULT_PORT = 9200;

        /** Timeout for obtaining a connection from the pool, in milliseconds. */
        private static final int CONNECTION_REQUEST_TIMEOUT_MILLIS = 10000;
        /** Timeout for establishing the connection to the server, in milliseconds. */
        private static final int CONNECT_TIMEOUT_MILLIS = 5000;
        /** Read (socket) timeout, in milliseconds. */
        private static final int SOCKET_TIMEOUT_MILLIS = 5000 * 60;
        /** Keep-alive duration handed to the HTTP client, in milliseconds. */
        private static final long KEEP_ALIVE_MILLIS = 10 * 60 * 1000L;
        /** Maximum total connections and maximum connections per route. */
        private static final int MAX_CONNECTIONS = 100;

        private final ElasticsearchProperties properties;
        private RestHighLevelClient highLevelClient;

        public RestHighLevelClientFactoryBean(ElasticsearchProperties elasticsearchProperties) {
            this.properties = elasticsearchProperties;
        }

        /**
         * Returns the client, creating it on first use.
         *
         * @return the shared client instance
         * @throws IllegalArgumentException if a configured node is not of the form
         *                                  {@code host} or {@code host:port}
         */
        @Override
        public RestHighLevelClient getObject() {
            if (highLevelClient == null) {
                highLevelClient = new RestHighLevelClient(newClientBuilder());
            }
            return highLevelClient;
        }

        @Override
        public Class<?> getObjectType() {
            return RestHighLevelClient.class;
        }

        /** Closes the client if one was created; failures are logged, not rethrown. */
        @Override
        public void destroy() {
            try {
                log.info("Closing elasticSearch client");
                if (highLevelClient != null) {
                    highLevelClient.close();
                }
            } catch (final Exception e) {
                log.error("Error closing ElasticSearch client: ", e);
            }
        }

        /** Builds the low-level client builder with the configured hosts, timeouts and pool limits. */
        private RestClientBuilder newClientBuilder() {
            RestClientBuilder clientBuilder = RestClient.builder(resolveHosts().toArray(new HttpHost[0]));
            clientBuilder.setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder
                    .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT_MILLIS)
                    .setConnectTimeout(CONNECT_TIMEOUT_MILLIS)
                    .setSocketTimeout(SOCKET_TIMEOUT_MILLIS));
            clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                    .setMaxConnTotal(MAX_CONNECTIONS)
                    .setKeepAliveStrategy((response, context) -> KEEP_ALIVE_MILLIS)
                    .setMaxConnPerRoute(MAX_CONNECTIONS));
            return clientBuilder;
        }

        /** Converts the configured nodes to hosts; falls back to localhost:9200 when none are configured. */
        private List<HttpHost> resolveHosts() {
            List<String> clusterNodes = properties.getClusterNodes();
            List<HttpHost> httpHosts = new ArrayList<>(4);
            if (CollectionUtils.isEmpty(clusterNodes)) {
                httpHosts.add(new HttpHost(DEFAULT_HOST, DEFAULT_PORT));
                return httpHosts;
            }
            for (String clusterNode : clusterNodes) {
                httpHosts.add(toHttpHost(clusterNode));
            }
            return httpHosts;
        }

        /**
         * Parses one {@code host[:port]} entry. A missing port defaults to 9200; anything else
         * that is malformed is rejected with a message naming the offending entry instead of an
         * {@code ArrayIndexOutOfBoundsException}.
         */
        private static HttpHost toHttpHost(String clusterNode) {
            String node = clusterNode == null ? "" : clusterNode.trim();
            String[] hostPort = node.split(":");
            if (hostPort.length == 0 || hostPort.length > 2 || hostPort[0].trim().isEmpty()) {
                throw new IllegalArgumentException(
                        "Invalid elasticsearch.clusterNodes entry '" + clusterNode + "', expected host[:port]");
            }
            String host = hostPort[0].trim();
            if (hostPort.length == 1) {
                return new HttpHost(host, DEFAULT_PORT);
            }
            try {
                return new HttpHost(host, Integer.parseInt(hostPort[1].trim()));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException(
                        "Invalid port in elasticsearch.clusterNodes entry '" + clusterNode + "'", e);
            }
        }
    }

    /** Binds properties under the {@code elasticsearch} prefix. */
    @Data
    @ConfigurationProperties(prefix = "elasticsearch")
    public static class ElasticsearchProperties {
        /** Cluster nodes, one {@code host:port} entry per element. */
        private List<String> clusterNodes;
    }
}
四、使用
package com.hikvision.smbg.content.core.rpc.impl;
import com.hikvision.smbg.content.common.dto.document.ContentDocument;
import com.hikvision.smbg.content.common.dto.document.Document;
import com.hikvision.smbg.content.common.exception.ContentCenterException;
import com.hikvision.smbg.content.common.exception.EnumErrorCode;
import com.hikvision.smbg.content.common.util.JsonUtils;
import com.hikvision.smbg.content.core.rpc.EsSyncService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
/**
 * Writes content documents to Elasticsearch: single-document indexing, status updates by
 * id via update-by-query, and bulk partial updates.
 *
 * <p>Related Spring Boot auto-configuration: {@link RestClientAutoConfiguration}.
 *
 * @author yiyuxin
 * @since 2020-10-30
 */
@Slf4j
@Service
public class EsSyncServiceImpl implements EsSyncService {

    /** Name of the content index, read from the {@link Document} annotation on {@link ContentDocument}. */
    private static final String INDEX = ContentDocument.class.getAnnotation(Document.class).index();

    /** Elasticsearch client. */
    @Autowired
    private RestHighLevelClient highLevelClient;

    /**
     * Indexes one document under its own id.
     *
     * @param document the document to index
     * @throws ContentCenterException if the request fails or the response status is neither
     *                                {@code OK} nor {@code CREATED}
     */
    @Override
    public void index(ContentDocument document) {
        String id = String.valueOf(document.getId());
        String source = Objects.requireNonNull(JsonUtils.toJson(document));
        IndexRequest request = new IndexRequest(INDEX).id(id).source(source, XContentType.JSON);

        IndexResponse response;
        try {
            response = highLevelClient.index(request, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("内容索引失败,document={}", source);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
        }

        RestStatus status = response.status();
        if (status != RestStatus.OK && status != RestStatus.CREATED) {
            log.error("内容索引失败,document={}, status={}", source, status);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
        }
        log.debug("内容索引成功,document={}", source);
    }

    /**
     * Sets {@code content_status} on every document whose id is in {@code contentIds}, skipping
     * documents that already have that status. The index is refreshed after the update.
     *
     * <p>NOTE(review): the parameter is kept as declared in this file because the element type
     * used by {@link EsSyncService} is not visible here; elements are only ever converted with
     * {@link String#valueOf(Object)}. Restore the type argument once confirmed.
     *
     * @param contentIds    ids of the documents to update; nothing is sent when null or empty
     * @param contentStatus the status value to write
     * @throws ContentCenterException if the request fails or reports bulk failures
     */
    @Override
    @SuppressWarnings("rawtypes")
    public void enableOrDisable(List contentIds, Integer contentStatus) {
        if (CollectionUtils.isEmpty(contentIds)) {
            return;
        }
        String[] ids = new String[contentIds.size()];
        int next = 0;
        for (Object contentId : contentIds) {
            ids[next++] = String.valueOf(contentId);
        }

        UpdateByQueryRequest request = new UpdateByQueryRequest(INDEX);
        request.setQuery(QueryBuilders.idsQuery().addIds(ids));

        // The script looks the target status up by document id, so every id gets a parameter.
        Map<String, Object> params = new HashMap<>(ids.length);
        for (String id : ids) {
            params.put(id, contentStatus);
        }
        String code = "if(ctx._source.content_status==params[ctx._id]){ctx.op='noop'}"
                + "else{ctx._source.content_status=params[ctx._id]}";
        request.setScript(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, code, params));
        request.setRefresh(true);

        BulkByScrollResponse response;
        try {
            response = highLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("内容启用或停用失败,contentIds={},contentStatus={}", contentIds, contentStatus);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
        }

        List<BulkItemResponse.Failure> failures = response.getBulkFailures();
        if (CollectionUtils.isNotEmpty(failures)) {
            log.error("内容启用或停用失败,contentIds={},contentStatus={}, failures={}", contentIds, contentStatus, failures);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
        }
        log.debug("内容启用或停用成功,contentIds={},contentStatus={}", contentIds, contentStatus);
    }

    /**
     * Sends one partial update per document in a single bulk request with immediate refresh.
     *
     * @param documents documents whose JSON form is merged into the stored document with the
     *                  same id; nothing is sent when null or empty, because a bulk request
     *                  without actions is rejected by the client
     * @throws ContentCenterException if the request fails or any item fails
     */
    @Override
    public void syncStatistics(List<ContentDocument> documents) {
        if (CollectionUtils.isEmpty(documents)) {
            return;
        }

        BulkRequest bulkRequest = new BulkRequest(INDEX);
        bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
        for (ContentDocument document : documents) {
            UpdateRequest request = new UpdateRequest();
            request.id(String.valueOf(document.getId()));
            // Same null check as index(): fail here rather than inside the client.
            request.doc(Objects.requireNonNull(JsonUtils.toJson(document)), XContentType.JSON);
            bulkRequest.add(request);
        }

        BulkResponse bulkResponse;
        try {
            bulkResponse = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (IOException e) {
            log.error("内容统计数据同步失败,documents={}", JsonUtils.toJson(documents));
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
        }
        if (bulkResponse.hasFailures()) {
            log.error("内容统计数据同步失败,documents={}, failures={}", JsonUtils.toJson(documents), bulkResponse.buildFailureMessage());
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
        }
        // Guarded because serialising the documents is only worth it when debug is on.
        if (log.isDebugEnabled()) {
            log.debug("内容统计数据同步成功,documents={}", JsonUtils.toJson(documents));
        }
    }
}
package com.hikvision.smbg.content.core.rpc.impl;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.hikvision.smbg.content.common.dto.document.ContentDocument;
import com.hikvision.smbg.content.common.dto.document.Document;
import com.hikvision.smbg.content.common.dto.request.ContentSearchRequest;
import com.hikvision.smbg.content.common.dto.request.ContentSearchSuggestRequest;
import com.hikvision.smbg.content.common.dto.response.ContentSearchResponse;
import com.hikvision.smbg.content.common.dto.response.ContentSearchSuggestResponse;
import com.hikvision.smbg.content.common.enums.ContentFromEnum;
import com.hikvision.smbg.content.common.enums.ContentStatusEnum;
import com.hikvision.smbg.content.common.exception.ContentCenterException;
import com.hikvision.smbg.content.common.exception.EnumErrorCode;
import com.hikvision.smbg.content.common.util.AppPermissionBitComputeUtils;
import com.hikvision.smbg.content.common.util.JsonUtils;
import com.hikvision.smbg.content.core.rpc.EsSearchService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.sort.SortOrder;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Set;
/**
 * Read-side queries against the content index: paged search and title suggestions.
 *
 * @author yiyuxin
 * @since 2020-10-30
 */
@Slf4j
@Service
public class EsSearchServiceImpl implements EsSearchService {

    /** Name of the content index, read from the {@link Document} annotation on {@link ContentDocument}. */
    private static final String INDEX = ContentDocument.class.getAnnotation(Document.class).index();

    /** Name of the terms aggregation used for suggestions. */
    private static final String TITLE_TERMS = "title-terms";

    /** Maximum number of suggestions returned. */
    private static final int SUGGEST_SIZE = 10;

    /** Elasticsearch client. */
    @Autowired
    private RestHighLevelClient highLevelClient;

    /**
     * Searches enabled content with optional filters, returning one page of results.
     *
     * <p>Results are ordered by the requested sort (when both field and order are given), then
     * by {@code gmt_publish} and {@code id}, both descending.
     *
     * @param request search criteria and paging; a page number below 1 is treated as 1
     * @return the page carrying the requested page number and size, the total hit count and the
     *         records of that page (empty when nothing matches)
     * @throws ContentCenterException if the request fails or the response status is not {@code OK}
     */
    @Override
    public IPage<ContentSearchResponse> search(ContentSearchRequest request) {
        // Clamp the page number: a value below 1 would yield a negative offset.
        int pageNo = Math.max(request.getPage(), 1);
        int pageSize = request.getSize();
        IPage<ContentSearchResponse> page = new Page<>(pageNo, pageSize);

        // 1. Request skeleton and paging
        SearchSourceBuilder source = new SearchSourceBuilder();
        SearchRequest searchRequest = new SearchRequest(INDEX).source(source);
        source.trackTotalHits(true);
        source.from((pageNo - 1) * pageSize);
        source.size(pageSize);

        // 2. Sorting
        if (StringUtils.isNotBlank(request.getSort()) && StringUtils.isNotBlank(request.getOrder())) {
            source.sort(request.getSort(), SortOrder.fromString(request.getOrder()));
        }
        source.sort("gmt_publish", SortOrder.DESC);
        source.sort("id", SortOrder.DESC);

        // 3. Query conditions
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        source.query(boolQuery);
        boolQuery.filter(QueryBuilders.termQuery("content_status", ContentStatusEnum.ENABLED.getStatus()));
        if (Objects.nonNull(request.getContentType())) {
            boolQuery.filter(QueryBuilders.termQuery("content_type", request.getContentType()));
        }
        if (Objects.nonNull(request.getLabelId())) {
            boolQuery.filter(QueryBuilders.termQuery("label_ids", request.getLabelId()));
        }
        if (Objects.nonNull(request.getOneIndustryId())) {
            boolQuery.filter(QueryBuilders.termQuery("one_industry_id", request.getOneIndustryId()));
        }
        if (Objects.nonNull(request.getTwoIndustryId())) {
            boolQuery.filter(QueryBuilders.termQuery("two_industry_id", request.getTwoIndustryId()));
        }
        if (Objects.nonNull(request.getApp())) {
            addScopeFilter(boolQuery,
                    AppPermissionBitComputeUtils.getAllPermissionValueSetByAppPermissionBit(request.getApp()));
        }
        if (StringUtils.isNotBlank(request.getKeyword())) {
            addKeywordClause(boolQuery, request.getKeyword());
        }
        if (StringUtils.isNotBlank(request.getCreator())) {
            addCreatorClause(boolQuery, request.getCreator());
        }
        log.debug("内容搜索,query={}", source);

        // 4. Execute
        SearchResponse searchResponse;
        try {
            searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("内容搜索失败,request={}", JsonUtils.toJson(request));
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
        }
        RestStatus status = searchResponse.status();
        if (status != RestStatus.OK) {
            log.error("内容搜索失败,request={}, status={}", JsonUtils.toJson(request), status);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
        }

        // 5. Map hits to response records
        SearchHits hits = searchResponse.getHits();
        long total = hits.getTotalHits().value;
        if (total == 0) {
            return page;
        }
        page.setTotal(total);
        SearchHit[] hitArr = hits.getHits();
        List<ContentSearchResponse> records = new ArrayList<>(hitArr.length);
        for (SearchHit hit : hitArr) {
            ContentDocument document = JsonUtils.fromJson(hit.getSourceAsString(), ContentDocument.class);
            if (Objects.isNull(document)) {
                continue;
            }
            ContentSearchResponse response = new ContentSearchResponse();
            BeanUtils.copyProperties(document, response);
            records.add(response);
        }
        page.setRecords(records);
        return page;
    }

    /**
     * Returns up to ten title terms of enabled content matching the request, most frequent first
     * (ties broken by key, descending).
     *
     * <p>The keyword and content-type conditions are applied only when present, mirroring
     * {@link #search(ContentSearchRequest)}; previously a null value reached {@code termQuery}.
     *
     * @param request suggestion criteria
     * @return the suggestions for the requested content type; the list is empty when the
     *         response carries no terms aggregation
     * @throws ContentCenterException if the request fails or the response status is not {@code OK}
     */
    @Override
    public ContentSearchSuggestResponse suggest(ContentSearchSuggestRequest request) {
        // 1. Request skeleton: only the aggregation is needed, so no hits are fetched
        SearchSourceBuilder source = new SearchSourceBuilder();
        SearchRequest searchRequest = new SearchRequest(INDEX).source(source);
        source.size(0);

        // 2. Query conditions
        BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
        source.query(boolQuery);
        boolQuery.filter(QueryBuilders.termQuery("content_status", ContentStatusEnum.ENABLED.getStatus()));
        if (StringUtils.isNotBlank(request.getKeyword())) {
            addKeywordClause(boolQuery, request.getKeyword());
        }
        if (Objects.nonNull(request.getContentType())) {
            boolQuery.filter(QueryBuilders.termQuery("content_type", request.getContentType()));
        }
        if (Objects.nonNull(request.getApp())) {
            addScopeFilter(boolQuery,
                    AppPermissionBitComputeUtils.getAllPermissionValueSetByAppPermissionBit(request.getApp()));
        }
        if (StringUtils.isNotBlank(request.getCreator())) {
            addCreatorClause(boolQuery, request.getCreator());
        }

        // 3. Aggregation
        TermsAggregationBuilder aggs = AggregationBuilders.terms(TITLE_TERMS)
                .field("title")
                .size(SUGGEST_SIZE)
                .order(BucketOrder.compound(BucketOrder.count(false), BucketOrder.key(false)));
        source.aggregation(aggs);
        log.debug("内容搜索提示,query={}", source);

        // 4. Execute
        SearchResponse searchResponse;
        try {
            searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("内容搜索提示查询失败,request={}", request);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
        }
        RestStatus status = searchResponse.status();
        if (status != RestStatus.OK) {
            log.error("内容搜索提示查询失败,request={}, status={}", request, status);
            throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
        }

        // 5. Collect bucket keys
        ContentSearchSuggestResponse response = new ContentSearchSuggestResponse(request.getContentType(),
                new ArrayList<>(SUGGEST_SIZE));
        Aggregations aggregations = searchResponse.getAggregations();
        if (aggregations == null) {
            return response;
        }
        Terms terms = aggregations.get(TITLE_TERMS);
        if (terms == null) {
            return response;
        }
        for (Terms.Bucket bucket : terms.getBuckets()) {
            response.getSuggests().add(bucket.getKeyAsString());
        }
        return response;
    }

    /**
     * Adds a filter on {@code scope} for the given permission values plus {@code 0}.
     * The values are copied first so the set handed in by the caller is not modified.
     * NOTE(review): scope 0 presumably marks content visible to every application - confirm.
     */
    private static void addScopeFilter(BoolQueryBuilder boolQuery, Set<?> permissionValues) {
        List<Object> scopes = new ArrayList<>(permissionValues);
        scopes.add(0);
        boolQuery.filter(QueryBuilders.termsQuery("scope", scopes));
    }

    /** Requires the title to contain the keyword or the profile to equal it. */
    private static void addKeywordClause(BoolQueryBuilder boolQuery, String keyword) {
        BoolQueryBuilder keywordBoolQuery = QueryBuilders.boolQuery();
        keywordBoolQuery.should(QueryBuilders.wildcardQuery("title", "*" + escapeWildcard(keyword) + "*"));
        keywordBoolQuery.should(QueryBuilders.termQuery("profile", keyword));
        boolQuery.must(keywordBoolQuery);
    }

    /** Requires platform content, or merchant content created by the given user. */
    private static void addCreatorClause(BoolQueryBuilder boolQuery, String creator) {
        BoolQueryBuilder merchantOwned = QueryBuilders.boolQuery();
        merchantOwned.must(QueryBuilders.termQuery("user_id", creator));
        merchantOwned.must(QueryBuilders.termQuery("from", ContentFromEnum.MERCHANT.getCode()));

        BoolQueryBuilder creatorBoolQuery = QueryBuilders.boolQuery();
        creatorBoolQuery.should(QueryBuilders.termQuery("from", ContentFromEnum.PLATFORM.getCode()));
        creatorBoolQuery.should(merchantOwned);
        boolQuery.must(creatorBoolQuery);
    }

    /**
     * Escapes the wildcard-query metacharacters ({@code \}, {@code *}, {@code ?}) so that
     * user-supplied text is matched literally and cannot inject its own pattern.
     */
    private static String escapeWildcard(String keyword) {
        StringBuilder escaped = new StringBuilder(keyword.length() + 4);
        for (int i = 0; i < keyword.length(); i++) {
            char c = keyword.charAt(i);
            if (c == '\\' || c == '*' || c == '?') {
                escaped.append('\\');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }
}
文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_31905135/article/details/109814141
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
作者其他文章
评论(0)