spring boot项目集成Elasticsearch

举报
小米粒-biubiubiu 发表于 2020/11/30 23:21:05 2020/11/30
4.2k+ 0 0
【摘要】 一、添加elasticsearch 依赖 <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch</artifactId> </dependency> <dependency> <groupId>org.elasticsearch.client</groupId> <arti...

一、添加elasticsearch 依赖


      <dependency>
      <groupId>org.elasticsearchgroupId>
      <artifactId>elasticsearchartifactId>
      dependency>
      <dependency>
      <groupId>org.elasticsearch.clientgroupId>
      <artifactId>elasticsearch-rest-clientartifactId>
      dependency>
      <dependency>
      <groupId>org.elasticsearch.clientgroupId>
      <artifactId>elasticsearch-rest-high-level-clientartifactId>
      dependency>
  
 

二、在application.yml配置elasticsearch 连接信息


      elasticsearch:
       clusterNodes:
      - "10.2.140.31:30869"
  
 

三、编写RestHighLevelClient 配置类


      package com.hikvision.smbg.content.core.config;
      import lombok.Data;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.http.HttpHost;
      import org.elasticsearch.client.RestClient;
      import org.elasticsearch.client.RestClientBuilder;
      import org.elasticsearch.client.RestHighLevelClient;
      import org.springframework.beans.factory.DisposableBean;
      import org.springframework.beans.factory.FactoryBean;
      import org.springframework.boot.context.properties.ConfigurationProperties;
      import org.springframework.boot.context.properties.EnableConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.util.CollectionUtils;
      import java.util.ArrayList;
      import java.util.List;
      /**
       * @author yiyuxin
       * @since 2020-05-13
       */
      @Slf4j
      @Configuration
      @EnableConfigurationProperties(ElasticsearchConfig.ElasticsearchProperties.class)
      public class ElasticsearchConfig {
      private final ElasticsearchProperties elasticsearchProperties;
      public ElasticsearchConfig(ElasticsearchProperties elasticsearchProperties) {
      this.elasticsearchProperties = elasticsearchProperties;
       }
      @Bean("highLevelClient")
      public RestHighLevelClientFactoryBean highLevelClient() {
      return new RestHighLevelClientFactoryBean(elasticsearchProperties);
       }
      /**
       * 实现FactoryBean,自定义获取RestHighLevelClient的bean实例,
       * 默认单例(FactoryBean接口的isSingleton方法默认返回true)
       * 实现Disposable接口,bean实例销毁的时候关闭RestHighLevelClient连接
       */
      public static class RestHighLevelClientFactoryBean implements FactoryBean<RestHighLevelClient>, DisposableBean {
      private RestHighLevelClient highLevelClient;
      private ElasticsearchProperties properties;
      public RestHighLevelClientFactoryBean(ElasticsearchProperties elasticsearchProperties) {
      this.properties = elasticsearchProperties;
       }
      @Override
      public RestHighLevelClient getObject() {
       List clusterNodes = properties.getClusterNodes();
       List httpHosts = new ArrayList<>(4);
      if (!CollectionUtils.isEmpty(clusterNodes)) {
      for (String clusterNode : clusterNodes) {
      //以:分隔获取到ip 和 端口
       String[] hostPort = clusterNode.split(":");
       httpHosts.add(new HttpHost(hostPort[0], Integer.parseInt(hostPort[1])));
       }
       } else {
      //如果没有配置ip和端口,默认为localhost:9200
       httpHosts.add(new HttpHost("localhost", 9200));
       }
       RestClientBuilder clientBuilder = RestClient.builder(httpHosts.toArray(new HttpHost[0])).setRequestConfigCallback(requestConfigBuilder -> {
       requestConfigBuilder.setConnectionRequestTimeout(10000); // 从连接池获取连接超时时间
       requestConfigBuilder.setConnectTimeout(5000); // 连接服务端超时时间
       requestConfigBuilder.setSocketTimeout(5000 * 60); // 读取超时
      return requestConfigBuilder;
       });
       clientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
       httpClientBuilder.setMaxConnTotal(100); //最大连接数
       httpClientBuilder.setKeepAliveStrategy((response, context) -> 10 * 60 * 1000); //保活时间
       httpClientBuilder.setMaxConnPerRoute(100);  //针对一个域名同时间正在使用的最多的连接数
      return httpClientBuilder;
       });
       highLevelClient = new RestHighLevelClient(clientBuilder);
      return highLevelClient;
       }
      @Override
      public Class getObjectType() {
      return RestHighLevelClient.class;
       }
      @Override
      public void destroy() {
      try {
       log.info("Closing elasticSearch client");
      if (highLevelClient != null) {
       highLevelClient.close();
       }
       } catch (final Exception e) {
       log.error("Error closing ElasticSearch client: ", e);
       }
       }
       }
      @Data
      @ConfigurationProperties(prefix = "elasticsearch")
      public static class ElasticsearchProperties {
      private List clusterNodes;
       }
      }
  
 

四、使用


      package com.hikvision.smbg.content.core.rpc.impl;
      import com.hikvision.smbg.content.common.dto.document.ContentDocument;
      import com.hikvision.smbg.content.common.dto.document.Document;
      import com.hikvision.smbg.content.common.exception.ContentCenterException;
      import com.hikvision.smbg.content.common.exception.EnumErrorCode;
      import com.hikvision.smbg.content.common.util.JsonUtils;
      import com.hikvision.smbg.content.core.rpc.EsSyncService;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.commons.collections4.CollectionUtils;
      import org.elasticsearch.action.bulk.BulkItemResponse;
      import org.elasticsearch.action.bulk.BulkRequest;
      import org.elasticsearch.action.bulk.BulkResponse;
      import org.elasticsearch.action.index.IndexRequest;
      import org.elasticsearch.action.index.IndexResponse;
      import org.elasticsearch.action.support.WriteRequest;
      import org.elasticsearch.action.update.UpdateRequest;
      import org.elasticsearch.client.RequestOptions;
      import org.elasticsearch.client.RestHighLevelClient;
      import org.elasticsearch.common.xcontent.XContentType;
      import org.elasticsearch.index.query.QueryBuilder;
      import org.elasticsearch.index.query.QueryBuilders;
      import org.elasticsearch.index.reindex.BulkByScrollResponse;
      import org.elasticsearch.index.reindex.UpdateByQueryRequest;
      import org.elasticsearch.rest.RestStatus;
      import org.elasticsearch.script.Script;
      import org.elasticsearch.script.ScriptType;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.boot.autoconfigure.elasticsearch.rest.RestClientAutoConfiguration;
      import org.springframework.stereotype.Service;
      import java.io.IOException;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Objects;
      /**
       * {@link RestClientAutoConfiguration}
       *
       * @author yiyuxin
       * @since 2020-10-30
       */
      @Slf4j
      @Service
      public class EsSyncServiceImpl implements EsSyncService {
      /**
       * 内容索引
       */
      private static String index = ContentDocument.class.getAnnotation(Document.class).index();
      /**
       * 客户端
       */
      @Autowired
      private RestHighLevelClient highLevelClient;
      @Override
      public void index(ContentDocument document) {
      // 构建索引请求
      String id = String.valueOf(document.getId());
      String source = Objects.requireNonNull(JsonUtils.toJson(document));
       IndexRequest request = new IndexRequest(index).id(id).source(source, XContentType.JSON);
      // 索引内容文档
       IndexResponse response;
      try {
       response = highLevelClient.index(request, RequestOptions.DEFAULT);
       } catch (Exception e) {
       log.error("内容索引失败,document={}", source);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
       }
      // 请求结果解析
       RestStatus status = response.status();
      if (!RestStatus.OK.equals(status) && !RestStatus.CREATED.equals(status)) {
       log.error("内容索引失败,document={}, status={}", source, status);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
       }
      if (log.isDebugEnabled()) {
       log.debug("内容索引成功,document={}", source);
       }
       }
      @Override
      public void enableOrDisable(List contentIds, Integer contentStatus) {
       UpdateByQueryRequest request = new UpdateByQueryRequest(index);
      String[] ids = contentIds.stream().map(String::valueOf).toArray(String[]::new);
       QueryBuilder query = QueryBuilders.idsQuery().addIds(ids);
       request.setQuery(query);
      Map<String, Object> params = new HashMap<>(contentIds.size());
      for (String id : ids) {
       params.put(id, contentStatus);
       }
      String code = "if(ctx._source.content_status==params[ctx._id]){ctx.op='noop'}"
       + "else{ctx._source.content_status=params[ctx._id]}";
       Script script = new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, code, params);
       request.setScript(script);
       request.setRefresh(true);
       BulkByScrollResponse response;
      try {
       response = highLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
       } catch (IOException e) {
       log.error("内容启用或停用失败,contentIds={},contentStatus={}", contentIds, contentStatus);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
       }
       List failures = response.getBulkFailures();
      if (CollectionUtils.isNotEmpty(failures)) {
       log.error("内容启用或停用失败,contentIds={},contentStatus={}, failures={}", contentIds, contentStatus, failures);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
       }
      if (log.isDebugEnabled()) {
       log.debug("内容启用或停用成功,contentIds={},contentStatus={}", contentIds, contentStatus);
       }
       }
      @Override
      public void syncStatistics(List documents) {
      // 构建批量请求
       BulkRequest bulkRequest = new BulkRequest(index);
       bulkRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
      for (ContentDocument document : documents) {
       UpdateRequest request = new UpdateRequest();
       request.id(String.valueOf(document.getId()));
       request.doc(JsonUtils.toJson(document), XContentType.JSON);
       bulkRequest.add(request);
       }
      // 批量请求响应处理
       BulkResponse bulkResponse;
      try {
       bulkResponse = highLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
       } catch (IOException e) {
       log.error("内容统计数据同步失败,documents={}", JsonUtils.toJson(documents));
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
       }
      if (bulkResponse.hasFailures()) {
       log.error("内容统计数据同步失败,documents={}, failures={}", JsonUtils.toJson(documents), bulkResponse.buildFailureMessage());
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
       }
      if (log.isDebugEnabled()) {
       log.debug("内容统计数据同步成功,documents={}", JsonUtils.toJson(documents));
       }
       }
      }
  
 

      package com.hikvision.smbg.content.core.rpc.impl;
      import com.baomidou.mybatisplus.core.metadata.IPage;
      import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
      import com.hikvision.smbg.content.common.dto.document.ContentDocument;
      import com.hikvision.smbg.content.common.dto.document.Document;
      import com.hikvision.smbg.content.common.dto.request.ContentSearchRequest;
      import com.hikvision.smbg.content.common.dto.request.ContentSearchSuggestRequest;
      import com.hikvision.smbg.content.common.dto.response.ContentSearchResponse;
      import com.hikvision.smbg.content.common.dto.response.ContentSearchSuggestResponse;
      import com.hikvision.smbg.content.common.enums.ContentFromEnum;
      import com.hikvision.smbg.content.common.enums.ContentStatusEnum;
      import com.hikvision.smbg.content.common.exception.ContentCenterException;
      import com.hikvision.smbg.content.common.exception.EnumErrorCode;
      import com.hikvision.smbg.content.common.util.AppPermissionBitComputeUtils;
      import com.hikvision.smbg.content.common.util.JsonUtils;
      import com.hikvision.smbg.content.core.rpc.EsSearchService;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.commons.lang3.StringUtils;
      import org.elasticsearch.action.search.SearchRequest;
      import org.elasticsearch.action.search.SearchResponse;
      import org.elasticsearch.client.RequestOptions;
      import org.elasticsearch.client.RestHighLevelClient;
      import org.elasticsearch.index.query.BoolQueryBuilder;
      import org.elasticsearch.index.query.QueryBuilders;
      import org.elasticsearch.rest.RestStatus;
      import org.elasticsearch.search.SearchHit;
      import org.elasticsearch.search.SearchHits;
      import org.elasticsearch.search.aggregations.AggregationBuilders;
      import org.elasticsearch.search.aggregations.Aggregations;
      import org.elasticsearch.search.aggregations.BucketOrder;
      import org.elasticsearch.search.aggregations.bucket.terms.ParsedStringTerms;
      import org.elasticsearch.search.aggregations.bucket.terms.Terms;
      import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
      import org.elasticsearch.search.builder.SearchSourceBuilder;
      import org.elasticsearch.search.sort.SortOrder;
      import org.springframework.beans.BeanUtils;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Service;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Objects;
      import java.util.Set;
      /**
       * @author yiyuxin
       * @since 2020-10-30
       */
      @Slf4j
      @Service
      public class EsSearchServiceImpl implements EsSearchService {
      /**
       * 内容索引
       */
      private static String index = ContentDocument.class.getAnnotation(Document.class).index();
       /**
       * 客户端
       */
      @Autowired
      private RestHighLevelClient highLevelClient;
      @Override
      public IPage search(ContentSearchRequest request) {
       IPage page = new Page<>();
      // 1. 查询请求
       SearchRequest searchRequest = new SearchRequest(index);
       SearchSourceBuilder source = new SearchSourceBuilder();
       searchRequest.source(source);
       source.trackTotalHits(true);
      // 2. 分页
       source.from((request.getPage() - 1) * request.getSize());
       source.size(request.getSize());
      // 3. 排序
      if (StringUtils.isNotBlank(request.getSort())
       && StringUtils.isNotBlank(request.getOrder())) {
       source.sort(request.getSort(), SortOrder.fromString(request.getOrder()));
       }
       source.sort("gmt_publish", SortOrder.DESC);
       source.sort("id", SortOrder.DESC);
      // 4. 查询条件
       BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
       source.query(boolQuery);
      // 内容状态
       boolQuery.filter().add(QueryBuilders.termQuery("content_status", ContentStatusEnum.ENABLED.getStatus()));
      // 内容类型
      if (Objects.nonNull(request.getContentType())) {
       boolQuery.filter().add(QueryBuilders.termQuery("content_type", request.getContentType()));
       }
      // 标签 ID
      if (Objects.nonNull(request.getLabelId())) {
       boolQuery.filter().add(QueryBuilders.termQuery("label_ids", request.getLabelId()));
       }
      // 一级行业 ID
      if (Objects.nonNull(request.getOneIndustryId())) {
       boolQuery.filter().add(QueryBuilders.termQuery("one_industry_id", request.getOneIndustryId()));
       }
      // 二级行业 ID
      if (Objects.nonNull(request.getTwoIndustryId())) {
       boolQuery.filter().add(QueryBuilders.termQuery("two_industry_id", request.getTwoIndustryId()));
       }
      // 应用权限
      if (Objects.nonNull(request.getApp())) {
       Set scopes = AppPermissionBitComputeUtils.getAllPermissionValueSetByAppPermissionBit(request.getApp());
       scopes.add(0);
       boolQuery.filter().add(QueryBuilders.termsQuery("scope", scopes));
       }
      // 关键词
      if (StringUtils.isNotBlank(request.getKeyword())) {
       BoolQueryBuilder keywordBoolQuery = QueryBuilders.boolQuery();
       boolQuery.must().add(keywordBoolQuery);
      // 内容标题
       keywordBoolQuery.should().add(QueryBuilders.wildcardQuery("title", StringUtils.join("*", request.getKeyword(), "*")));
      // 内容简介
       keywordBoolQuery.should().add(QueryBuilders.termQuery("profile", request.getKeyword()));
       }
      // 创建人
      if (StringUtils.isNotBlank(request.getCreator())) {
       BoolQueryBuilder creatorBoolQuery = QueryBuilders.boolQuery();
       boolQuery.must().add(creatorBoolQuery);
      // 平台
       creatorBoolQuery.should().add(QueryBuilders.termQuery("from", ContentFromEnum.PLATFORM.getCode()));
      // 商家
       BoolQueryBuilder mCreatorBoolQuery = QueryBuilders.boolQuery();
       creatorBoolQuery.should().add(mCreatorBoolQuery);
       mCreatorBoolQuery.must(QueryBuilders.termQuery("user_id", request.getCreator()));
       mCreatorBoolQuery.must(QueryBuilders.termQuery("from", ContentFromEnum.MERCHANT.getCode()));
       }
      if (log.isDebugEnabled()) {
       log.debug("内容搜索,query={}", source);
       }
      // 5. 内容查询
       SearchResponse searchResponse;
      try {
       searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
       } catch (Exception e) {
       log.error("内容搜索失败,request={}", JsonUtils.toJson(request));
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
       }
      // 6. 搜索结果处理
       RestStatus status = searchResponse.status();
      if (!status.equals(RestStatus.OK)) {
       log.error("内容搜索失败,request={}, status={}", JsonUtils.toJson(request), status);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
       }
      // 总数量
       SearchHits hits = searchResponse.getHits();
      long total = hits.getTotalHits().value;
      if (total == 0) {
      return page;
       }
       page.setTotal(total);
      // 数据记录
       List records = new ArrayList<>(request.getSize());
       page.setRecords(records);
       SearchHit[] hitArr = hits.getHits();
      for (SearchHit hit : hitArr) {
       ContentDocument document = JsonUtils.fromJson(hit.getSourceAsString(), ContentDocument.class);
      if (Objects.isNull(document)) continue;
       ContentSearchResponse response = new ContentSearchResponse();
       BeanUtils.copyProperties(document, response);
       page.getRecords().add(response);
       }
      return page;
       }
      @Override
      public ContentSearchSuggestResponse suggest(ContentSearchSuggestRequest request) {
      // 1. 查询请求
       SearchRequest searchRequest = new SearchRequest(index);
       SearchSourceBuilder source = new SearchSourceBuilder();
       searchRequest.source(source);
       source.size(0);
      // 2. 查询条件
       BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
       source.query(boolQuery);
      // 内容状态
       boolQuery.filter().add(QueryBuilders.termQuery("content_status", ContentStatusEnum.ENABLED.getStatus()));
      // 关键字
       BoolQueryBuilder keywordBoolQuery = QueryBuilders.boolQuery();
       boolQuery.must().add(keywordBoolQuery);
      // 内容标题
       keywordBoolQuery.should().add(QueryBuilders.wildcardQuery("title", StringUtils.join("*", request.getKeyword(), "*")));
      // 内容简介
       keywordBoolQuery.should().add(QueryBuilders.termQuery("profile", request.getKeyword()));
      // 内容类型
       boolQuery.filter().add(QueryBuilders.termQuery("content_type", request.getContentType()));
      // 应用权限
      if (Objects.nonNull(request.getApp())) {
       Set scopes = AppPermissionBitComputeUtils.getAllPermissionValueSetByAppPermissionBit(request.getApp());
       scopes.add(0);
       boolQuery.filter().add(QueryBuilders.termsQuery("scope", scopes));
       }
      // 内容发布人
      if (StringUtils.isNotBlank(request.getCreator())) {
       BoolQueryBuilder creatorBoolQuery = QueryBuilders.boolQuery();
       boolQuery.must().add(creatorBoolQuery);
      // 平台
       creatorBoolQuery.should().add(QueryBuilders.termQuery("from", ContentFromEnum.PLATFORM.getCode()));
      // 商家
       BoolQueryBuilder mCreatorBoolQuery = QueryBuilders.boolQuery();
       creatorBoolQuery.should().add(mCreatorBoolQuery);
       mCreatorBoolQuery.must(QueryBuilders.termQuery("user_id", request.getCreator()));
       mCreatorBoolQuery.must(QueryBuilders.termQuery("from", ContentFromEnum.MERCHANT.getCode()));
       }
      // 聚合
       TermsAggregationBuilder aggs = AggregationBuilders.terms("title-terms")
       .field("title").size(10).order(BucketOrder.compound(BucketOrder.count(false), BucketOrder.key(false)));
       source.aggregation(aggs);
      if (log.isDebugEnabled()) {
       log.debug("内容搜索提示,query={}", source);
       }
      // 3. 内容查询
       SearchResponse searchResponse;
      try {
       searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
       } catch (Exception e) {
       log.error("内容搜索提示查询失败,request={}", request);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR, e);
       }
      // 4. 搜索结果处理
       RestStatus status = searchResponse.status();
      if (!status.equals(RestStatus.OK)) {
       log.error("内容搜索提示查询失败,request={}, status={}", request, status);
      throw new ContentCenterException(EnumErrorCode.C_DEFAULT_ERR);
       }
       ContentSearchSuggestResponse response = new ContentSearchSuggestResponse(request.getContentType(),
      new ArrayList<>(10));
       Aggregations aggregations = searchResponse.getAggregations();
       ParsedStringTerms terms = aggregations.get("title-terms");
       List buckets = terms.getBuckets();
       for (Terms.Bucket bucket : buckets) {
       response.getSuggests().add(bucket.getKeyAsString());
       }
      return response;
       }
      }
  
 

文章来源: blog.csdn.net,作者:血煞风雨城2018,版权归原作者所有,如需转载,请联系作者。

原文链接:blog.csdn.net/qq_31905135/article/details/109814141

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

作者其他文章

评论(0

抱歉,系统识别当前为高风险访问,暂不支持该操作

    全部回复

    上滑加载中

    设置昵称

    在此一键设置昵称,即可参与社区互动!

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

    *长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。