学习 华为云 API Explorer Python 方式 第一天
import json
import time
import yaml
import argparse
from huaweicloudsdkims.v2 import *
from huaweicloudsdkevs.v2 import *
from huaweicloudsdkecs.v2 import *
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkims.v2.region.ims_region import ImsRegion
from huaweicloudsdkevs.v2.region.evs_region import EvsRegion
from huaweicloudsdkecs.v2.region.ecs_region import EcsRegion
from huaweicloudsdkcore.auth.credentials import BasicCredentials
import os

# Credentials are read from the environment instead of being committed to
# source control. Set CLOUD_SDK_AK and CLOUD_SDK_SK before running the script.
# Both values are None when the variables are unset.
# NOTE(review): an AK/SK pair was previously hard-coded here; any key that has
# been published this way should be treated as leaked and revoked.
ak = os.environ.get("CLOUD_SDK_AK")
sk = os.environ.get("CLOUD_SDK_SK")
# Region used for every ECS / EVS / IMS client built in this file.
region = "cn-north-4"
# Existing server whose flavor, volume type, VPC and subnet are copied by the
# "create" command.
dev_server_id = 'cb13603f-f736-4d00-bb06-0c56f13ca288'
def ShowServer(server_id: str):
    """Query a single ECS server by ID.

    Builds an ECS client from the module-level ``ak``, ``sk`` and ``region``,
    sends a ``ShowServerRequest`` for ``server_id`` and prints the response.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = EcsClient.new_builder().with_credentials(auth)
    ecs = builder.with_region(EcsRegion.value_of(region)).build()
    try:
        req = ShowServerRequest()
        req.server_id = server_id
        result = ecs.show_server(req)
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def ListImages():
    """List images through the IMS API.

    Builds an IMS client from the module-level ``ak``, ``sk`` and ``region``,
    sends a ``ListImagesRequest`` with no filters set and prints the response.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = ImsClient.new_builder().with_credentials(auth)
    ims = builder.with_region(ImsRegion.value_of(region)).build()
    try:
        result = ims.list_images(ListImagesRequest())
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def CreatePostPaidServers(name: str, flavor_ref: str, image_ref: str, volumetype: str, vpcid: str, subnet_id: str):
    """Create a pay-per-use ECS server with one NIC and a root volume.

    Builds an ECS client from the module-level ``ak``, ``sk`` and ``region``,
    submits a ``CreatePostPaidServersRequest`` and prints the response.

    Args:
        name: Name for the new server.
        flavor_ref: Flavor reference passed to ``PostPaidServer``.
        image_ref: Image reference passed to ``PostPaidServer``.
        volumetype: Root volume disk type. Values noted by the original
            author:
            SATA (common I/O), SAS (high I/O), SSD (ultra-high I/O),
            GPSSD (general-purpose SSD), co-p1 (high I/O, performance-
            optimized type I), uh-l1 (ultra-high I/O, latency-optimized),
            ESSD (extreme-speed I/O), GPSSD2 (general-purpose SSD V2),
            ESSD2 (extreme-speed SSD V2).
        vpcid: VPC the server is created in.
        subnet_id: Subnet for the server's single NIC.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = EcsClient.new_builder().with_credentials(auth)
    ecs = builder.with_region(EcsRegion.value_of(region)).build()
    try:
        req = CreatePostPaidServersRequest()
        server_spec = PostPaidServer(
            flavor_ref=flavor_ref,
            image_ref=image_ref,
            name=name,
            nics=[PostPaidServerNic(subnet_id=subnet_id)],
            root_volume=PostPaidServerRootVolume(volumetype=volumetype),
            vpcid=vpcid,
        )
        req.body = CreatePostPaidServersRequestBody(server=server_spec)
        result = ecs.create_post_paid_servers(req)
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def ListServersDetails():
    """List ECS servers with their details.

    Builds an ECS client from the module-level ``ak``, ``sk`` and ``region``,
    sends a ``ListServersDetailsRequest`` with no filters set and prints the
    response.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = EcsClient.new_builder().with_credentials(auth)
    ecs = builder.with_region(EcsRegion.value_of(region)).build()
    try:
        result = ecs.list_servers_details(ListServersDetailsRequest())
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def DeleteServers(server_id: str):
    """Delete one ECS server together with its volumes and public IP.

    Builds an ECS client from the module-level ``ak``, ``sk`` and ``region``
    and submits a ``DeleteServersRequest`` for ``server_id`` with
    ``delete_volume`` and ``delete_publicip`` both set to ``True``. The
    response is printed.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = EcsClient.new_builder().with_credentials(auth)
    ecs = builder.with_region(EcsRegion.value_of(region)).build()
    try:
        req = DeleteServersRequest()
        targets = [ServerId(id=server_id)]
        req.body = DeleteServersRequestBody(
            servers=targets,
            delete_volume=True,
            delete_publicip=True,
        )
        result = ecs.delete_servers(req)
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def ShowVolume(volume_id: str):
    """Query a single EVS volume by ID.

    Builds an EVS client from the module-level ``ak``, ``sk`` and ``region``,
    sends a ``ShowVolumeRequest`` for ``volume_id`` and prints the response.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = EvsClient.new_builder().with_credentials(auth)
    evs = builder.with_region(EvsRegion.value_of(region)).build()
    try:
        req = ShowVolumeRequest()
        req.volume_id = volume_id
        result = evs.show_volume(req)
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def ListServerInterfaces(server_id: str):
    """List the network interfaces attached to an ECS server.

    Builds an ECS client from the module-level ``ak``, ``sk`` and ``region``,
    sends a ``ListServerInterfacesRequest`` for ``server_id`` and prints the
    response.

    Returns:
        The SDK response on success. If the SDK raises
        ``ClientRequestException``, the status code, request ID, error code
        and error message are printed one per line and returned as a tuple
        in that order.
    """
    auth = BasicCredentials(ak, sk)
    builder = EcsClient.new_builder().with_credentials(auth)
    ecs = builder.with_region(EcsRegion.value_of(region)).build()
    try:
        req = ListServerInterfacesRequest()
        req.server_id = server_id
        result = ecs.list_server_interfaces(req)
    except exceptions.ClientRequestException as err:
        failure = (err.status_code, err.request_id, err.error_code, err.error_msg)
        for field in failure:
            print(field)
        return failure
    else:
        print(result)
        return result
def _checked(result):
    """Return ``result`` unchanged unless it is an API error tuple.

    The request helpers in this file return a
    ``(status_code, request_id, error_code, error_msg)`` tuple instead of a
    response object when the SDK raises ``ClientRequestException``. Calling
    ``.to_dict()`` on that tuple would fail with an unrelated
    ``AttributeError``, so stop here with a readable message instead.

    Raises:
        SystemExit: if ``result`` is a tuple.
    """
    if isinstance(result, tuple):
        raise SystemExit(f"Huawei Cloud API request failed: {result}")
    return result


if __name__ == "__main__":
    # Command line:
    #   create -i '{"name": ..., "imagename": ...}'  clone settings from dev_server_id
    #   get    -n NAME -o FILE                        write one server's details to FILE
    #   getall -o FILE                                dump all servers to FILE as YAML
    #   delete -n NAME                                delete the first server named NAME
    parser = argparse.ArgumentParser()
    sub = parser.add_subparsers(dest='command')
    # Every option below is dereferenced unconditionally by its command, so
    # argparse enforces it instead of letting None cause a TypeError later.
    sub1 = sub.add_parser('create')
    sub1.add_argument('-i', '--input', type=json.loads, required=True)
    sub2 = sub.add_parser('get')
    sub2.add_argument('-n', '--name', required=True)
    sub2.add_argument('-o', '--output', required=True)
    sub3 = sub.add_parser('getall')
    sub3.add_argument('-o', '--output', required=True)
    sub4 = sub.add_parser('delete')
    sub4.add_argument('-n', '--name', required=True)
    args = parser.parse_args()
    print(args)
    if args.command == 'create':
        print(args.input)
        if not isinstance(args.input, dict) or not {'name', 'imagename'} <= args.input.keys():
            parser.error("--input must be a JSON object with 'name' and 'imagename' keys")
        # The existing dev server is the template: its flavor, root volume
        # type, VPC and subnet are reused for the new server.
        dev_info = _checked(ShowServer(dev_server_id))
        print(dev_info)
        template = dev_info.to_dict()['server']
        name = args.input['name']
        flavor_ref = template['flavor']['id']
        # NOTE(review): 'imagename' is passed straight through as image_ref;
        # presumably it must already be an image ID - confirm with the caller.
        image_ref = args.input['imagename']
        volumes = template['os_extended_volumesvolumes_attached']
        if not volumes:
            raise SystemExit(f"Server {dev_server_id} has no attached volume to copy the volume type from")
        volumetype = _checked(ShowVolume(volumes[0]['id'])).to_dict()['volume']['volume_type']
        vpcid = template['metadata']['vpc_id']
        nics = _checked(ListServerInterfaces(dev_server_id)).to_dict()['interface_attachments']
        if not nics:
            raise SystemExit(f"Server {dev_server_id} has no network interface to copy the subnet from")
        subnet_id = nics[0]['net_id']
        created = _checked(CreatePostPaidServers(
            name=name,
            flavor_ref=flavor_ref,
            image_ref=image_ref,
            volumetype=volumetype,
            vpcid=vpcid,
            subnet_id=subnet_id,
        ))
        new_server_id = created.to_dict()['server_ids'][0]
        print(new_server_id)
        # Fixed pause before querying the server that was just requested.
        time.sleep(10)
        print(str(ShowServer(new_server_id)))
    elif args.command == 'get':
        resp = _checked(ListServersDetails()).to_dict()
        for server in resp['servers'] or []:
            if server['name'] == args.name:
                # Fetch before opening the file so a failed request does not
                # leave a truncated output file behind.
                detail = _checked(ShowServer(server['id']))
                with open(args.output, 'w', encoding='utf-8') as f:
                    f.write(str(detail))
                break
        else:
            print(f"No server named {args.name!r} found")
    elif args.command == 'getall':
        listing = _checked(ListServersDetails()).to_dict()
        with open(args.output, 'w', encoding='utf-8') as f:
            f.write(yaml.dump(listing))
    elif args.command == 'delete':
        resp = _checked(ListServersDetails()).to_dict()
        for server in resp['servers'] or []:
            if server['name'] == args.name:
                # Only the first server with a matching name is deleted.
                print(str(_checked(DeleteServers(server['id']))))
                break
        else:
            print(f"No server named {args.name!r} found")
    else:
        # No subcommand given: show usage instead of silently doing nothing.
        parser.print_help()
- 点赞
- 收藏
- 关注作者
评论(0)