elasticsearch学习二:导入数据
【摘要】 安装go-elasticsearch,gormgo get -u github.com/jinzhu/gormgo get github.com/elastic/go-elasticsearch/v7复制我的是v7版本,所以elasticsearch使用v7,如果是v8则改成v8go-elasticsearch和gorm操作func EsClient() *elasticsearch.Cli...
安装go-elasticsearch,gorm
go get -u github.com/jinzhu/gorm
go get github.com/elastic/go-elasticsearch/v7
复制
我的是v7版本,所以elasticsearch使用v7,如果是v8则改成v8
go-elasticsearch和gorm操作
func EsClient() *elasticsearch.Client {
cfg := elasticsearch.Config{
Addresses: []string{
"http://127.0.0.1:9200",
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// Get cluster info
//
var r map[string]interface{}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
log.Printf("Client: %s", elasticsearch.Version)
log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
log.Println(strings.Repeat("~", 37))
return es
}
func Db() *gorm.DB {
db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
panic(err)
}
return db
}
复制
elasticsearch导入
在7.0之后的版本,一个index只允许一个type,所以不需要额外定义type
func AddEsData(es *elasticsearch.Client, info LogInfo) {
id := info.Id
// Build the request body.
data, err := json.Marshal(info)
if err != nil {
log.Fatalf("Error marshaling document: %s", err)
}
// Set up the request object.
req := esapi.IndexRequest{
Index: "test",
DocumentID: strconv.Itoa(id),
Body: bytes.NewReader(data),
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
fmt.Println(res)
log.Printf("[%s] Error indexing document ID=%d", res.Status(), info.Id)
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
// Print the response status and indexed document version.
//log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
}
复制
总的代码:
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"github.com/elastic/go-elasticsearch/v7"
"github.com/elastic/go-elasticsearch/v7/esapi"
"github.com/jinzhu/gorm"
_ "github.com/jinzhu/gorm/dialects/mysql"
"log"
"strconv"
"strings"
"sync"
)
type LogInfo struct {
Id int `gorm:"primary_key"`
Type_name string `gorm:""`
Type string `gorm:""`
Sub_type string `gorm:""`
Sub_type_name string `gorm:""`
Time string `gorm:""`
Login_qq string `gorm:""`
Send_qq string `gorm:""`
Group string `gorm:""`
Content string `gorm:""`
Font_id string `gorm:""`
File string `gorm:""`
Being_operate_qq string `gorm:""`
Add_time string `gorm:""`
}
func EsClient() *elasticsearch.Client {
cfg := elasticsearch.Config{
Addresses: []string{
"http://127.0.0.1:9200",
},
}
es, err := elasticsearch.NewClient(cfg)
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}
// Get cluster info
//
var r map[string]interface{}
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
log.Printf("Client: %s", elasticsearch.Version)
log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
log.Println(strings.Repeat("~", 37))
return es
}
func Db() *gorm.DB {
db, err := gorm.Open("mysql", "robot:bK8D6pAx82iTSWrK@(admin.easyswoole.cn:3306)/robot?charset=utf8mb4&parseTime=True&loc=Local")
if err != nil {
panic(err)
}
return db
}
func main() {
es := EsClient()
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
db := Db()
defer db.Close()
db.AutoMigrate(&LogInfo{})
var list = &[]LogInfo{}
lastId := 30308
start:
db.Table("log").Limit(100).Order("id ASC").Where("id > ?", lastId).Find(list)
var wg sync.WaitGroup
for _, logInfo := range *list {
wg.Add(1)
go func(info LogInfo) {
defer wg.Done()
AddEsData(es, info)
}(logInfo)
lastId = logInfo.Id
}
wg.Wait()
goto start
//fmt.Println(list)
//log.Println(res)
}
func AddEsData(es *elasticsearch.Client, info LogInfo) {
id := info.Id
// Build the request body.
data, err := json.Marshal(info)
if err != nil {
log.Fatalf("Error marshaling document: %s", err)
}
// Set up the request object.
req := esapi.IndexRequest{
Index: "test",
DocumentID: strconv.Itoa(id),
Body: bytes.NewReader(data),
Refresh: "true",
}
// Perform the request with the client.
res, err := req.Do(context.Background(), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
if res.IsError() {
fmt.Println(res)
log.Printf("[%s] Error indexing document ID=%d", res.Status(), info.Id)
} else {
// Deserialize the response into a map.
var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Printf("Error parsing the response body: %s", err)
} else {
// Print the response status and indexed document version.
//log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
}
复制
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)