[Ubuntu][Python][原创]make_sd_card.py制卡脚本Python文件解读注释

举报
futureflsl 发表于 2020/01/14 16:11:29 2020/01/14
【摘要】 刚看时这个脚本代码太多,是不是看不懂或者难看?其实告诉你写了这么多代码就一句有用,其他都是在搞乱七八糟的校验,防止出错。关键代码在202行,就这一句有用,其实完全就用202一行代码搞定了。很多注释在下面我做了,一看就懂。废话不多上代码+注释。下次放出制卡的sh脚本注释。## =========================================================...

刚看时这个脚本代码太多,是不是看不懂或者难看?其实告诉你写了这么多代码就一句有用,其他都是在搞乱七八糟的校验,防止出错。关键代码在202行,就这一句有用,其实完全就用202一行代码搞定了。很多注释在下面我做了,一看就懂。废话不多上代码+注释。下次放出制卡的sh脚本注释。


#

#   =======================================================================

#

# Copyright (C) 2018, Hisilicon Technologies Co., Ltd. All Rights Reserved.

#

# Redistribution and use in source and binary forms, with or without

# modification, are permitted provided that the following conditions are met:

#

#   1 Redistributions of source code must retain the above copyright notice,

#     this list of conditions and the following disclaimer.

#

#   2 Redistributions in binary form must reproduce the above copyright notice,

#     this list of conditions and the following disclaimer in the documentation

#     and/or other materials provided with the distribution.

#

#   3 Neither the names of the copyright holders nor the names of the

#   contributors may be used to endorse or promote products derived from this

#   software without specific prior written permission.

#

# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"

# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE

# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE

# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE

# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR

# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF

# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS

# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN

# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)

# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE

# POSSIBILITY OF SUCH DAMAGE.

#   =======================================================================

#

'''common installation'''

import os

import platform

import signal

import subprocess

import time

import yaml

import sys


NETWORK_CARD_DEFAULT_IP="192.168.0.2" # 网卡默认的IP地址

USB_CARD_DEFAULT_IP="192.168.1.2" # 开发板USB登录默认IP


VERSION_INFO_URL = "https://raw.githubusercontent.com/Ascend/tools/master/versioninfo.yaml"


CURRENT_PATH = os.path.dirname(

    os.path.realpath(__file__))


SD_CARD_MAKING_PATH = os.path.join(CURRENT_PATH, "sd_card_making")


MIN_DISK_SIZE = 7 * 1024 * 1024 * 1024 #最小容量限制

# 这个脚本写了一大堆代码其实就是为这个命令服务

MAKING_SD_CARD_COMMAND = "bash {path}/make_ubuntu_sd.sh " + {dev_name}" + \

    {pkg_path} {ubuntu_file_name} {ascend_developerkit_file_name}" + \

    " " + NETWORK_CARD_DEFAULT_IP + " " + USB_CARD_DEFAULT_IP + \

    " > {log_path}/make_ubuntu_sd.log "


# 在终端执行命令,python调用,这个和windows一样,其他编程调用终端的程序去运行脚本

def execute(cmdtimeout=3600cwd=None):

    '''execute os command'''

    # print(cmd)

    is_linux = platform.system() == 'Linux'


    if not cwd:

        cwd = os.getcwd()

    process = subprocess.Popen(cmd, cwd=cwd, bufsize=32768stdout=subprocess.PIPE,

                               stderr=subprocess.STDOUT, shell=True,

                               preexec_fn=os.setsid if is_linux else None)


    t_beginning = time.time()


    # cycle times

    time_gap = 0.01


    str_std_output = ""

    while True:


        str_out = str(process.stdout.read().decode())


        str_std_output = str_std_output + str_out


        if process.poll() is not None:

            break

        seconds_passed = time.time() - t_beginning


        if timeout and seconds_passed > timeout:


            if is_linux:

                os.kill(process.pid, signal.SIGTERM)

            else:

                process.terminate()

            return False, process.stdout.readlines()

        time.sleep(time_gap)

    str_std_output = str_std_output.strip()

    # print(str_std_output)

    std_output_lines_last = []

    std_output_lines = str_std_output.split("\n")

    for i in std_output_lines:

        std_output_lines_last.append(i)


    if process.returncode != 0 or "Traceback" in str_std_output:

        return False, std_output_lines_last


    return True, std_output_lines_last


def print_process(stringis_finished=False):

    if string == "" or string is None or is_finished:

        print(".......... .......... .......... .......... 100%"end='\r')

        print("")

    else:

        string = string.split(".......... "1)[1]

        string = string.split("%")[0] + "%"

        print(string, end='\r')


def execute_wget(cmdtimeout=86400cwd=None):

    '''execute os command'''

    # print(cmd)

    is_linux = platform.system() == 'Linux'


    if not cwd:

        cwd = os.getcwd()

    process = subprocess.Popen(cmd, cwd=cwd, bufsize=1stdout=subprocess.PIPE,

                               stderr=subprocess.STDOUT, shell=True,

                               preexec_fn=os.setsid if is_linux else None)


    t_beginning = time.time()


    # cycle times

    time_gap = 0.01


    str_std_output = ""

    process_str = ""

    while True:


        str_out = str(process.stdout.readline())

        if "......" in str_out:

            process_str = str_out

            print_process(process_str)


        str_std_output = str_std_output + str_out


        if process.poll() is not None:

            break

        seconds_passed = time.time() - t_beginning


        if timeout and seconds_passed > timeout:


            if is_linux:

                os.kill(process.pid, signal.SIGTERM)

            else:

                process.terminate()

            print("")

            return False

        time.sleep(time_gap)

        

    if process.returncode != 0 or "Traceback" in str_std_output:

        print("")

        return False

    print_process(process_str, True)

    return True


def check_sd(dev_name):

    # 执行shell命令 fdisk -l 2>/dev/null | grep "Disk 你的USB名称:"

    ret, disk = execute(

        "fdisk -l 2>/dev/null | grep \"Disk {dev_name}:\"".format(dev_name=dev_name))


    if not ret or len(disk) > 1:

        # 如果提示这个错误说明调用上面的shell命令,没有找到disk,此时你可以手动在终端执行看看什么原因

        print(

            "[ERROR] Can not get disk, please use fdisk -l to check available disk name!")

        return False

    # 查询挂载的磁盘到list

    ret, mounted_list = execute("df -h")

    # 返回None表示没有查询到,如果出现这个错误提示,请自己在终端执行df-h看看什么情况

    if not ret:

        print("[ERROR] Can not get mounted disk list!")

        return False

# df -h执行的结果,我电脑插的有SD卡

# Filesystem      Size  Used Avail Use% Mounted on

# udev            3.6G     0  3.6G   0% /dev

# tmpfs           742M  9.6M  732M   2% /run

# /dev/sda1       289G   21G  256G   8% /

# tmpfs           3.7G   22M  3.6G   1% /dev/shm

# tmpfs           5.0M  4.0K  5.0M   1% /run/lock

# tmpfs           3.7G     0  3.7G   0% /sys/fs/cgroup

# tmpfs           742M   60K  742M   1% /run/user/1002

# vmhgfs-fuse     376G  203G  174G  54% /mnt/hgfs

# tmpfs           742M   64K  742M   1% /run/user/1003

# /dev/sdb3        24G  179M   22G   1% /media/lwl/ubuntu_fs1

# /dev/sdb2       976M  1.3M  924M   1% /media/lwl/ubuntu_fs

# /dev/sdb1       4.8G  621M  4.0G  14% /media/lwl/ubuntu_fs2

    # 查询系统盘,并送入unchanged_disk_list

    unchanged_disk_list = []

    for each_mounted_disk in mounted_list:

        disk_name = each_mounted_disk.split()[0]

        disk_type = each_mounted_disk.split()[5]

        if disk_type == "/boot" or disk_type == "/":

            unchanged_disk_list.append(disk_name)

    unchanged_disk = " ".join(unchanged_disk_list)

    # 从USB信息中分割出实际的磁盘容量

    disk_size_str = disk[0].split(",")[1]

    disk_size_str = disk_size_str.split()[0]

    disk_size = int(disk_size_str)


    if dev_name not in unchanged_disk and disk_size >= MIN_DISK_SIZE:

        return True

    # 经过大量的用户反馈,这个错误经常出现,原因是disk_size >= MIN_DISK_SIZE不成立

    # 因为202-204代码把磁盘容量有时候会分割错,因为有的系统显示是28.7G,有的却显示28,7G结果割出来7

    # 出现这个问题,在确保SD卡足够情况下,202代码改成

    # disk_size_str = disk[0].split(", ")[1] 就是,后面加个空格

    print("[ERROR] Invalid SD card or size is less then 8G, please check SD Card.")

    return False



def parse_download_info(ascend_version):

    version_info_file_name = os.path.basename(VERSION_INFO_URL)

    version_info_path = os.path.join(

        CURRENT_PATH, version_info_file_name)

    ret = execute_wget("wget -O {name} {url} --no-check-certificate".format(

        name=version_info_path, url=VERSION_INFO_URL))


    if not ret or not os.path.exists(version_info_path):

        print(

            "[ERROR] Can not download versioninfo.yaml, please check your network connection.")

        execute("rm -rf {path}".format(path=version_info_path))

        return False""""""


    version_info_file = open(

        version_info_path, 'r'encoding='utf-8')

    version_info_dict = yaml.load(

        version_info_file)


    ascend_version_dict = version_info_dict.get("mini_developerkit")

    if ascend_version == "" or ascend_version == ascend_version_dict.get("latest_version"):

        ascend_developerkit_url = ascend_version_dict.get("url")

        ascend_sd_making_url = ascend_version_dict.get("sd_making_url")

        ubuntu_version = ascend_version_dict.get(

            "compatibility").get("ubuntu")[0]

    else:

        version_list = ascend_version_dict.get("archived_versions")

        for each_version in version_list:

            if ascend_version == each_version.get("version"):

                ascend_developerkit_url = each_version.get("url")

                ascend_sd_making_url = each_version.get("sd_making_url")

                ubuntu_version = each_version.get(

                    "compatibility").get("ubuntu")[0]

                break

    ubuntu_version_dict = version_info_dict.get("ubuntu")

    for each_version in ubuntu_version_dict:

        if ubuntu_version == each_version.get("version"):

            ubuntu_url = each_version.get("url")

            break


    version_info_file.close()


    if ascend_developerkit_url == "" or ascend_sd_making_url == "" or ubuntu_url == "":

        return False""""""


    return True, ascend_developerkit_url, ascend_sd_making_url, ubuntu_url



def process_local_installation(dev_name):

    confirm_tips = "Please make sure you have installed dependency packages:" + \

        "\n\t apt-get install -y qemu-user-static binfmt-support gcc-aarch64-linux-gnu g++-aarch64-linux-gnu\n" + \

        "Please input Y: continue, other to install them:"

    confirm = input(confirm_tips) #手动提示输入

    confirm = confirm.strip() # 删首尾空


    if confirm != "Y" and confirm != "y":

        return False


   # 删除日志目录

    execute("rm -rf {path}_log/*".format(path=SD_CARD_MAKING_PATH))

   # 重新创建日志目录

    execute("mkdir -p {path}_log".format(path=SD_CARD_MAKING_PATH))

    log_path = "{path}_log".format(path=SD_CARD_MAKING_PATH)

    # 在当前路径找有没有mini_developerkit包

    ret, paths = execute(

        "find {path} -name \"mini_developerkit*.rar\"".format(path=CURRENT_PATH))

    if not ret:

        print("[ERROR] Can not fine mini eveloperkit package in current path")

        return False

    # 只允许有一个mini_developerkit包

    if len(paths) > 1:

        print(

            "[ERROR] Too many mini developerkit packages, please delete redundant packages.")

        return False

    ascend_developerkit_path = paths[0]

    ascend_developerkit_file_name = os.path.basename(ascend_developerkit_path)

    # 看看脚本下面有没有make-ubuntu-sd.sh

    ret, paths = execute(

        "find {path} -name \"make-ubuntu-sd.sh\"".format(path=CURRENT_PATH))

    if not ret:

        print("[ERROR] Can not fine make_ubuntu_sd.sh in current path")

        return False

    # 看看脚本下面有没有ubuntu镜像,注意下载的镜像不能乱改名字,如果有错误提示

    # 要么你没有下载镜像,要么你把下载镜像名字给乱改了

    ret, paths = execute(

        "find {path} -name \"ubuntu*server*arm*.iso\"".format(path=CURRENT_PATH))

    if not ret:

        print("[ERROR] Can not fine ubuntu\ package in current path")

        return False

    if len(paths) > 1:

        print("[ERROR] Too many ubuntu packages, please delete redundant packages.")

        return False

    ubuntu_path = paths[0]

    ubuntu_file_name = os.path.basename(ubuntu_path)

    # 这个python脚本就这一句有用,其他都是辅助,或者说整个脚本就这一句起了实际制卡作用

    print("Step: Start to make SD Card. It need some time, please wait...")

    execute(MAKING_SD_CARD_COMMAND.format(path=CURRENT_PATH, dev_name=dev_name, pkg_path=CURRENT_PATH,

                                                ubuntu_file_name=ubuntu_file_name,

                                                ascend_developerkit_file_name=ascend_developerkit_file_name, log_path=log_path))

    # make_ubuntu_sd.result会存储制卡的结果,如果不是Success就表示出了问题

    ret = execute("grep Success {log_path}/make_ubuntu_sd.result".format(log_path=log_path))

    if not ret[0]:

        # 很多用户反馈这个错误提示后查看日志没有什么有用信息,我们只能看终端打印的信息排查问题

        print("[ERROR] Making SD Card failed, please check %s/make_ubuntu_sd.log for details!" % log_path)

        return False


    return True


# 通过网络方式连接方式安装,由于安装包很大,网络速度等问题,不推荐这种方式

def process_internet_installation(dev_nameascend_version):

    print("Step: Downloading version information...")

    ret = parse_download_info(ascend_version) # 校验版本号

    # 版本不对就提示一下

    if not ret[0]:

        print("Can not find valid versions, please try to get valid version: python3 sd_card_making.py list")

        return False


    ascend_developerkit_url = ret[1]

    ascend_sd_making_url = ret[2]

    ubuntu_url = ret[3]


    execute("rm -rf {path}_log/*".format(path=SD_CARD_MAKING_PATH))

    execute("mkdir -p {path}_log".format(path=SD_CARD_MAKING_PATH))

    log_path = "{path}_log".format(path=SD_CARD_MAKING_PATH)

    # 下载developerkit包,这样免除了离线的校验

    print("Step: Downloading developerkit package...")

    ascend_developerkit_file_name = os.path.basename(ascend_developerkit_url)

    ascend_developerkit_path = os.path.join(

        CURRENT_PATH, ascend_developerkit_file_name)

    if os.path.exists(ascend_developerkit_path):

        print("%s is already downloaded, skip to download it." %

              ascend_developerkit_path)

    else:

        ret = execute_wget("wget -O {name} {url} --no-check-certificate".format(

            name=ascend_developerkit_path, url=ascend_developerkit_url))

        if not ret or not os.path.exists(ascend_developerkit_path):

            print("[ERROR] Download develperkit package failed, Please check %s connection." %

                  ascend_developerkit_url)

            execute("rm -rf {path}".format(path=ascend_developerkit_path))

            return False

    # 下载制卡脚本

    print("Step: Downloading SD Card making scripts...")

    ascend_sd_making_file_name = os.path.basename(ascend_sd_making_url)

    ascend_sd_making_path = os.path.join(

        CURRENT_PATH, ascend_sd_making_file_name)

    ret = execute_wget("wget -O {name} {url} --no-check-certificate".format(

        name=ascend_sd_making_path, url=ascend_sd_making_url))

    if not ret or not os.path.exists(ascend_developerkit_path):

        print("[ERROR] Download SD Card making scripts failed, Please check %s connection." %

              ascend_sd_making_url)

        execute("rm -rf {path}".format(path=ascend_sd_making_path))

        return False

    # 下载Ubuntu镜像

    print("Step: Downloading Ubuntu iso...")

    ubuntu_file_name = os.path.basename(ubuntu_url)

    ubuntu_path = os.path.join(CURRENT_PATH, ubuntu_file_name)

    if os.path.exists(ubuntu_path):

        print("%s is already downloaded, skip to download it." % ubuntu_path)

    else:

        ret = execute_wget(

            "wget -O {name} {url} --no-check-certificate".format(name=ubuntu_path, url=ubuntu_url))

        if not ret or not os.path.exists(ascend_developerkit_path):

            print(

                "[ERROR] Download Ubuntu iso failed, Please check %s connection." % ubuntu_url)

            execute("rm -rf {path}".format(path=ubuntu_path))

            return False

    # 安装一些依赖

    print("Step: Installing system dependency...")

    ret = execute(

        "apt-get install -y qemu-user-static binfmt-support gcc-aarch64-linux-gnu g++-aarch64-linux-gnu")

    if not ret[0]:

        print("[ERROR] Install system dependency failed, please check:" +

              "\n\tapt-get install -y qemu-user-static binfmt-support gcc-aarch64-linux-gnu g++-aarch64-linux-gnu\n\tapt-get install -y qemu-user-static binfmt-support gcc-aarch64-linux-gnu g++-aarch64-linux-gnu")

    # 执行制卡脚本

    print("Step: Start to make SD Card. It need some time, please wait...")

    execute(MAKING_SD_CARD_COMMAND.format(path=CURRENT_PATH, dev_name=dev_name,

                                                pkg_path=CURRENT_PATH, ubuntu_file_name=ubuntu_file_name,

                                                ascend_developerkit_file_name=ascend_developerkit_file_name,

                                                log_path=log_path))

    ret = execute("grep Success {log_path}/make_ubuntu_sd.result".format(log_path=log_path))

    if not ret[0]:

        print("[ERROR] Making SD Card failed, please check %s/make_ubuntu_sd.log for details!" % log_path)

        return False

    return True


# 打印一些提示信息

def print_usage():

    print("Usage: ")

    print("\t[internet]: python3 make_sd_card.py internet [SD Name]")

    print("\t                 Use latest version to make SD card.")

    print("\t[internet]: python3 make_sd_card.py internet [SD Name] [Version]")

    print("\t                 Use given version to make SD card.")

    print("\t[local   ]: python3 make_sd_card.py local [SD Name]")

    print("\t                 Use local given packages to make SD card.")



def main():

    '''sd card making'''

    command = ""

    dev_name = ""

    version = ""

    # 参数超过3个的,取出执行命令(网络还是离线)和USB名称

    if (len(sys.argv) >= 3):

        command = sys.argv[1]

        dev_name = sys.argv[2]

    # 参数超过4个的,第4个参数是版本号

    if (len(sys.argv) >= 4):

        version = sys.argv[3]

    # 参数必须为3个及其以上,否则不会制卡

    if command == "internet" and (len(sys.argv) == 3 or len(sys.argv) == 4):

        print("Begin to make SD Card...")

    elif command == "local" and len(sys.argv) == 3:

        print("Begin to make SD Card...")

    else:

        print("Invalid Command!")

        print_usage()

        exit(-1)

    # 检查U盘名称是否合法,如果不合法不会制卡

    ret = check_sd(dev_name)


    if not ret:

        exit(-1)#退出程序


    if command == "internet":# 执行在线安装函数

        result = process_internet_installation(dev_name, version)

    else:

        result = process_local_installation(dev_name)# 执行离线安装函数

    # 执行上面的结果为True就是制卡成功,否则就是失败

    if result:

        print("Make SD Card successfully!")

        exit(0)

    else:

        exit(-1)



if __name__ == '__main__':

    main() # 程序主函数的入口调用,打开此文件从此函数开始执行



【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。