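Optimizing Spark Performance When Working with Elasticsearch
1 - Background
Elasticsearch performs poorly when it runs statistics, aggregations, and similar operations over large volumes of data. The main reasons are:
ES loads data into memory in batches and then computes on it. Its scroll.size is capped at 10000 by default, and a larger value causes an error unless the configuration file is changed;
ES computes in JVM heap memory, but the official recommendation is to keep the heap of a single ES instance below 32 GB (strictly below, not equal to it); otherwise resources are wasted and performance drops. The cause is mainly the JVM's compressed pointers (compressed oops) mechanism.
For these reasons, Spark can be used to speed up statistics, aggregation, sorting, and similar workloads over large volumes of data.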
2 - Development
2.1 Adding the dependency
Elastic provides an official tool, elasticsearch-hadoop, which is ready to use once it is added through Maven.
<!-- We can pull in elasticsearch-spark, which is part of elasticsearch-hadoop -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.6.0</version>
</dependency>
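See the official documentation: Elasticsearch for Apache Hadoop 7.6 » Apache Spark
2.2 Writing the code
Development steps:
(1) Create the SparkContext;
(2) Load the data from ES, group and count it, then sort it;
(3) Save the sorted result to HDFS.
Sample code: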
public static void main(String[] args) {
    LOG.info("***** Start to run the Spark on ES test.");
    try {
        // Build the SparkConf, including the security-related settings
        // the Elasticsearch cluster needs, then create the SparkContext.
        SparkConf conf = new SparkConf()
            .setAppName("SparkOnEs")
            .set("es.nodes", esServerHost)
            // es.port is not needed when the port is already given in es.nodes
            // .set("es.port", "24100")
            .set("es.nodes.discovery", "true")
            .set("es.index.auto.create", "true")
            .set("es.internal.spark.sql.pushdown", "true")
            // Fields to return, separated by ","; this setting performs better than es.read.field.include
            .set("es.read.source.filter", "name,age")
            // Batch size for scroll queries; the maximum is 10000 by default
            .set("es.scroll.size", "10000")
            // Each partition handles at most 1,000,000 documents
            .set("es.input.max.docs.per.partition", "1000000");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Group data from ES
        groupBySpark(jsc);
        jsc.stop();
    } catch (Exception e) {
        // Nothing in the try block throws a checked IOException, so catching
        // IOException here would not compile; catch Exception instead.
        LOG.error("***** There are exceptions in main.", e);
    }
}
/**
 * 1. query all data from ES by JavaEsSpark,
 * 2. group by the specified field and sort the result,
 * 3. save the result to HDFS or local files
 *
 * @param jsc the Java Spark Context, which has already been initialized
 */
public static void groupBySpark(JavaSparkContext jsc) {
    long begin = System.currentTimeMillis();
    JavaPairRDD<String, Map<String, Object>> pairRDD = JavaEsSpark.esRDD(jsc, esIndex);
    // Group by the age field
    final String field = "age";
    JavaPairRDD<String, Long> resultRdd = pairRDD.mapPartitionsToPair(
            new PairFlatMapFunction<Iterator<Tuple2<String, Map<String, Object>>>, String, Long>() {
                @Override
                public Iterator<Tuple2<String, Long>> call(
                        Iterator<Tuple2<String, Map<String, Object>>> iterator) throws Exception {
                    // Emit (age, 1) for every document in this partition
                    List<Tuple2<String, Long>> list = new ArrayList<>(10000);
                    iterator.forEachRemaining(
                        row -> list.add(new Tuple2<>(row._2.get(field).toString(), 1L)));
                    return list.iterator();
                }
            })
        // Sum the 1s per age to get (age, count)
        .reduceByKey((v1, v2) -> (v1 + v2))
        // Swap to (count, age) so that the count becomes the sort key
        .mapToPair(row -> new Tuple2<>(row._2, row._1))
        // Sort the head count per age in descending order
        .sortByKey(false)
        // Swap back to (age, count)
        .mapToPair(row -> new Tuple2<>(row._2, row._1));
    long end = System.currentTimeMillis();
    long spentTime = end - begin;
    LOG.info("***** GroupBy data from ES successful, spent time: {} ms", spentTime);
    resultRdd.saveAsTextFile("/user/spark-on-es/group-result/");
    LOG.info("***** Save all result to HDFS successful.");
}
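To make the group, count, and sort steps above easier to follow without a cluster, here is a minimal plain-Java sketch of the same logic on an in-memory list. It is only an illustration: the class name GroupCountSketch, the helper countByField, and the sample documents are made up for this example, and unlike the Spark code it skips documents that lack the field instead of failing on them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the sample project.
final class GroupCountSketch {

    /** Counts documents per value of the given field, largest count first. */
    static List<Map.Entry<String, Long>> countByField(
            List<Map<String, Object>> docs, String field) {
        Map<String, Long> counts = new HashMap<>();
        for (Map<String, Object> doc : docs) {
            Object value = doc.get(field);
            if (value == null) {
                continue; // assumption: ignore documents without the field
            }
            // Same idea as emitting (value, 1) and then reduceByKey(sum)
            counts.merge(value.toString(), 1L, Long::sum);
        }
        // Same idea as swapping to (count, value) and sortByKey(false)
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue().reversed());
        return sorted;
    }

    private static Map<String, Object> doc(String name, int age) {
        Map<String, Object> d = new HashMap<>();
        d.put("name", name);
        d.put("age", age);
        return d;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> docs = Arrays.asList(
            doc("a", 20), doc("b", 31), doc("c", 20),
            doc("d", 20), doc("e", 31), doc("f", 45));
        for (Map.Entry<String, Long> e : countByField(docs, "age")) {
            // Printed in the same (age,count) shape that saveAsTextFile writes
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

In the Spark version the counting is spread over partitions and merged by reduceByKey; the result for the same input is the same list of (age, count) pairs.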
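3 - Running the job
3.1 Packaging the project
Download the Spark client from FI 8.0.0, obtain the sample code and unpack it, then open it in IDEA: File -> Open, select the project's pom.xml -> OK. This imports the project.
(1) Change the project's configuration so that it matches the cluster under test;
(2) Package the project with IDEA's built-in Maven tool, which produces target\SparkOnES-1.0.jar;
(3) Upload the generated JAR to the server that hosts the Spark client; /opt/spark-on-es/ is used as the example here;
(4) Upload the three files esParams.properties, user.keytab, and krb5.conf to /opt/spark-on-es/;
(5) Upload the JARs the project depends on to /opt/spark-on-es/libs/.
Note: the sample code needs a set of dependency JARs at run time; obtain them from the Elasticsearch client, Maven Central, or similar sources.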
3.2 Submitting the Spark job in client mode
Run the following command:
cd /opt/spark-on-es/
# The lines below form a single command; /opt/spark-on-es/libs/ is the path of the external dependency JARs:
spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \
--master yarn --deploy-mode client \
--jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \
./SparkOnEs-1.0.jar
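3.3 Submitting the Spark job in cluster mode
Run the following command:
cd /opt/spark-on-es/
# The lines below form a single command; the --files option specifies the configuration files: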
spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \
--master yarn --deploy-mode cluster \
--jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \
--files ./user.keytab,./krb5.conf,./esParams.properties \
--driver-memory 6g \
--executor-cores 5 \
--num-executors 150 \
--executor-memory 5g \
./SparkOnEs-1.0.jar
3.4 Checking the results
(1) Query the data in Elasticsearch:
# Secure-mode cluster: after kinit authentication, list the indices in Elasticsearch:
curl -XGET --tlsv1.2 --negotiate -k -u : 'https://10.10.10.11:24100/_cat/indices?v'
# Normal-mode cluster: query over http (instead of https):
curl -XGET 'http://10.10.10.11:24100/_cat/indices?v'
# Run a range query against the data in the people index:
curl -XPOST 'http://10.10.10.11:24100/people/_search?pretty' -H 'Content-Type:application/json' -d '
{
  "query": {
    "range": {
      "createdTime": {"gte": "2010-01-01T00:00:00Z", "lt": "2015-12-31T23:59:59Z"}
    }
  }
}'
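Note: for Elasticsearch query commands, see [Service Operation Guide] - [Elasticsearch] - [Using curl on Linux].
(2) Query the grouped result files in HDFS:
The sample code saves the grouping result to HDFS. After installing the client and authenticating with kinit, inspect the result with the following commands:
# List all files and their sizes: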
[root@10.10.10.11 spark-on-es]# hdfs dfs -du -s -h /user/spark-on-es/result/*
0 0 /user/spark-on-es/result/_SUCCESS
709 2.1 K /user/spark-on-es/result/part-00000
3.5 K 10.4 K /user/spark-on-es/result/part-00001
2.1 K 6.4 K /user/spark-on-es/result/part-00002
3.1 K 9.4 K /user/spark-on-es/result/part-00003
......
# Count the number of records in the result set:
[root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/* | wc -l
2000000
# View the contents of one file:
[root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/part-00000
(573267,99)
(1929095,98)
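4 - Performance tuning
The bottleneck of this sample is the step in which Spark reads the full data set from Elasticsearch; it takes the longest. Ways to optimize it:
1) Increase the number of shards of the index: when elasticsearch-spark reads data from Elasticsearch, the job's parallelism defaults to the number of shards of the index, so more shards means more parallelism;
An index should not have too many shards, though. In that case, use the es.input.max.docs.per.partition parameter to plan the number of partitions Spark uses when reading from Elasticsearch, which also raises parallelism (see the sample code).
2) Increase scroll.size: elasticsearch-spark reads data through Scroll. The batch size defaults to 50 and can be raised to 10000; keeping it at or below 50000 is recommended, because larger values easily cause OOM;
3) Use YARN resources sensibly: follow the guidance in "Product Documentation - Service Operation Guide - Yarn - Performance Tuning - Node Configuration Tuning" to size the YARN resources. When running the job in cluster mode, adjust the spark-submit parameters accordingly: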
--driver-memory 6g      ## memory of the driver
--executor-cores 5      ## CPU cores available to each executor
--num-executors 150     ## number of executors; num-executors * executor-cores must not exceed YARN's VCores
--executor-memory 5g    ## memory of each executor; num-executors * executor-memory must not exceed YARN's maximum memory
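The three tuning points above come down to simple arithmetic, which the following plain-Java sketch spells out. Treat it as a rough planning aid under stated assumptions, not as the connector's exact behavior: the class EsSparkTuningSketch and its methods are hypothetical, the partition estimate assumes each shard is split by ceiling division (elasticsearch-spark may round differently), and the YARN check applies only the two rules quoted above, ignoring the driver and per-executor memory overhead. The shard sizes and YARN capacities in main are invented example values.

```java
// Hypothetical helper class for back-of-the-envelope sizing.
final class EsSparkTuningSketch {

    /** Estimated Spark input partitions when every shard is split by a per-partition document cap. */
    static long estimatePartitions(long[] docsPerShard, long maxDocsPerPartition) {
        if (maxDocsPerPartition <= 0) {
            throw new IllegalArgumentException("maxDocsPerPartition must be positive");
        }
        long partitions = 0;
        for (long docs : docsPerShard) {
            long slices = (docs + maxDocsPerPartition - 1) / maxDocsPerPartition; // ceiling
            partitions += Math.max(1, slices); // a shard always yields at least one partition
        }
        return partitions;
    }

    /** Number of data-carrying scroll responses needed to read one partition. */
    static long scrollRoundTrips(long docsInPartition, int scrollSize) {
        if (scrollSize <= 0) {
            throw new IllegalArgumentException("scrollSize must be positive");
        }
        return (docsInPartition + scrollSize - 1) / scrollSize;
    }

    /** Checks executors * cores <= YARN VCores and executors * memory <= YARN memory. */
    static boolean fitsYarn(int numExecutors, int executorCores, int executorMemoryGb,
                            int yarnVcores, int yarnMemoryGb) {
        long cores = (long) numExecutors * executorCores;
        long memoryGb = (long) numExecutors * executorMemoryGb;
        return cores <= yarnVcores && memoryGb <= yarnMemoryGb;
    }

    public static void main(String[] args) {
        long[] shards = {4_000_000L, 4_000_000L, 4_000_000L, 4_000_000L, 4_000_000L};
        System.out.println("partitions: " + estimatePartitions(shards, 1_000_000L));
        System.out.println("scroll round trips per partition: "
            + scrollRoundTrips(1_000_000L, 10_000));
        // 150 executors * 5 cores = 750 cores, 150 * 5 GB = 750 GB
        System.out.println("fits YARN: " + fitsYarn(150, 5, 5, 800, 1024));
    }
}
```

With the settings from the sample (a cap of 1,000,000 documents per partition and a scroll size of 10000), each full partition needs about 100 scroll requests, which is why raising scroll.size from a small default reduces the number of round trips so sharply.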
5 - References
Elasticsearch for Apache Hadoop 7.6 » Configuration
6 - Additional code for reference
Logic for querying large volumes of data through Scroll:
/**
 * Query data from ES by ES rest client
 */
private static void queryDataByRestHighLevelClient(JavaSparkContext jsc) {
    LOG.info("=====> Query data from ES by Rest High Level Client beginning...");
    LOG.info("=====> Query string: {}", esQueryJsonString);
    long begin = System.currentTimeMillis();
    List<Map<String, Object>> resultMap = new ArrayList<>(1024 * 100);
    try {
        // query data by scroll api, avoid the OOM
        SearchRequest searchRequest = new SearchRequest(index);
        final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
        searchRequest.scroll(scroll);
        // set the size of result, note: if the number of size was too large, may cause the OOM
        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(2000);
        searchSourceBuilder.query(
            QueryBuilders.rangeQuery(esQueryField).gte(esQueryRangeBegin).lt(esQueryRangeEnd));
        String[] includeFields = new String[] {"id", "name", "birthday"};
        String[] excludeFields = new String[] {"age"};
        searchSourceBuilder.fetchSource(includeFields, excludeFields);
        searchRequest.source(searchSourceBuilder);
        SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);
        String scrollId = searchResponse.getScrollId();
        SearchHit[] searchHits = searchResponse.getHits().getHits();
        while (searchHits != null && searchHits.length > 0) {
            for (SearchHit hit : searchHits) {
                Map<String, Object> source = hit.getSourceAsMap();
                resultMap.add(source);
            }
            // continue scroll search
            SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
            scrollRequest.scroll(scroll);
            searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
            scrollId = searchResponse.getScrollId();
            searchHits = searchResponse.getHits().getHits();
        }
    } catch (Exception e) {
        LOG.error("***** Query data failed, exception occurred.", e);
    }
    JavaRDD<Map<String, Object>> rdd = jsc.parallelize(resultMap);
    long end = System.currentTimeMillis();
    long spentTime = end - begin;
    LOG.info("=====> Query data from ES by Rest High Level Client, rdd's count: {}, spent time: {} ms",
        rdd.count(), spentTime);
}
Logic for creating a large amount of test data through bulkPut:
/**
 * Put data by a bulk request
 *
 * @param restClient the Client of Elasticsearch
 */
private static void putDataByBulk(RestClient restClient) {
    LOG.info("***** Bulk put data beginning...");
    // total number of documents need to index
    long totalRecordNum = 100000;
    // number of document per bulk request
    long oneCommit = 500;
    long circleNumber = totalRecordNum / oneCommit;
    StringEntity entity;
    Gson gson = new Gson();
    Map<String, Object> esMap = new HashMap<>();
    String str = "{ \"index\" : { \"_index\" : \"" + index + "\", \"_type\" : \"" + type + "\"} }";
    for (int i = 0; i < circleNumber; i++) {
        StringBuilder builder = new StringBuilder();
        for (int j = 1; j <= oneCommit; j++) {
            esMap.clear();
            esMap.put("id", (i * oneCommit + j) + "");
            esMap.put("name", getName());
            esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30));
            esMap.put("birthday", getBirthday());
            /*
            esMap.clear();
            id = i * oneCommit + j + "";
            esMap.put("id", id);
            esMap.put("name", "name-" + id);
            esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30));
            esMap.put("birthday", new Date());
            */
            String strJson = gson.toJson(esMap);
            // each document is an action line followed by a source line
            builder.append(str).append("\n");
            builder.append(strJson).append("\n");
        }
        entity = new StringEntity(builder.toString(), ContentType.APPLICATION_JSON);
        entity.setContentEncoding("UTF-8");
        Response response;
        try {
            Request request = new Request("PUT", "/_bulk");
            request.addParameter("pretty", "true");
            request.setEntity(entity);
            // use the client passed in as the method parameter
            response = restClient.performRequest(request);
            if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
                // i is zero-based, so (i + 1) batches have been sent at this point
                LOG.info("***** Already input documents: " + oneCommit * (i + 1));
            } else {
                LOG.error("***** Bulk failed.");
            }
            LOG.info("Bulk response entity is : " + EntityUtils.toString(response.getEntity()));
        } catch (Exception e) {
            LOG.error("Bulk failed, exception occurred.", e);
        }
    }
}

// More realistic-looking data
private static String[] firstName =
    {"Jenny", "James", "Linda", "Judy", "Karen", "Kelly", "Margaret", "Rose", "Nora", "Wendy"};
private static String[] lastName =
    {"Abel", "Abraham", "Kent", "Brown", "White", "Cotton", "Hawk", "George", "Henry", "David"};

private static String getName() {
    int index = ThreadLocalRandom.current().nextInt(0, 10);
    return firstName[index] + " " + lastName[index];
}

private static String getBirthday() {
    ThreadLocalRandom random = ThreadLocalRandom.current();
    int year = random.nextInt(1990, 2021);
    int month = random.nextInt(1, 13);
    int day = random.nextInt(1, 29);
    int hour = random.nextInt(0, 24);
    int minute = random.nextInt(0, 60);
    int second = random.nextInt(0, 60);
    LocalDateTime time = LocalDateTime.of(year, Month.of(month), day, hour, minute, second);
    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    return formatter.format(time);
}
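The bulk logic above hinges on the shape of the request body: every document contributes an action line followed by a source line, and each line ends with a newline. The sketch below isolates just that part in plain Java so it can be checked without a cluster. It is an illustration under assumptions: BulkBodySketch and its methods are hypothetical names, the documents are passed in as ready-made JSON strings, the index name is assumed to need no JSON escaping, and the _type field used in the sample is left out here, which Elasticsearch 7.x accepts; keep it if your cluster expects it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of the sample project.
final class BulkBodySketch {

    /** Builds a newline-delimited bulk body: one action line plus one source line per document. */
    static String buildBulkBody(String index, List<String> jsonDocs) {
        // Assumption: the index name contains no characters that need JSON escaping
        String action = "{\"index\":{\"_index\":\"" + index + "\"}}";
        StringBuilder body = new StringBuilder();
        for (String doc : jsonDocs) {
            body.append(action).append('\n');
            body.append(doc).append('\n'); // the final newline is required by the bulk format
        }
        return body.toString();
    }

    /** Number of bulk requests needed, rounding up so a partial last batch is not dropped. */
    static long requestCount(long totalDocs, int docsPerRequest) {
        if (docsPerRequest <= 0) {
            throw new IllegalArgumentException("docsPerRequest must be positive");
        }
        return (totalDocs + docsPerRequest - 1) / docsPerRequest;
    }

    public static void main(String[] args) {
        List<String> docs = Arrays.asList(
            "{\"id\":\"1\",\"name\":\"Jenny Abel\",\"age\":21}",
            "{\"id\":\"2\",\"name\":\"James Abraham\",\"age\":7}");
        System.out.print(buildBulkBody("people", docs));
        System.out.println("requests for 100000 docs at 500 each: "
            + requestCount(100_000L, 500));
    }
}
```

Note that the sample computes the number of batches with integer division, so a total that is not a multiple of the batch size would leave the remainder unsent; rounding up, as requestCount does, is one way to avoid that.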