建议使用以下浏览器,以获得最佳体验。 IE 9.0+以上版本 Chrome 31+ 谷歌浏览器 Firefox 30+ 火狐浏览器
请选择 进入手机版 | 继续访问电脑版
设置昵称

在此一键设置昵称,即可参与社区互动!

确定
我再想想
选择版块
直达楼层
标签
您还可以添加5个标签
  • 没有搜索到和“关键字”相关的标签
  • 云产品
  • 解决方案
  • 技术领域
  • 通用技术
  • 平台功能
取消

采纳成功

您已采纳当前回复为最佳回复

ES_zhao

发帖: 7粉丝: 1

发消息 + 关注

发表于2019年03月07日 11:29:02 6846 2
直达本楼层的链接
楼主
显示全部楼层
[技术干货] ES的批量bulk导入样例--转发

一、Bulk API

使用bulk命令时,REST API以_bulk结尾,批量操作写在json文件中,官网给出的语法格式:

action_and_meta_data\n

optional_source\n

action_and_meta_data\n

optional_source\n

....

action_and_meta_data\n

optional_source\n


也就是说每一个操作都有2行数据组成,末尾要回车换行。第一行用来说明操作命令和原数据、第二行是自定义的选项.举个例子,同时执行**2条数据、删除一条数据, 新建bulkdata.json,写入如下内容:


{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "3" }}

{ "title":"title1","posttime":"2016-07-02","content":"内容一" }


{ "create" : { "_index" : "blog", "_type" : "article", "_id" : "4" }}

{ "title":"title2","posttime":"2016-07-03","content":"内容2" }


{ "delete":{"_index" : "blog", "_type" : "article", "_id" : "1" }}

执行:


$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json

{

  "took" : 11,

  "errors" : false,

  "items" : [ {

    "create" : {

      "_index" : "blog",

      "_type" : "article",

      "_id" : "13",

      "_version" : 1,

      "_shards" : {

        "total" : 1,

        "successful" : 1,

        "failed" : 0

      },

      "status" : 201

    }

  } ]

}

注意:行末要回车换行,不然会因为命令不能识别而出现错误.


$ curl -XPOST "http://localhost:9200/_bulk?pretty" --data-binary @bulkAdd.json 

{

  "error" : {

    "root_cause" : [ {

      "type" : "action_request_validation_exception",

      "reason" : "Validation Failed: 1: no requests added;"

    } ],

    "type" : "action_request_validation_exception",

    "reason" : "Validation Failed: 1: no requests added;"

  },

  "status" : 400

}


二、批量导出

下面的例子是把索引库中的文档以json格式批量导出到文件中,其中集群名称为”bropen”,索引库名为”blog”,type为”article”,项目根目录下新建files/bulk.txt,索引内容写入bulk.txt中:


import java.io.BufferedWriter;

import java.io.File;

import java.io.FileWriter;

import java.io.IOException;

import java.net.InetAddress;

import java.net.UnknownHostException;


import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.client.Client;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.transport.InetSocketTransportAddress;

import org.elasticsearch.index.query.QueryBuilder;

import org.elasticsearch.index.query.QueryBuilders;

import org.elasticsearch.search.SearchHits;


public class ElasticSearchBulkOut {


    public static void main(String[] args) {


        try {

            Settings settings = Settings.settingsBuilder()

                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml


            Client client = TransportClient.builder().settings(settings).build()

                    .addTransportAddress(new InetSocketTransportAddress(

                            InetAddress.getByName("127.0.0.1"), 9300));


            QueryBuilder qb = QueryBuilders.matchAllQuery();

            SearchResponse response = client.prepareSearch("blog")

                    .setTypes("article").setQuery(QueryBuilders.matchAllQuery())

                    .execute().actionGet();

            SearchHits resultHits = response.getHits();


            File article = new File("files/bulk.txt");

            FileWriter fw = new FileWriter(article);

            BufferedWriter bfw = new BufferedWriter(fw);


            if (resultHits.getHits().length == 0) {

                System.out.println("查到0条数据!");


            } else {

                for (int i = 0; i < resultHits.getHits().length; i++) {

                    String jsonStr = resultHits.getHits()[i]

                            .getSourceAsString();

                    System.out.println(jsonStr);

                    bfw.write(jsonStr);

                    bfw.write("\n");

                }

            }

            bfw.close();

            fw.close();


        } catch (UnknownHostException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }


    }


}

三、批量导入

从刚才导出的bulk.txt文件中按行读取,然后bulk导入。首先通过调用client.prepareBulk()实例化一个BulkRequestBuilder对象,调用BulkRequestBuilder对象的add方法添加数据。实现代码:


import java.io.BufferedReader;

import java.io.File;

import java.io.FileNotFoundException;

import java.io.FileReader;

import java.io.IOException;

import java.net.InetAddress;

import java.net.UnknownHostException;


import org.elasticsearch.action.bulk.BulkRequestBuilder;

import org.elasticsearch.client.Client;

import org.elasticsearch.client.transport.TransportClient;

import org.elasticsearch.common.settings.Settings;

import org.elasticsearch.common.transport.InetSocketTransportAddress;


public class ElasticSearchBulkIn {


    public static void main(String[] args) {


        try {


            Settings settings = Settings.settingsBuilder()

                    .put("cluster.name", "bropen").build();// cluster.name在elasticsearch.yml中配置


            Client client = TransportClient.builder().settings(settings).build()

                    .addTransportAddress(new InetSocketTransportAddress(

                            InetAddress.getByName("127.0.0.1"), 9300));


            File article = new File("files/bulk.txt");

            FileReader fr=new FileReader(article);

            BufferedReader bfr=new BufferedReader(fr);

            String line=null;

            BulkRequestBuilder bulkRequest=client.prepareBulk();

            int count=0;

            while((line=bfr.readLine())!=null){

                bulkRequest.add(client.prepareIndex("test","article").setSource(line));

                if (count%10==0) {

                    bulkRequest.execute().actionGet();

                }

                count++;

                //System.out.println(line);

            }

            bulkRequest.execute().actionGet();


            bfr.close();

            fr.close();

        } catch (UnknownHostException e) {

            e.printStackTrace();

        } catch (FileNotFoundException e) {

            e.printStackTrace();

        } catch (IOException e) {

            e.printStackTrace();

        }


    }


}


参考文档:Elasticsearch Reference [2.3] » Document APIs » Bulk API

原文:https://blog.csdn.net/napoay/article/details/51907709 



Elasticsearch

举报
分享

分享文章到朋友圈

分享文章到微博

采纳成功

您已采纳当前回复为最佳回复

Joey啊

发帖: 84粉丝: 16

级别 : 版主

发消息 + 关注

发表于2019年08月30日 11:17:55
直达本楼层的链接
沙发
显示全部楼层

点赞 评论 引用 举报

采纳成功

您已采纳当前回复为最佳回复

Joey啊

发帖: 84粉丝: 16

级别 : 版主

发消息 + 关注

发表于2019年08月30日 11:17:16
直达本楼层的链接
板凳
显示全部楼层

点赞 评论 引用 举报

游客

富文本
Markdown
您需要登录后才可以回帖 登录 | 立即注册

结贴

您对问题的回复是否满意?
满意度
非常满意 满意 一般 不满意
我要反馈
0/200