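Elasticsearch Client Usage Guide
Preface
Elasticsearch (ES) offers a transport client that connects on port 9300 by default. Because this is the same port the ES nodes use to exchange cluster metadata among themselves, heavy business traffic through it can destabilize the cluster. From 6.x onward the transport client is no longer officially recommended, and it was deprecated in 7.x. The officially recommended way to use ES is the high-level REST client, which connects on port 9200 by default.
The sections below show how to access ES from Java and from Python.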
Connecting to Elasticsearch with the ES client
ES provides a high-level client for connecting to Elasticsearch. The code samples below show how to set it up.
Maven dependency
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch_version}</version>
</dependency>
The following code uses the High Level Client and creates a RestHighLevelClient.
//create high client
public ESHighLevelClient build() throws IOException {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port, "http"))
.setRequestConfigCallback(
config -> config.setConnectTimeout(connectTimeout)
.setConnectionRequestTimeout(connectionRequestTimeout)
.setSocketTimeout(socketTimeout))
.setHttpClientConfigCallback(
httpClientBuilder -> {
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
if (ESHighLevelClient.this.certification) {
credentialsProvider.setCredentials(AuthScope.ANY,
new UsernamePasswordCredentials(username, password));
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
}
httpClientBuilder.setMaxConnTotal(100);
httpClientBuilder.setMaxConnPerRoute(50);
List<Header> headers = new ArrayList<>(2);
headers.add(new BasicHeader("Connection", "keep-alive"));
headers.add(new BasicHeader("Keep-Alive", "720"));
httpClientBuilder.setDefaultHeaders(headers);
httpClientBuilder.setKeepAliveStrategy(CustomConnectionKeepAliveStrategy.INSTANCE);
try {
DefaultConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
ioReactor.setExceptionHandler(new IOReactorExceptionHandler() {
@Override
public boolean handle(IOException e) {
logger.warn("System may be unstable: IOReactor encountered a checked exception : "
+ e.getMessage(), e);
return true; // Return true to note this exception as handled, it will not be re-thrown
}
@Override
public boolean handle(RuntimeException e) {
logger.warn("System may be unstable: IOReactor encountered a runtime exception : "
+ e.getMessage(), e);
return true; // Return true to note this exception as handled, it will not be re-thrown
}
});
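// An explicitly supplied connection manager takes precedence over setMaxConnTotal/setMaxConnPerRoute
// on the builder, so the same pool limits are applied to the manager itself.
PoolingNHttpClientConnectionManager connectionManager = new PoolingNHttpClientConnectionManager(ioReactor);
connectionManager.setMaxTotal(100);
connectionManager.setDefaultMaxPerRoute(50);
httpClientBuilder.setConnectionManager(connectionManager);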
} catch (IOReactorException e) {
throw new RuntimeException(e);
}
return httpClientBuilder;
}
);
this.client = new RestHighLevelClient(builder);
logger.info("es rest client build success {} ", client);
ClusterHealthRequest request = new ClusterHealthRequest();
ClusterHealthResponse response = this.client.cluster().health(request, RequestOptions.DEFAULT);
logger.info("es rest client health response {} ", response);
return this;
}
public static class CustomConnectionKeepAliveStrategy extends DefaultConnectionKeepAliveStrategy {
public static final CustomConnectionKeepAliveStrategy INSTANCE = new CustomConnectionKeepAliveStrategy();
private CustomConnectionKeepAliveStrategy() {
super();
}
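/**
* Maximum keep-alive time, in minutes.
* Defaults to 10 minutes and can be tuned as needed. Watch the number of TCP connections in the TIME_WAIT state on the client machine; if there are too many, increase this value.
*/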
private final long MAX_KEEP_ALIVE_MINUTES = 10;
@Override
public long getKeepAliveDuration(HttpResponse response, HttpContext context) {
long keepAliveDuration = super.getKeepAliveDuration(response, context);
// A value < 0 means keep the connection alive indefinitely
// Replace the indefinite duration with a default one
if (keepAliveDuration < 0) {
return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
}
return keepAliveDuration;
}
}
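The fallback rule in CustomConnectionKeepAliveStrategy can be checked in isolation. The sketch below is a minimal stand-alone version using only the JDK; the class name KeepAliveFallback and the method resolve are hypothetical and are not part of Apache HttpClient or the ES client.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper: mirrors only the fallback rule, not the HttpClient API.
final class KeepAliveFallback {
    // Same default as MAX_KEEP_ALIVE_MINUTES in the strategy above.
    static final long MAX_KEEP_ALIVE_MINUTES = 10;

    private KeepAliveFallback() {
    }

    /**
     * @param serverDurationMillis keep-alive duration derived from the response;
     *                             negative when no limit was given
     * @return the duration to apply, in milliseconds
     */
    static long resolve(long serverDurationMillis) {
        if (serverDurationMillis < 0) {
            // No limit given: cap the connection lifetime at the default.
            return TimeUnit.MINUTES.toMillis(MAX_KEEP_ALIVE_MINUTES);
        }
        return serverDurationMillis;
    }
}
```

An explicit duration is passed through unchanged, so the cap only applies to connections that would otherwise be kept alive indefinitely.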
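Integrating Elasticsearch with Spring Boot
Accessing ES through the official SDK is still not very convenient: building JSON, handling responses, and constructing query conditions all take a lot of code. Is there something for Spring projects that handles ES data the way JPA wraps JDBC?
Spring Data Elasticsearch is a subproject of Spring Data.
Spring Data's mission is to provide a familiar, consistent, Spring-based programming model for data access while still retaining the special traits of the underlying data store. Spring Data has a number of subprojects.
spring-data-elasticsearch
By integrating Spring Data Elasticsearch, Spring Boot provides very convenient search support for Elasticsearch. New projects can prefer this approach.
Main features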
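- Supports Spring's @Configuration-based Java configuration as well as XML configuration, with configuration injected automatically
- Provides ElasticsearchTemplate, a convenient helper class for working with ES, including automatic mapping between documents and POJOs
- Feature-rich object mapping built on Spring's data conversion service
- Annotation-based mapping metadata, extensible to support other data formats
- Generates the implementation of repository interfaces automatically, so basic operations need no hand-written code (similar to MyBatis, where the implementation is derived from the interface). Hand-written custom queries are also supported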
spring-data-elasticsearch connects to ES through the ES high level client; earlier versions used the transport client.
Each Spring Data Elasticsearch release corresponds to a specific ES version, so pick versions that match.
Spring Data Release Train | Spring Data Elasticsearch | Elasticsearch | Spring Boot |
---|---|---|---|
2020.0.0[1] | 4.1.x[1] | 7.9.3 | 2.4.x[1] |
Neumann | 4.0.x | 7.6.2 | 2.3.x |
Moore | 3.2.x | 6.8.12 | 2.2.x |
Lovelace | 3.1.x | 6.2.2 | 2.1.x |
Kay[2] | 3.0.x[2] | 5.5.0 | 2.0.x[2] |
Ingalls[2] | 2.1.x[2] | 2.4.0 | 1.5.x[2] |
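As a rough illustration, the table can be turned into a lookup that reports which ES version a given Spring Boot version was built against. This is a sketch under assumptions: the version pairs are copied from the table above, while the class EsCompatibility and the method elasticsearchVersionFor are hypothetical names, not part of Spring.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper; the data comes from the compatibility table, the API is illustrative.
final class EsCompatibility {
    // Spring Boot release line (major.minor) -> Elasticsearch version from the table.
    private static final Map<String, String> ES_BY_SPRING_BOOT = new LinkedHashMap<>();

    static {
        ES_BY_SPRING_BOOT.put("2.4", "7.9.3");
        ES_BY_SPRING_BOOT.put("2.3", "7.6.2");
        ES_BY_SPRING_BOOT.put("2.2", "6.8.12");
        ES_BY_SPRING_BOOT.put("2.1", "6.2.2");
        ES_BY_SPRING_BOOT.put("2.0", "5.5.0");
        ES_BY_SPRING_BOOT.put("1.5", "2.4.0");
    }

    private EsCompatibility() {
    }

    /** Looks up the ES version for a Spring Boot version such as "2.3.7.RELEASE". */
    static Optional<String> elasticsearchVersionFor(String springBootVersion) {
        String[] parts = springBootVersion.split("\\.");
        if (parts.length < 2) {
            return Optional.empty();
        }
        // Only the major.minor release line matters for the lookup.
        return Optional.ofNullable(ES_BY_SPRING_BOOT.get(parts[0] + "." + parts[1]));
    }
}
```

Versions that are not in the table yield an empty Optional rather than a guess.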
Maven dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<version>2.3.7.RELEASE</version>
</dependency>
Pay attention to the compatibility between the Spring Boot version and the Elasticsearch version.
Defining the model
@Document(indexName = "book_info", shards = 3, replicas = 1)
@Data
public class Book {
@Id
private String id;
@Field(type = FieldType.Text, analyzer = "standard")
private String name;
@Field(type = FieldType.Text, analyzer = "standard")
private String author;
@Field(type = FieldType.Date, format = DateFormat.basic_date)
private Date createTime;
@Field(type = FieldType.Date, format = DateFormat.basic_date)
private Date updateTime;
@Field(type = FieldType.Double)
private Double price;
}
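The id field must be defined; it maps to the underlying _id field in ES. If id is not set when a document is written, ES generates an _id automatically.
Defining the Repository
Usage is similar to JPA: you can use the methods the Repository already provides, or define your own.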
public interface BookRepository extends ElasticsearchRepository<Book, String> {
Book findByName(String name);
List<Book> findByAuthor(String author);
Book findBookById(String id);
}
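Custom methods such as findByName work because Spring Data derives the query from the method name. The toy sketch below only illustrates that naming convention for the single-property case; DerivedQueryName and propertyOf are hypothetical, and Spring Data's real parser supports many more keywords (And, Or, Between, OrderBy and so on).

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy illustration only; this is not how Spring Data is implemented.
final class DerivedQueryName {
    // "find", an optional subject such as "Book", then "By" followed by one property name.
    private static final Pattern FIND_BY =
            Pattern.compile("^find[A-Za-z]*?By([A-Z][A-Za-z0-9]*)$");

    private DerivedQueryName() {
    }

    /** Returns the document property a simple findBy method would query, if any. */
    static Optional<String> propertyOf(String methodName) {
        Matcher matcher = FIND_BY.matcher(methodName);
        if (!matcher.matches()) {
            return Optional.empty();
        }
        String property = matcher.group(1);
        // "Name" -> "name", matching the field name on the model class.
        return Optional.of(Character.toLowerCase(property.charAt(0)) + property.substring(1));
    }
}
```

For the repository above, findByName and findByAuthor resolve to the name and author fields, and findBookById resolves to id, since the subject between find and By is ignored.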
Usage
@RestController
@RequestMapping("/book")
public class BookController {
@Autowired
BookRepository bookRepository;
@PostMapping(value = "/add")
public ResponseEntity<String> indexDoc(@RequestBody Book book) {
book.setCreateTime(new Date());
book.setUpdateTime(new Date());
System.out.println("book===" + book);
bookRepository.save(book);
return new ResponseEntity<>("save executed!", HttpStatus.OK);
}
@GetMapping()
public ResponseEntity<Iterable<Book>> getAll() {
Iterable<Book> all = bookRepository.findAll();
return new ResponseEntity<>(all, HttpStatus.OK);
}
@GetMapping(value = "/{name}")
public ResponseEntity<Book> getByName(@PathVariable("name") String name) {
Book book = bookRepository.findByName(name);
return new ResponseEntity<>(book, HttpStatus.OK);
}
@PutMapping(value = "/{id}")
public ResponseEntity<Book> updateBook(@PathVariable("id") String id,
@RequestBody Book updateBook) {
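Book book = bookRepository.findBookById(id);
if (book == null) {
// Nothing to update: avoid a NullPointerException and report 404 instead
return new ResponseEntity<>(HttpStatus.NOT_FOUND);
}
// Keep the id from the path so the existing document is updated rather than a new one created
book.setId(id);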
book.setName(updateBook.getName());
book.setAuthor(updateBook.getAuthor());
book.setPrice(updateBook.getPrice());
book.setUpdateTime(new Date());
bookRepository.save(book);
return new ResponseEntity<>(book, HttpStatus.OK);
}
@DeleteMapping(value = "/{id}")
public ResponseEntity<String> deleteBook(@PathVariable("id") String id) {
bookRepository.deleteById(id);
return new ResponseEntity<>("delete execute!", HttpStatus.OK);
}
}
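To exercise these endpoints from plain Java, requests can be built with the JDK's java.net.http API (Java 11 or later). This is a minimal sketch: the class BookApiRequests is hypothetical, and the base URL is an assumption that depends on where the application is deployed.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper that only builds requests for the controller above; it does not send them.
final class BookApiRequests {
    private BookApiRequests() {
    }

    /** POST /book/add with a JSON body describing the book. */
    static HttpRequest addBook(String baseUrl, String bookJson) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/book/add"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(bookJson))
                .build();
    }

    /** GET /book/{name}; the name is assumed to need no URL encoding. */
    static HttpRequest getByName(String baseUrl, String name) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/book/" + name))
                .GET()
                .build();
    }

    /** DELETE /book/{id}. */
    static HttpRequest deleteBook(String baseUrl, String id) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/book/" + id))
                .DELETE()
                .build();
    }
}
```

A request built this way can be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) once the application is running.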
Connecting to ES from Python
Simply import the elasticsearch package to get started.
from elasticsearch import Elasticsearch
A simple code sample follows; see the official documentation for more detail.
class EsClient:
    def __init__(self, url, username, password):
        self.url = url
        self.username = username
        self.password = password
        self.es = Elasticsearch([self.url],
                                sniff_on_connection_fail=False,
                                http_auth=(self.username, self.password), port=9200)

    def createIndex(self, index, properties, number_of_shards=1):
        # properties: dict mapping each field name to its field definition
        mappings = {
            "settings": {
                "index": {
                    # Not a stock Elasticsearch index setting; presumably specific to the author's ES distribution
                    "vector": True,
                    "soft_deletes": {
                        "enabled": False
                    },
                    "number_of_shards": number_of_shards,
                    "number_of_replicas": 1,
                },
            },
            "mappings": {
                "properties": properties
            }
        }
        result = self.es.indices.create(index=index, body=mappings)
        print(result)

    def exists(self, index, _id):
        # Prints True if the document exists, without fetching its source
        result = self.es.exists(index=index, id=_id)
        print(result)

    def delete(self, index, _id):
        # ignore=[400, 404] suppresses the exception for these status codes
        result = self.es.delete(index=index, id=_id, ignore=[400, 404])
        print(result)
References:
1. https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/#elasticsearch.clients