Spark 操作 Elasticsearch 性能优化

举报
痩风 发表于 2020/08/04 14:26:57 2020/08/04
【摘要】 Elasticsearch 在对大批量数据进行统计、聚合等操作时,性能较差。在大批量数据下的统计、聚合、排序等场景,可借助 Spark 提升运算性能。

1 - 背景说明

Elasticsearch 在对大批量数据进行统计、聚合等操作时,性能差,主要原因有:

  1. ES 是通过 批量加载数据到内存中,然后进行计算的,其 scroll.size 的默认最大值为 10000,超过此值就会报错 —— 需要修改配置文件;

  2. ES 使用 JVM 堆内存进行计算,但官方建议单个 ES 实例的堆内存要低于 32 GB(不能等于),否则将有资源的浪费、性能的损耗 —— 主要与 JVM 的指针压缩算法有关。

基于此,在大批量数据下的统计、聚合、排序等场景,可借助 Spark 提升运算性能。

2 - 开发方法

2.1 引入依赖

ES 官方提供了一款工具:elasticsearch-hadoop,通过 Maven 引入即可使用。

<!-- 我们可以引入 elasticsearch-hadoop 下的 elasticsearch-spark -->
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.6.0</version>
</dependency>

可参考官方文档:Elasticsearch for Apache Hadoop 7.6 » Apache Spark

2.2 代码开发

开发步骤:

  1. 创建 SparkContext;

  2. 从 ES 中加载数据,分组统计,然后排序;

  3. 将排序后的结果保存到 HDFS 中。

示例代码如下:

public static void main(String[] args) {
    LOG.info("***** Start to run the Spark on ES test.");

    try {
        // Create a configuration class SparkConf,
        // meanwhile set the Secure configuration that the Elasticsearch Cluster needed,
        // finally create a SparkContext.
        SparkConf conf = new SparkConf()
            .setAppName("SparkOnEs")
            .set("es.nodes", esServerHost)
            // when you specified in es.nodes, then es.port is not necessary
            // .set("es.port", "24100")
            .set("es.nodes.discovery", "true")
            .set("es.index.auto.create", "true")
            .set("es.internal.spark.sql.pushdown", "true")
            // 要返回的字段,多个时以“,”分隔,此配置的性能要高于 es.read.field.include
            .set("es.read.source.filter", "name,age")
            // 滚动查询时,批大小最大值默认为10000
            .set("es.scroll.size", "10000")
            // 每个分区最多处理100万条数据
            .set("es.input.max.docs.per.partition", "1000000");

        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Group data from ES
        groupBySpark(jsc); 

        jsc.stop();
    } catch (IOException e) {
        LOG.error("***** There are exceptions in main.", e);
    }
}

/**
 * 1. query all data from ES by JavaEsSpark,
 * 2. group by specified field and sort the result,
 * 3. save the result to HDFS or local files
 *
 * @param jsc the Java Spark Context which has already initialization
 */
public static void groupBySpark(JavaSparkContext jsc) {
    long begin = System.currentTimeMillis();
    JavaPairRDD<string, map> pairRDD = JavaEsSpark.esRDD(jsc, esIndex);

    // 根据 age 字段进行分组
    final String field = "age";
    JavaPairRDDresultRdd = pairRDD.mapPartitionsToPair(
        new PairFlatMapFunction<iterator<tuple2<string, map>>, String, Long>() {
            @Override
            public Iterator<tuple2> call(
                Iterator<tuple2<string, map>> iterator) throws Exception {
                List<tuple2> list = new ArrayList<>(10000);
                iterator.forEachRemaining(
                    row -> list.add(new Tuple2<>(row._2.get(field).toString(), 1L)));
                return list.iterator();
            }
        })
        .reduceByKey((v1, v2) -> (v1 + v2))
        .mapToPair(row -> new Tuple2<>(row._2, row._1))
        // 对不同年龄的人数,倒序排序
        .sortByKey(false)
        .mapToPair(row -> new Tuple2<>(row._2, row._1));

    long end = System.currentTimeMillis();
    long spentTime = end - begin;
    LOG.info("***** GroupBy data from ES successful, spent time: {} ms", spentTime);

    resultRdd.saveAsTextFile("/user/spark-on-es/group-result/");
    LOG.info("***** Save all result to HDFS successful.");
}

3 - 运行任务

3.1 打包项目

在 FI 8.0.0 中下载 Spark 客户端,获取样例代码后解压,用 IDEA 打开:File -> Open,选中项目的 pom.xml 文件 -> OK,即可完成项目的导入。

(1)修改项目中的相关配置,与要测试集群中的信息一致;

(2) 通过 IDEA 自带的 Maven 工具,打包项目,生成 target\SparkOnES-1.0.jar

(3)将打包生成的 jar 包上传到 Spark 客户端所在的服务器下,这里以 /opt/spark-on-es/ 为例;

(4)将 esParams.propertiesuser.keytabkrb5.conf 三个文件上传到 /opt/spark-on-es/ 下;

(5)将项目所需的 jar 包上传到 /opt/spark-on-es/libs/ 下。

说明:样例代码运行至少需要如下 jar 包,请从 Elasticsearch 的客户端、Maven 中心仓等处获取相关包。

3.2 client 模式提交 Spark 任务

运行命令如下:

cd /opt/spark-on-es/
# 下述命令为一条命令,其中 /opt/spark-on-es/libs/ 是外部依赖的jar包路径:
spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \
--master yarn --deploy-mode client \
--jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \
./SparkOnEs-1.0.jar

3.3 cluster 模式提交 Spark 任务

运行命令如下:

cd /opt/spark-on-es/
# 下述命令为一条命令,其中 --files 参数指定配置文件:
spark-submit --class com.huawei.bigdata.spark.examples.SparkOnEs \
--master yarn --deploy-mode cluster \
--jars $(files=(/opt/spark-on-es/libs/*.jar); IFS=,; echo "${files[*]}") \
--files ./user.keytab,./krb5.conf,./esParams.properties \
--driver-memory 6g \
--executor-cores 5 \
--num-executors 150 \
--executor-memory 5g \
./SparkOnEs-1.0.jar

3.4 查看运行结果

(1) 查询 Elasticsearch 中的数据:

# 安全模式集群下:kinit认证后,查看Elasticsearch中的index:
curl -XGET --tlsv1.2 --negotiate -k -u : 'https://10.10.10.11:24100/_cat/indices?v'

# 普通模式集群下:使用http(而非https)查询即可:
curl -XGET 'http://10.10.10.11:24100/_cat/indices?v'

# 通过下述命令对people索引中的数据进行范围查询:
curl -XPOST 'http://10.10.10.11:24100/people/_search?pretty' -H 'Content-Type:application/json' -d '
{
    "query": {
        "range": {
            "createdTime": {"gte": "2010-01-01T00:00:00Z", "lt": "2015-12-31T23:59:59Z"}
        }
    }
}'

注:Elasticsearch相关查询命令,请参考【业务操作指南】-【Elasticsearch】-【Linux下curl命令的使用】。

(2) 查询 HDFS 中的分组文件:

样例代码中将分组结果保存到 HDFS 中,安装客户端、kinit 认证后,可通过下述命令进行统计:

# 查看所有的文件,及其大小:
[root@10.10.10.11 spark-on-es]# hdfs dfs -du -s -h /user/spark-on-es/result/*
0         0        /user/spark-on-es/result/_SUCCESS
709       2.1 K    /user/spark-on-es/result/part-00000
3.5 K     10.4 K   /user/spark-on-es/result/part-00001
2.1 K     6.4 K    /user/spark-on-es/result/part-00002
3.1 K     9.4 K    /user/spark-on-es/result/part-00003
......

# 统计结果集的个数:
[root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/* | wc -l
2000000

# 查看某个文件中的内容:
[root@10.10.10.11 spark-on-es]# hdfs dfs -cat /user/spark-on-es/result/part-00000
(573267,99)
(1929095,98)

4 - 性能优化方法

此样例代码的性能瓶颈:Spark 读取 Elasticsearch 中全量数据的过程,耗时最久,优化思路有:

1)增加索引的分片个数:elasticsearch-spark 工具读取 Elasticsearch 中的数据时,任务的并行度默认是索引的分片个数,因此分片个数越多,并行度越高;

Elasticsearch 中索引的分片个数不宜太大,此时可通过 es.input.max.docs.per.partition 参数规划 Spark 读取 Elasticsearch 中数据的 Partition 个数,也可提升并行度。(详见示例代码)

2)增大 scroll.size 的值:elasticsearch-spark 工具通过 Scroll 滚动读取数据,其大小默认是50,可以提高至10000,建议不高于50000,否则容易产生 OOM;

3)合理使用 Yarn 资源:参考 《产品文档-业务操作指南-Yarn-性能调优-节点配置调优》 中的说明,合理设置 Yarn 的资源,以 cluster 模式运行任务时,应适当修改 spark-submit 的参数:

--driver-memory 6g     ## Driver的内存大小
--executor-cores 5     ## 每个Executor可用的 CPU 核数
--num-executors 150    ## Executor的个数,num-executors * executor-cores 不能超过 Yarn的VCores数
--executor-memory 5g   ## 每个Executor的内存大小,num-executors * executor-memory 不能超过Yarn的最大内存

5 - 参考资料

Elasticsearch for Apache Hadoop 7.6 » Configuration

6 - 其他代码参考

通过 Scroll 滚动查询大批量数据的逻辑:
    /**
     * Query data from ES by ES rest client
     */
    private static void queryDataByRestHighLevelClient(JavaSparkContext jsc) {
        LOG.info("=====> Query data from ES by Rest High Level Client beginning...");
        LOG.info("=====> Query string: {}", esQueryJsonString);

        long begin = System.currentTimeMillis();
        List<Map<String, Object>> resultMap = new ArrayList<>(1024 * 100);

        try {
            // query data by scroll api, avoid the OOM
            SearchRequest searchRequest = new SearchRequest(index);
            final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(1L));
            searchRequest.scroll(scroll);

            // set the size of result, note: if the number of size was too large, may cause the OOM
            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().size(2000);
            searchSourceBuilder.query(QueryBuilders.rangeQuery(esQueryField).gte(esQueryRangeBegin).lt(esQueryRangeEnd));
            String[] includeFields = new String[] {"id", "name", "birthday"};
            String[] excludeFields = new String[] {"age"};
            searchSourceBuilder.fetchSource(includeFields, excludeFields);

            searchRequest.source(searchSourceBuilder);
            SearchResponse searchResponse = highLevelClient.search(searchRequest, RequestOptions.DEFAULT);

            String scrollId = searchResponse.getScrollId();
            SearchHit[] searchHits = searchResponse.getHits().getHits();

            while (searchHits != null && searchHits.length > 0) {
                for (SearchHit hit : searchHits) {
                    Map<String, Object> source = hit.getSourceAsMap();
                    resultMap.add(source);
                }

                // continue scroll search
                SearchScrollRequest scrollRequest = new SearchScrollRequest(scrollId);
                scrollRequest.scroll(scroll);
                searchResponse = highLevelClient.scroll(scrollRequest, RequestOptions.DEFAULT);
                scrollId = searchResponse.getScrollId();
                searchHits = searchResponse.getHits().getHits();
            }
        } catch (Exception e) {
            LOG.error("***** Query data failed, exception occurred.", e);
        }

        JavaRDD<Map<String, Object>> rdd = jsc.parallelize(resultMap);

        long end = System.currentTimeMillis();
        long spentTime = end - begin;
        LOG.info("=====> Query data from ES by Rest High Level Client, rdd's count: {}, spent time: {} ms",
                rdd.count(), spentTime);
    }


通过 bulkPut 创建大量测试数据的逻辑:
    /**
     * Put data by a bulk request
     *
     * @param restClient the Client of Elasticsearch
     */
    private static void putDataByBulk(RestClient restClient) {
        LOG.info("***** Bulk put data beginning...");
        // total number of documents need to index
        long totalRecordNum = 100000;
        // number of document per bulk request
        long oneCommit = 500;
        long circleNumber = totalRecordNum / oneCommit;
        StringEntity entity;
        Gson gson = new Gson();
        Map<String, Object> esMap = new HashMap<>();
        String str = "{ \"index\" : { \"_index\" : \"" + index + "\", \"_type\" :  \"" + type + "\"} }";

        for (int i = 0; i < circleNumber; i++) {
            StringBuilder builder = new StringBuilder();
            for (int j = 1; j <= oneCommit; j++) {
                esMap.clear();
                esMap.put("id", (i * oneCommit + j) + "");
                esMap.put("name", getName());
                esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30));
                esMap.put("birthday", getBirthday());

                /* esMap.clear();
                 id = i * oneCommit + j + "";
                 esMap.put("id", id);
                 esMap.put("name", "name-" + id);
                 esMap.put("age", ThreadLocalRandom.current().nextInt(1, 30));
                 esMap.put("birthday", new Date());
                 */

                String strJson = gson.toJson(esMap);
                builder.append(str).append("\n");
                builder.append(strJson).append("\n");
            }
            entity = new StringEntity(builder.toString(), ContentType.APPLICATION_JSON);
            entity.setContentEncoding("UTF-8");

            Response response;
            try {
                Request request = new Request("PUT", "/_bulk");
                request.addParameter("pretty", "true");
                request.setEntity(entity);
                response = restClientTest.performRequest(request);
                if (HttpStatus.SC_OK == response.getStatusLine().getStatusCode()) {
                    LOG.info("***** Already input documents: " + oneCommit * i);
                } else {
                    LOG.error("***** Bulk failed.");
                }
                LOG.info("Bulk response entity is : " + EntityUtils.toString(response.getEntity()));
            } catch (Exception e) {
                LOG.error("Bulk failed, exception occurred.", e);
            }
        }
    }

    // 较真实的数据
    private static String[] firstName = {"Jenny", "James", "Linda", "Judy", "Karen", "Kelly", "Margaret", "Rose", "Nora", "Wendy"};
    private static String[] lastName = {"Abel", "Abraham", "Kent", "Brown", "White", "Cotton", "Hawk", "George", "Henry", "David"};

    private static String getName() {
        int index = ThreadLocalRandom.current().nextInt(0, 10);
        return firstName[index] + " " + lastName[index];
    }

    private static String getBirthday() {
        ThreadLocalRandom random = ThreadLocalRandom.current();
        int year = random.nextInt(1990, 2021);
        int month = random.nextInt(1, 13);
        int day = random.nextInt(1, 29);

        int hour = random.nextInt(0, 24);
        int minute = random.nextInt(0, 60);
        int second = random.nextInt(0, 60);

        LocalDateTime time = LocalDateTime.of(year, Month.of(month), day, hour, minute, second);
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        return formatter.format(time);
    }

}
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。