Logstash实现ES集群数据迁移实践-update

举报
五柳 发表于 2022/09/26 18:48:21 2022/09/26
【摘要】 原文:《Logstash实现ES集群数据迁移实践》(云社区-华为云,huaweicloud.com)。本文修改了其中的创建索引脚本,使其兼容 Python 3。脚本开头: import yaml; import requests; import json; import getopt; import sys; def help(): print """ usage: -h/--help print this help....

原文:《Logstash实现ES集群数据迁移实践》(云社区-华为云,huaweicloud.com)。本文修改了其中的创建索引脚本,使其兼容 Python 3。

import yaml
import requests
import json
import getopt
import sys
def help():
    """Print the command-line usage text of this script to stdout."""
    # Under Python 3 a bare `print` followed by a separate string literal
    # prints nothing, so the usage text must be passed to print() directly.
    print(
        """
    usage:
    -h/--help print this help.
    -c/--config config file path, default is config.yaml

    example:
    python create_mapping.py -c config.yaml
    """
    )
def process_mapping(index_mapping, dest_index):
    """Prepare a source index definition for creation under ``dest_index``.

    The definition is modified in place and also returned. It is printed
    before and after processing.

    Steps:
      * drop ``provided_name``, ``uuid``, ``creation_date`` and ``version``
        from ``settings.index`` (missing keys are ignored);
      * drop an alias whose name equals ``dest_index``;
      * move ``settings.index.lifecycle`` (``name`` / ``rollover_alias``) to
        ``settings.opendistro.index_state_management`` (``policy_id`` /
        ``rollover_alias``), copying only the keys that are present.

    Args:
        index_mapping: dict with at least ``settings.index``; ``aliases`` is
            optional.
        dest_index: name of the index to be created on the destination.

    Returns:
        The same ``index_mapping`` dict after modification.

    Raises:
        KeyError: if ``index_mapping`` has no ``settings`` / ``index`` entry.
    """
    print(index_mapping)
    index_settings = index_mapping["settings"]["index"]

    # remove unnecessary keys; pop() tolerates keys that are already absent
    for key in ("provided_name", "uuid", "creation_date", "version"):
        index_settings.pop(key, None)

    # check alias: an alias may not share its name with the target index
    aliases = index_mapping.get("aliases")
    if aliases and dest_index in aliases:
        print(
            "source index " + dest_index + " alias " + dest_index + " is the same as dest_index name, will remove this alias.")
        del aliases[dest_index]

    lifecycle = index_settings.pop("lifecycle", None)
    if lifecycle is not None:
        # A lifecycle block does not always carry both keys, so copy only
        # what is actually there instead of failing on the missing one.
        state_management = {}
        if "name" in lifecycle:
            state_management["policy_id"] = lifecycle["name"]
        if "rollover_alias" in lifecycle:
            state_management["rollover_alias"] = lifecycle["rollover_alias"]
        if state_management:
            index_mapping["settings"].update(
                {"opendistro": {"index_state_management": state_management}})

    print(index_mapping)
    return index_mapping
def put_mapping_to_target(url, mapping, source_index, dest_auth=None):
    """Create the destination index by PUTting ``mapping`` as JSON to ``url``.

    On any response status other than 200 the failure and the response body
    are printed, and the JSON payload is written to ``<source_index>.json``
    in the current working directory.

    Args:
        url: full URL of the index to create.
        mapping: JSON-serialisable index definition.
        source_index: name of the source index; used in the failure message
            and as the dump file name.
        dest_auth: optional value forwarded as ``auth`` to ``requests.put``.
    """
    payload = json.dumps(mapping)
    create_resp = requests.put(
        url,
        headers={'Content-Type': 'application/json'},
        data=payload,
        auth=dest_auth,
    )
    if create_resp.status_code == 200:
        return

    failure_message = (
        "create index " + url + " failed with response: " + str(create_resp)
        + ", source index is " + source_index
    )
    print(failure_message)
    print(create_resp.text)
    # Keep the rejected definition on disk next to the script's working dir.
    with open(source_index + ".json", "w") as dump_file:
        dump_file.write(payload)
def _redact_auth(auth):
    """Return ``auth`` with the password replaced by a placeholder, or None."""
    if auth is None:
        return None
    return (auth[0], "******")


def _build_auth(user, passwd):
    """Return a ``(user, passwd)`` tuple, or None when no user is configured."""
    # An empty YAML value is loaded as None, so test truthiness rather than
    # comparing against "" only.
    return (user, passwd) if user else None


def _migrate_index(source, dest, source_index, dest_index, source_auth, dest_auth):
    """Fetch one index definition from the source and create it on the target.

    Returns True when the definition was fetched and passed on to
    put_mapping_to_target, False when the fetch returned a non-200 status.
    """
    response = requests.get(source + "/" + source_index, auth=source_auth)
    if response.status_code != 200:
        print("*** get ElasticSearch message failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        return False
    index_mapping = process_mapping(response.json()[source_index], dest_index)
    put_mapping_to_target(dest + "/" + dest_index, index_mapping, source_index, dest_auth)
    return True


def main():
    """Copy index definitions from a source cluster to a destination cluster.

    Command line: ``-h/--help`` prints usage and exits; ``-c/--config PATH``
    selects the YAML config file (default ``config.yaml``). Invalid options
    print the error and the usage text and exit with status 2.

    Config keys read: ``source``, ``source_user``, ``source_passwd``,
    ``destination``, ``destination_user``, ``destination_passwd``,
    ``only_mapping`` and ``mapping`` (source index name -> target index name).

    When ``only_mapping`` is true, only the indices listed in ``mapping`` are
    processed. Otherwise every index returned by ``<source>/_alias`` except
    ``.kibana`` is processed, renamed through ``mapping`` where an entry
    exists. If listing the indices fails the process exits with status 1.
    """
    config_yaml = "config.yaml"
    try:
        opts, _ = getopt.getopt(sys.argv[1:], "hc:", ["help", "config="])
    except getopt.GetoptError as err:
        print(err)
        help()
        sys.exit(2)
    for opt_name, opt_value in opts:
        if opt_name in ("-h", "--help"):
            help()
            sys.exit()
        if opt_name in ("-c", "--config"):
            config_yaml = opt_value

    # safe_load: yaml.load() without an explicit Loader is rejected by
    # current PyYAML releases, and a config file needs no object construction.
    with open(config_yaml) as config_file:
        config = yaml.safe_load(config_file)

    # Strip a trailing slash so the joined URLs never contain "//".
    source = config["source"].rstrip("/")
    dest = config["destination"].rstrip("/")
    source_auth = _build_auth(config["source_user"], config["source_passwd"])
    dest_auth = _build_auth(config["destination_user"], config["destination_passwd"])
    # Show which credentials are in effect without echoing the passwords.
    print(_redact_auth(source_auth))
    print(_redact_auth(dest_auth))

    index_map = config.get("mapping") or {}
    print(index_map)

    # only deal with mapping list
    if config["only_mapping"]:
        for source_index, dest_index in index_map.items():
            print("start to process source index" + source_index + ", target index: " + dest_index)
            if _migrate_index(source, dest, source_index, dest_index, source_auth, dest_auth):
                print("process source index " + source_index + " to target index " + dest_index + " successed.")
        return

    # get all indices
    response = requests.get(source + "/_alias", auth=source_auth)
    if response.status_code != 200:
        print("*** get all index failed. resp statusCode:" + str(
            response.status_code) + " response is " + response.text)
        sys.exit(1)
    for index in response.json():
        if index == ".kibana":
            continue
        print("start to process source index" + index)
        dest_index = index_map.get(index, index)
        if _migrate_index(source, dest, index, dest_index, source_auth, dest_auth):
            print("process source index " + index + " to target index " + dest_index + " successed.")
# Run the migration only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0)

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。