Elasticsearch客户端使用指南

举报
css_blog 发表于 2021/08/02 09:21:06 2021/08/02
【摘要】 前言ES提供transport client方式访问,默认端口是9300,由于这种访问方式和es节点间的的metadata元数据信息交互使用相同的端口,当业务访问量大的时候会导致es集群的不稳定,在6.x以后的版本中使用transport client的方式官方已经不推荐使用了,并且在7.x 版本中已经废弃掉了这种访问方式。官方推荐是high level客户端的方式使用ES,默认的端口是9...

前言

ES提供transport client方式访问,默认端口是9300,由于这种访问方式和es节点间的的metadata元数据信息交互使用相同的端口,当业务访问量大的时候会导致es集群的不稳定,在6.x以后的版本中使用transport client的方式官方已经不推荐使用了,并且在7.x 版本中已经废弃掉了这种访问方式。官方推荐是high level客户端的方式使用ES,默认的端口是9200。

以下介绍java和python方式访问ES的方式。

ES客户端连接elasticsearch

ES提供highlevel客户端方式连接Elasticsearch,以下介绍几种连接Elasticsearch客户端的代码示例。

maven引入

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elasticsearch_version}</version>
</dependency>

以下给出High Level Client 使用代码。,创建RestHighLevelClient

   //create high client
    public ESHighLevelClient build() throws IOException {
        RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
                .setRequestConfigCallback(
                        config -> config.setConnectTimeout(connectTimeout)
                                .setConnectionRequestTimeout(connectionRequestTimeout)
                                .setSocketTimeout(socketTimeout))
                .setHttpClientConfigCallback(
                        httpClientBuilder -> {
                            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                            if (ESHighLevelClient.this.certification) {
                                credentialsProvider.setCredentials(AuthScope.ANY,
                                        new UsernamePasswordCredentials(username, password));
                                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                            }
                            httpClientBuilder.setMaxConnTotal(100);
                            httpClientBuilder.setMaxConnPerRoute(50);
                            List<Header> headers = new ArrayList<>(2);
                            headers.add(new BasicHeader("Connection", "keep-alive"));
                            headers.add(new BasicHeader("Keep-Alive", "720"));
                            httpClientBuilder.setDefaultHeaders(headers);
                            httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);

                            try {
                                DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
                                ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
                                    @Override
                                    public boolean handle(IOException e) {
                                        logger.warn("System may be unstable: IOReactor encountered a checked exception : "
                                                + e.getMessage(), e);
                                        return true; // Return true to note this exception as handled, it will not be re-thrown
                                    }

                                    @Override
                                    public boolean handle(RuntimeException e) {
                                        logger.warn("System may be unstable: IOReactor encountered a runtime exception : "
                                                + e.getMessage(), e);
                                        return true; // Return true to note this exception as handled, it will not be re-thrown
                                    }
                                });
                                httpClientBuilder.setConnectionManager(new PoolingNHttpClientConnectionManager(ioReactor));
                            } catch (IOReactorException e) {
                                throw new RuntimeException(e);
                            }
                            return httpClientBuilder;
                        }
                );
        this.client = new RestHighLevelClient(builder);
        logger.info("es rest client build success {} ", client);

        ClusterHealthRequest request = new ClusterHealthRequest();
        ClusterHealthResponse response = this.client.cluster().health(request, RequestOptions.DEFAULT);
        logger.info("es rest client health response {} ", response);
        return this;
    }


    public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
        public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();

        private CustomConnectionKeepAliveStrategy() {
            super();
        }

        /**
         * 最大keep alive的时间(分钟)
         * 这里默认为10分钟,可以根据实际情况设置。可以观察客户端机器状态为TIME_WAIT的TCP连接数,如果太多,可以增大此值。
         */
        private final long MAX_KEEP_ALIVE_MINUTES = 10;

        @Override
        public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
            long keepAliveDuration = super.getKeepAliveDuration(response, context);
            // <0 为无限期keepalive
            // 将无限期替换成一个默认的时间
            if (keepAliveDuration < 0) {
                return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
            }
            return keepAliveDuration;
        }
    }

springboot集成elasticsearch

ES官方提供的SDK的方式访问ES在使用方式上还是不太方便,封装JSON,处理返回值,构造查询条件,这些都需要大量的代码处理,在spring项目中有没有类似JPA封装JDBC的功能来处理ES的数据?

Spring Data Elasticsearch 是SpringData的子项目。

Spring Data的使命是为数据访问提供熟悉且一致的基于Spring的编程模型,同时仍保留底层数据存储的特殊特性。 SpringData有一系列子项目。

spring-data-elasticsearch

Spring Boot 通过整合Spring Data ElasticSearch为我们提供了非常便捷的检索功能支持Elasticsearch,在新的项目中可以优先采用此方式。

主要特性

  • 支持Spring的基于@Configuration的java配置方式,或者XML配置方式,实现配置自动注入
  • 提供了用于操作ES的便捷工具类**ElasticsearchTemplate**。包括实现文档到POJO之间的自动智能映射。
  • 利用Spring的数据转换服务实现的功能丰富的对象映射
  • 基于注解的元数据映射方式,而且可扩展以支持更多不同的数据格式
  • 根据持久层接口自动生成对应实现方法,无需人工编写基本操作代码(类似mybatis,根据接口自动得到实现)。当然,也支持人工定制查询

spring-data-elasticsearch 使用ES high level client连接es,在早期的版本使用transport client。

其中Spring Data Elasticsearch和ES版本一一对应,版本选择上选择匹配版本使用。

Spring Data Release Train Spring Data Elasticsearch Elasticsearch Spring Boot
2020.0.0[1] 4.1.x[1] 7.9.3 2.4.x[1]
Neumann 4.0.x 7.6.2 2.3.x
Moore 3.2.x 6.8.12 2.2.x
Lovelace 3.1.x 6.2.2 2.1.x
Kay[2] 3.0.x[2] 5.5.0 2.0.x[2]
Ingalls[2] 2.1.x[2] 2.4.0 1.5.x[2]

maven引入Java包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

注意springboot版本和elasticsearch版本之间的差异。

定义model

@Document(indexName = "book_info", shards = 3, replicas = 1)
@Data
public class Book {
    @Id
    private String id;

    @Field(type = FieldType.Text, analyzer = "standard")
    private String name;

    @Field(type = FieldType.Text, analyzer = "standard")
    private String author;

    @Field(type = FieldType.Date, format = DateFormat.basic_date)
    private Date createTime;

    @Field(type = FieldType.Date, format = DateFormat.basic_date)
    private Date updateTime;

    @Field(type = FieldType.Double)
    private Double price;
}

id必须定义,其对应底层es中_id 字段,写入数据的时候如果id不赋值,es会自动生成 _id.

定义Repository

使用上和jpa类似,可以使用Repository提供的方法,也可以自定义方法。

public interface BookRepository extends ElasticsearchRepository<Book, String> {

    Book findByName(String name);

    List<Book> findByAuthor(String author);

    Book findBookById(String id);
}

使用


@RestController
@RequestMapping("/book")
public class BookController {
    @Autowired
    BookRepository bookRepository;

    @PostMapping(value = "/add")
    public ResponseEntity<String> indexDoc(@RequestBody Book book) {
        book.setCreateTime(new Date());
        book.setUpdateTime(new Date());
        System.out.println("book===" + book);
        bookRepository.save(book);
        return new ResponseEntity<>("save executed!", HttpStatus.OK);
    }

    @GetMapping()
    public ResponseEntity<Iterable<Book>> getAll() {
        Iterable<Book> all =  bookRepository.findAll();
        return new ResponseEntity<>(all, HttpStatus.OK);
    }

    @GetMapping(value = "/{name}")
    public ResponseEntity<Book> getByName(@PathVariable("name") String name) {
        Book book = bookRepository.findByName(name);
        return new ResponseEntity<>(book, HttpStatus.OK);
    }

    @PutMapping(value = "/{id}")
    public ResponseEntity<Book> updateBook(@PathVariable("id") String id,
                                           @RequestBody Book updateBook) {
        Book book = bookRepository.findBookById(id);
        book.setId(updateBook.getId());
        book.setName(updateBook.getName());
        book.setAuthor(updateBook.getAuthor());
        book.setPrice(updateBook.getPrice());
        book.setUpdateTime(new Date());
        bookRepository.save(book);
        return new ResponseEntity<>(book, HttpStatus.OK);
    }

    @DeleteMapping(value = "/{id}")
    public ResponseEntity<String> deleteBook(@PathVariable("id") String id) {
        bookRepository.deleteById(id);
        return new ResponseEntity<>("delete execute!", HttpStatus.OK);
    }
}

python连接ES

直接引入elasticsearch包就可以使用了。

from elasticsearch import Elasticsearch

以下给出简单的代码示例,详细使用可以查看官网进一步了解。

class EsClient:
    def __init__(self, url, username, password):
        self.url = url
        self.username = username
        self.password = password
        self.es = Elasticsearch([self.url],
                                sniff_on_connection_fail=False,
                                http_auth=(self.username, self.password), port=9200)

    def createIndex(self, index, properties, number_of_shards=1):
        mappings = {
            "settings": {
                "index": {
                    "vector": True,
                    "soft_deletes": {
                        "enabled": False
                    },
                    "number_of_shards": number_of_shards,
                    "number_of_replicas": 1,
                },
            },
            "mappings": {
                properties
            }
        }
        result = self.es.indices.create(index=index, body=mappings)
        print(result)

    def exists(self, index, _id):
        result = self.es.get(index, id=_id)
        print(result)

    def delete(self, index, _id):
        result = self.es.delete(index=index, id=_id, ignore=[400, 404])
        print(result)

参考:

1、https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#elasticsearch.clients

2、https://www.jianshu.com/p/56e755415e63

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。