yuyuyuyu2
【摘要】 def createPairKey(): try: # create request = NovaCreateKeypairRequest() keypairbody = NovaCreateKeypairOption( name="chinaskills_keypair" ) request....
def createPairKey():
try:
# create
request = NovaCreateKeypairRequest()
keypairbody = NovaCreateKeypairOption(
name="chinaskills_keypair"
)
request.body = NovaCreateKeypairRequestBody(
keypair=keypairbody
)
response = client.nova_create_keypair(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def deletePairKey():
try:
# delete
request = NovaDeleteKeypairRequest()
request.keypair_name = "chinaskills_keypair"
response = client.nova_delete_keypair(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def getKeyByName():
try:
request = NovaShowKeypairRequest()
request.keypair_name = "chinaskills_keypair"
response = client.nova_show_keypair(request)
return response
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def create_volume():
try:
request = CreateVolumeRequest()
volumebody = CreateVolumeOption(
availability_zone="af-north-1a",
multiattach=True,
name="chinaskills_volume ",
size=100,
volume_type="SSD"
)
request.body = CreateVolumeRequestBody(
volume=volumebody
)
response = client.create_volume(request)
print(response)
print('volume create')
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def delete_volume(volume_id):
try:
request = DeleteVolumeRequest()
request.volume_id = volume_id
response = client.delete_volume(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def get_volume_id(name):
try:
request = ListVolumesRequest()
request.name = name
response = client.list_volumes(request)
print(response)
if response.to_dict()['count'] >= 1:
print('volume exist')
dict_volumes = response.to_dict()['volumes']
id = dict_volumes[0]['id']
return id
else:
return ' '
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def createserver(name,imageid):
try:
request = CreateServersRequest()
rootVolumeServer = PrePaidServerRootVolume(
volumetype="SSD"
)
listNicsServer = [
PrePaidServerNic(
subnet_id="ecf996a0-bf3e-4521-bbfd-3f70963cc23b"
)
]
serverbody = PrePaidServer(
image_ref=imageid,
flavor_ref="ac7.2xlarge.2",
name=name,
vpcid="518f3932-5828-4da4-8eab-0c8bf2e42dda",
nics=listNicsServer,
root_volume=rootVolumeServer
)
request.body = CreateServersRequestBody(
server=serverbody
)
response = client.create_servers(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def getserverbyname(name):
try:
request = ListServersDetailsRequest()
request.name = name
response = client.list_servers_details(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def getAllServers():
try:
request = ListServersDetailsRequest()
response = client.list_servers_details(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def getServersIdByName(name):
try:
request = ListCloudServersRequest()
request.name = name
response = client.list_cloud_servers(request)
print(response.to_dict()['servers'])
return response.to_dict()['servers'][0]['id']
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
def deleteServer(serverid):
try:
request = DeleteServersRequest()
listServersbody = [
ServerId(
id=serverid
)
]
request.body = DeleteServersRequestBody(
servers=listServersbody
)
response = client.delete_servers(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
if __name__ == "__main__":
# The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
# In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
ak = "DEWQPLKHLDZKHBKRKTEM"
sk = "Ww6lhO4rv0KplgLLCpCSgMqAwsjhpRN6XIN2K9xy"
credentials = BasicCredentials(ak, sk)
client = EcsClient.new_builder() \
.with_credentials(credentials) \
.with_region(EcsRegion.value_of("cn-north-9")) \
.build()
# createserver('test','6288ab88-3b7c-4320-9554-ad68a53eebb6')
# 这里写命令的映射关系,和openstack的provier一样103行之前所有的代码都是从华为云的apiexplorer里找出来的
parser = argparse.ArgumentParser(description='ECS Manager for Huawei Cloud')
subparsers = parser.add_subparsers(dest='command')
# Create ECS
parser_create = subparsers.add_parser('create', help='Create an ECS instance')
parser_create.add_argument('-i', '--input', type=json.loads, required=True,help='JSON formatted input for ECS details')
# Get ECS by name
parser_get = subparsers.add_parser('get', help='Get details of an ECS instance by name')
parser_get.add_argument('-n', '--name', type=str, required=True, help='Name of the ECS instance')
parser_get.add_argument('-o', '--output', type=str, help='Output file path (JSON format)')
# Get all ECS instances
parser_getall = subparsers.add_parser('getall', help='Get details of all ECS instances')
parser_getall.add_argument('-o', '--output', type=str, help='Output file path (YAML format)')
# Delete ECS by name
parser_delete = subparsers.add_parser('delete', help='Delete an ECS instance by name')
parser_delete.add_argument('-n', '--name', type=str, required=True, help='Name of the ECS instance')
args = parser.parse_args()
if args.command == 'create':
server = createserver(args.input['name'],args.input['image'])
if server:
server = getserverbyname(args.input['name'])
print(json.dumps(server.to_dict(), indent=2))
elif args.command == 'get':
server = getserverbyname(args.name)
if server:
server_info = server.to_dict()
if args.output:
print('write file')
with open(args.output, 'w') as f:
json.dump(server_info, f, indent=2)
else:
print(json.dumps(server_info, indent=2))
else:
print(f"ECS instance with name {args.name} not found.")
elif args.command == 'getall':
print('get all servers')
servers = getAllServers()
servers_info = [server.to_dict() for server in servers]
if args.output:
with open(args.output, 'w') as f:
yaml.dump(servers_info, f, default_flow_style=False)
else:
print(yaml.dump(servers_info, default_flow_style=False))
elif args.command == 'delete':
id = getserverbyname(args.name)
deleteServer(id)
else:
parser.print_help()
# coding: utf-8
from fastapi import FastAPI, HTTPException, Body, Path
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
# The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
# In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
ak = "DEWQPLKHLDZKHBKRKTEM"
sk = "Ww6lhO4rv0KplgLLCpCSgMqAwsjhpRN6XIN2K9xy"
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of("cn-east-3")) \
.build()
# 初始化 FastAPI 应用
app = FastAPI()
def getVPCIdByName(name):
try:
request = ListVpcsRequest()
response = client.list_vpcs(request)
for vpc in response.to_dict()['vpcs']:
if vpc['name'] == name:
return vpc['id']
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
# 封装创建 VPC 接口
@app.post("/cloud_vpc/create_vpc/")
async def create_vpc(vpc_data: dict = Body(...)):
name = vpc_data.get("name")
cidr = vpc_data.get("cidr")
if not name or not cidr:
raise HTTPException(status_code=400, detail="VPC 名称和 CIDR 不能为空")
try:
request = CreateVpcRequest()
vpcbody = CreateVpcOption(
cidr=cidr,
name=name
)
request.body = CreateVpcRequestBody(
vpc=vpcbody
)
response = client.create_vpc(request)
print(response)
return response.to_dict()
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
# 封装单个 VPC 查询接口
@app.get("/cloud_vpc/vpc/{vpc_name}")
async def get_vpc(vpc_name: str = Path(...)):
try:
request = ListVpcsRequest()
response = client.list_vpcs(request)
for vpc in response.to_dict()['vpcs']:
if vpc['name'] == vpc_name:
return vpc
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
# 封装服务查询接口
@app.get("/cloud_vpc/vpc/")
async def list_vpc():
try:
request = ListVpcsRequest()
response = client.list_vpcs(request)
print(response)
return response.to_dict()
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
# 封装更新 VPC 名称接口
@app.put("/cloud_vpc/update_vpc/")
async def update_vpc(update_data: dict = Body(...)):
new_name = update_data.get("new_name")
old_name = update_data.get("old_name")
try:
request = UpdateVpcRequest()
request.vpc_id = getVPCIdByName(old_name)
vpcbody = UpdateVpcOption(
name= new_name
)
request.body = UpdateVpcRequestBody(
vpc=vpcbody
)
response = client.update_vpc(request)
print(response)
return response.to_dict()
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
# 封装删除 VPC 接口
@app.delete("/cloud_vpc/delete_vpc/")
async def delete_vpc(vpc_data: dict = Body(...)):
vpc_name = vpc_data.get("vpc_name")
id = getVPCIdByName(vpc_name)
try:
print('delete id = ', id)
request = DeleteVpcRequest()
request.vpc_id = id
response = client.delete_vpc(request)
print(response)
return response.to_dict()
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
# 启动 FastAPI 应用
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7045)
curl -X DELETE http://192.168.100.10:7045/cloud_vpc/delete_vpc/ -H "Content-type: application/json" -d '{"vpc_name":"test123"}'
curl -X POST http://192.168.100.10:7045/cloud_vpc/create_vpc/ -H "Content-type: application/json" -d '{"name":"test123","cidr":"192.168.0.0/16"}'
curl -X GET http://192.168.100.10:7045/cloud_vpc/vpc/vpcname
curl -X PUT http://192.168.100.10:7045/cloud_vpc/update_vpc/ -H "Content-type: application/json" -d '{"name":"test123","cidr":"192.168.0.0/16"}'
pip install fastapi uvicorn
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)