华为云ModelArts通过resnet50预置算法rest接口aksk鉴权创建训练作业,导入模型,部署服务
【摘要】 华为云ModelArts通过resnet50预置算法rest接口aksk鉴权创建训练作业,导入模型,部署服务,
红框位置的信息都需要配置成自己的,鉴权使用了apig的sdk
代码结构如下
main.py是程序的入口,我的开发流程是将apig整个工程下载下来以后,直接在main.py上进行更改。运行环境是modelarts的notebook。
job_configs下面包含创建的配置文件,配置文件的内容获取方式参考,里面默认用的是resnet50这个算法
https://bbs.huaweicloud.com/blogs/238421
create_wait_job_finish_v2()适配的是新版本训练
check_train_url()方法是用来动态的创建训练的输出目录,如果是在本地进行运行,那么这里可以注释掉,如果需要动态的创建输出目录,可以使用obsutil工具来实现
obsutil参考 https://bbs.huaweicloud.com/blogs/230127
# coding=utf-8
import requests
from apig_sdk import signer
import json
import time
import os
def check_train_url(obs_url):
import moxing as mox
if mox.file.exists(obs_url) == False:
mox.file.mk_dir(obs_url)
print("create obs dir ", obs_url)
else:
print("obs dir exist")
index = "34"
JOB_NAME = "train_test" + index
MODEL_NAME = "model_test" + index
SERVICE_NAME = "service_test" + index
own_ak = "<YOUR-OWN-AK>"
own_sk = "<YOUR-OWN-SK>"
project_id = "<YOUR-OWN-PROJECTID>"
TRAIN_URL_BUCKET = "<OUTPUT OBS bucket name>"
TRAIN_FILE_URL = "OUTPUT OBS file path without bucket name"
#DATA_URL = ""
DATA_URL = "INPUT DATA path"
TRAIN_URL = "/" + TRAIN_URL_BUCKET + TRAIN_FILE_URL
check_train_url("obs:/" + TRAIN_URL)
obs_bucket_url = "https://"+ TRAIN_URL_BUCKET + ".obs.myhwclouds.com"
MODEL_SRC_LOCATION = obs_bucket_url + TRAIN_FILE_URL + "model/"
MODEL_EXEC_CODE_LOCATION = obs_bucket_url + TRAIN_FILE_URL + "model/customize_service.py"
sig = signer.Signer()
sig.Key = own_ak
sig.Secret = own_sk
base_header = "https://modelarts.cn-north-4.myhuaweicloud.com/"
url_header = base_header + "v2/"
url_v1_header = base_header + "v1/"
def create_job_v2():
operation = "/training-jobs"
train_url = url_header + project_id + operation
job_config_file_path = "job_configs/create_job_v2_horovod.json"
with open(job_config_file_path, "r") as f:
load_dict = json.load(f)
load_dict["metadata"]["name"] = JOB_NAME
load_dict["algorithm"]["inputs"][0]["remote"]["obs"]["obs_url"] = DATA_URL
load_dict["algorithm"]["outputs"][0]["remote"]["obs"]["obs_url"] = TRAIN_URL
#print(json.dumps(load_dict))
r = signer.HttpRequest("POST",
train_url,
{"x-stage": "RELEASE", "content-type": "application/json"},
json.dumps(load_dict))
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
#print(resp.status_code, resp.reason)
#print(json.loads(resp.content.decode()))
jobid = ""
if resp.status_code == 201 or resp.status_code == 200:
jobid = json.loads(resp.content.decode())["metadata"]["id"]
#print(jobid)
return jobid
def create_job_v1():
operation = "/training-jobs"
train_url = url_v1_header + project_id + operation
job_config_file_path = "job_configs/create_job_v1.json"
with open(job_config_file_path, "r") as f:
load_dict = json.load(f)
load_dict["job_name"] = JOB_NAME
load_dict["config"]["inputs"][0]["value"] = DATA_URL
load_dict["config"]["inputs"][0]["data_source"]["obs"]["obs_url"] = DATA_URL
load_dict["config"]["outputs"][0]["value"] = TRAIN_URL
load_dict["config"]["outputs"][0]["data_source"]["obs"]["obs_url"] = TRAIN_URL
#print(json.dumps(load_dict))
r = signer.HttpRequest("POST",
train_url,
{"x-stage": "RELEASE", "content-type": "application/json"},
json.dumps(load_dict))
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
#print(resp.status_code, resp.reason)
#print(json.loads(resp.content.decode()))
jobid = ""
if resp.status_code == 201 or resp.status_code == 200:
jobid = json.loads(resp.content.decode())["job_id"]
versionid = json.loads(resp.content.decode())["version_id"]
#print(jobid, versionid)
else:
print(json.loads(resp.content.decode()))
print("create job failed")
exit(1)
#print(jobid)
return jobid, versionid
def get_job_status_v2(jobid):
operation = "/training-jobs"
get_job_url = url_header + project_id + operation + "/" + jobid
r = signer.HttpRequest("GET",
get_job_url,
{"x-stage": "RELEASE"})
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
#print(resp.status_code, resp.reason)
print(json.loads(resp.content.decode())["status"]["phase"])
return json.loads(resp.content.decode())["status"]["phase"]
def get_job_status_v1(jobid, versionid):
operation = "/training-jobs"
get_job_url = url_v1_header + project_id + operation + "/" + str(jobid) + "/versions/" + str(versionid)
r = signer.HttpRequest("GET",
get_job_url,
{"x-stage": "RELEASE"})
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
#print(resp.status_code, resp.reason)
#print(json.loads(resp.content.decode())["status"])
return json.loads(resp.content.decode())["status"]
def import_model():
operation = "/models"
model_url = url_v1_header + project_id + operation
model_import_config_file_path = "job_configs/model_import.json"
with open(model_import_config_file_path, "r") as f:
load_dict = json.load(f)
load_dict["model_name"] = MODEL_NAME
#print(MODEL_SRC_LOCATION)
#print(MODEL_EXEC_CODE_LOCATION)
load_dict["source_location"] = MODEL_SRC_LOCATION
load_dict["execution_code"] = MODEL_EXEC_CODE_LOCATION
r = signer.HttpRequest("POST",
model_url,
{"x-stage": "RELEASE", "content-type": "application/json"},
json.dumps(load_dict))
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
print(resp.status_code, resp.reason)
#print(json.loads(resp.content.decode()))
if resp.status_code != 200:
print("import model failed")
exit(1)
modelid = json.loads(resp.content.decode())["model_id"]
print(modelid)
return modelid
def get_model_status(modelid):
operation = "/models/"
get_model_url = url_v1_header + project_id + operation + modelid
r = signer.HttpRequest("GET",
get_model_url,
{"x-stage": "RELEASE"})
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
#print(json.loads(resp.content.decode())["model_status"])
return json.loads(resp.content.decode())["model_status"]
def deploy_model(modelid):
operation = "/services"
model_deploy_url = url_v1_header + project_id + operation
model_deploy_config_file_path = "job_configs/model_deploy.json"
with open(model_deploy_config_file_path, "r") as f:
load_dict = json.load(f)
load_dict["service_name"] = SERVICE_NAME
load_dict["config"][0]["model_id"] = modelid
r = signer.HttpRequest("POST",
model_deploy_url,
{"x-stage": "RELEASE", "content-type": "application/json"},
json.dumps(load_dict))
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
print(json.loads(resp.content.decode()))
print(json.loads(resp.content.decode())["service_id"])
return json.loads(resp.content.decode())["service_id"]
def get_service_status(serviceid):
operation = "/services/"
service_url = url_v1_header + project_id + operation + serviceid
r = signer.HttpRequest("GET",
service_url,
{"x-stage": "RELEASE"})
sig.Sign(r)
resp = requests.request(r.method, r.scheme + "://" + r.host + r.uri, headers=r.headers, data=r.body)
try:
test = json.loads(resp.content.decode())["status"]
except:
print(json.loads(resp.content.decode()))
return json.loads(resp.content.decode())["status"]
def create_wait_job_finish_v2():
jobid = create_job_v2()
if jobid == "":
print("create job failed")
exit(1)
while(1):
jobstatus = get_job_status_v2(jobid)
print(jobstatus)
if jobstatus == "Completed":
break
if jobstatus == "Failed":
print("job run failed")
exit(1)
time.sleep(5)
def create_wait_job_finish_v1():
jobid,versionid = create_job_v1()
if jobid == "":
print("create job failed")
exit(1)
while(1):
jobstatus = get_job_status_v1(jobid, versionid)
print(jobstatus)
if jobstatus == 10:
break
if jobstatus == 11:
print("job run failed")
exit(1)
time.sleep(10)
def import_deploy_model():
modelid = import_model()
while(1):
modelstatus = get_model_status(modelid)
print(modelstatus)
if modelstatus == "published":
break
if modelstatus == "publishing" or modelstatus == "building":
time.sleep(5)
continue
print("import model failed")
exit(1)
serviceid = deploy_model(modelid)
while(1):
time.sleep(10)
servicestatus = get_service_status(serviceid)
print(servicestatus)
if servicestatus == "running":
break
else:
continue
if servicestatus == "deploying":
continue
print("import model failed")
exit(1)
if __name__ == '__main__':
#create_wait_job_finish_v2()
create_wait_job_finish_v1()
import_deploy_model()
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)