elasticsearch学习二:导入数据

举报
仙士可 发表于 2023/06/30 12:16:28 2023/06/30
【摘要】 安装 go-elasticsearch 和 gorm:go get -u github.com/jinzhu/gorm;go get github.com/elastic/go-elasticsearch/v7。我的是v7版本,所以elasticsearch使用v7,如果是v8则改成v8。go-elasticsearch和gorm操作:func EsClient() *elasticsearch.Cli...

安装go-elasticsearch,gorm

go get -u github.com/jinzhu/gorm
go get github.com/elastic/go-elasticsearch/v7
复制

我本地的 Elasticsearch 服务端是 v7 版本,所以 go-elasticsearch 客户端也使用 v7;如果服务端是 v8,则把导入路径中的 /v7 改成 /v8

go-elasticsearch和gorm操作

// EsClient builds an Elasticsearch client for the node at
// http://127.0.0.1:9200, requests the cluster info once, logs the client and
// server versions, and returns the client.
//
// A failure while creating the client, performing the info request, or
// decoding its body terminates the process via log.Fatalf.
func EsClient() *elasticsearch.Client {
   cfg := elasticsearch.Config{
      Addresses: []string{
         "http://127.0.0.1:9200",
      },
   }

   es, err := elasticsearch.NewClient(cfg)
   if err != nil {
      log.Fatalf("Error creating the client: %s", err)
   }

   // Get cluster info.
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   // Close the body on the normal return path so the underlying HTTP
   // connection can be reused by later requests.
   defer res.Body.Close()

   // Check response status.
   if res.IsError() {
      log.Fatalf("Error: %s", res.String())
   }

   // Deserialize the response into a map.
   var r map[string]interface{}
   if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
      log.Fatalf("Error parsing the response body: %s", err)
   }

   // Print client and server version numbers. The assertion is checked so
   // that a response without a "version" object cannot panic.
   log.Printf("Client: %s", elasticsearch.Version)
   if version, ok := r["version"].(map[string]interface{}); ok {
      log.Printf("Server: %s", version["number"])
   } else {
      log.Printf("Server: version missing from info response")
   }
   log.Println(strings.Repeat("~", 37))

   return es
}

// Db opens a gorm handle using the "mysql" dialect and the DSN below, and
// panics if the handle cannot be opened.
//
// NOTE(review): the DSN embeds a user name and password in source; consider
// loading it from configuration instead.
func Db() *gorm.DB {
   const dsn = "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local"

   conn, openErr := gorm.Open("mysql", dsn)
   if openErr != nil {
      panic(openErr)
   }
   return conn
}
复制

elasticsearch导入

在 Elasticsearch 7.0 及之后的版本中,一个 index 只允许一个 type(默认为 _doc),所以写入文档时不需要额外指定 type

// AddEsData JSON-encodes info and indexes it into the "test" index, using
// the decimal form of info.Id as the document ID and requesting an immediate
// refresh.
//
// A marshaling or transport failure terminates the process via log.Fatalf.
// An error status from Elasticsearch, or an undecodable response body, is
// only logged.
func AddEsData(es *elasticsearch.Client, info LogInfo) {
   // Build the request body.
   payload, err := json.Marshal(info)
   if err != nil {
      log.Fatalf("Error marshaling document: %s", err)
   }

   // Set up and perform the index request.
   indexReq := esapi.IndexRequest{
      Index:      "test",
      DocumentID: strconv.Itoa(info.Id),
      Body:       bytes.NewReader(payload),
      Refresh:    "true",
   }
   resp, err := indexReq.Do(context.Background(), es)
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer resp.Body.Close()

   // Elasticsearch rejected the document: report it and stop here.
   if resp.IsError() {
      fmt.Println(resp)
      log.Printf("[%s] Error indexing document ID=%d", resp.Status(), info.Id)
      return
   }

   // Decode the body to confirm it is well-formed JSON; the decoded value
   // itself is not used.
   var decoded map[string]interface{}
   if decodeErr := json.NewDecoder(resp.Body).Decode(&decoded); decodeErr != nil {
      log.Printf("Error parsing the response body: %s", decodeErr)
   }
}
复制

总的代码:

package main

import (
   "bytes"
   "context"
   "encoding/json"
   "fmt"
   "github.com/elastic/go-elasticsearch/v7"
   "github.com/elastic/go-elasticsearch/v7/esapi"
   "github.com/jinzhu/gorm"
   _ "github.com/jinzhu/gorm/dialects/mysql"
   "log"
   "strconv"
   "strings"
   "sync"
)

// LogInfo is the record type used on both sides of the import: rows are
// scanned into it by gorm, and it is JSON-encoded as the Elasticsearch
// document body.
//
// The field names carry no json tags, so they are emitted verbatim as the
// JSON keys; renaming a field changes the indexed document shape.
type LogInfo struct {
   // Id is the primary key; it also becomes the Elasticsearch document ID.
   Id int `gorm:"primary_key"`

   Type_name     string
   Type          string
   Sub_type      string
   Sub_type_name string
   Time          string
   Login_qq      string
   Send_qq       string
   Group         string
   Content       string
   Font_id       string
   File          string

   Being_operate_qq string
   Add_time         string
}

// EsClient builds an Elasticsearch client for the node at
// http://127.0.0.1:9200, requests the cluster info once, logs the client and
// server versions, and returns the client.
//
// A failure while creating the client, performing the info request, or
// decoding its body terminates the process via log.Fatalf.
func EsClient() *elasticsearch.Client {
   cfg := elasticsearch.Config{
      Addresses: []string{
         "http://127.0.0.1:9200",
      },
   }

   es, err := elasticsearch.NewClient(cfg)
   if err != nil {
      log.Fatalf("Error creating the client: %s", err)
   }

   // Get cluster info.
   res, err := es.Info()
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   // Close the body on the normal return path so the underlying HTTP
   // connection can be reused by later requests.
   defer res.Body.Close()

   // Check response status.
   if res.IsError() {
      log.Fatalf("Error: %s", res.String())
   }

   // Deserialize the response into a map.
   var r map[string]interface{}
   if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
      log.Fatalf("Error parsing the response body: %s", err)
   }

   // Print client and server version numbers. The assertion is checked so
   // that a response without a "version" object cannot panic.
   log.Printf("Client: %s", elasticsearch.Version)
   if version, ok := r["version"].(map[string]interface{}); ok {
      log.Printf("Server: %s", version["number"])
   } else {
      log.Printf("Server: version missing from info response")
   }
   log.Println(strings.Repeat("~", 37))

   return es
}

// Db opens a gorm handle using the "mysql" dialect and the DSN below, and
// panics if the handle cannot be opened.
//
// NOTE(review): the DSN embeds a user name and password in source; consider
// loading it from configuration instead.
func Db() *gorm.DB {
   const dsn = "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local"

   conn, openErr := gorm.Open("mysql", dsn)
   if openErr != nil {
      panic(openErr)
   }
   return conn
}

// main copies rows from the MySQL "log" table into Elasticsearch.
//
// Rows are read in ascending id order, 100 at a time, starting after id
// 30308. Each batch is indexed concurrently (one goroutine per row) and the
// next batch is only fetched once the whole batch has finished. The loop ends
// when a query returns no rows or fails.
func main() {
   // EsClient already issues an Info request to check the node, so no
   // second Info call is made here.
   es := EsClient()

   db := Db()
   defer db.Close()
   // NOTE(review): this migrates the table gorm derives from LogInfo, while
   // the query below reads the "log" table — confirm this is intended.
   db.AutoMigrate(&LogInfo{})

   const batchSize = 100
   lastId := 30308
   for {
      var list []LogInfo
      err := db.Table("log").Limit(batchSize).Order("id ASC").Where("id > ?", lastId).Find(&list).Error
      if err != nil {
         // Return instead of log.Fatalf so the deferred db.Close runs.
         log.Printf("Error querying log rows after id %d: %s", lastId, err)
         return
      }
      if len(list) == 0 {
         // Nothing left to import; without this check the same empty
         // query would be repeated forever.
         return
      }

      var wg sync.WaitGroup
      for _, logInfo := range list {
         wg.Add(1)
         go func(info LogInfo) {
            defer wg.Done()
            AddEsData(es, info)
         }(logInfo)
      }
      // Rows are ordered by id, so the last row carries the highest id seen.
      lastId = list[len(list)-1].Id
      wg.Wait()
   }
}

// AddEsData JSON-encodes info and indexes it into the "test" index, using
// the decimal form of info.Id as the document ID and requesting an immediate
// refresh.
//
// A marshaling or transport failure terminates the process via log.Fatalf.
// An error status from Elasticsearch, or an undecodable response body, is
// only logged.
func AddEsData(es *elasticsearch.Client, info LogInfo) {
   // Build the request body.
   payload, err := json.Marshal(info)
   if err != nil {
      log.Fatalf("Error marshaling document: %s", err)
   }

   // Set up and perform the index request.
   indexReq := esapi.IndexRequest{
      Index:      "test",
      DocumentID: strconv.Itoa(info.Id),
      Body:       bytes.NewReader(payload),
      Refresh:    "true",
   }
   resp, err := indexReq.Do(context.Background(), es)
   if err != nil {
      log.Fatalf("Error getting response: %s", err)
   }
   defer resp.Body.Close()

   // Elasticsearch rejected the document: report it and stop here.
   if resp.IsError() {
      fmt.Println(resp)
      log.Printf("[%s] Error indexing document ID=%d", resp.Status(), info.Id)
      return
   }

   // Decode the body to confirm it is well-formed JSON; the decoded value
   // itself is not used.
   var decoded map[string]interface{}
   if decodeErr := json.NewDecoder(resp.Body).Decode(&decoded); decodeErr != nil {
      log.Printf("Error parsing the response body: %s", decodeErr)
   }
}
复制
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。