开发笔记2
#!/usr/bin/env python3
from fastapi import FastAPI
import uvicorn
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
# FastAPI application instance; the @app.post/@app.get/@app.put/@app.delete
# decorators further down register their handlers on this object.
app = FastAPI()
import os

# Huawei Cloud access key / secret key handed to BasicCredentials by every
# function in this script.
# SECURITY: prefer supplying these through the environment. The literal
# fallbacks only preserve the previous behaviour when the variables are unset;
# they have been published in source and should be rotated and then removed.
ak = os.environ.get("HUAWEICLOUD_SDK_AK", "UCJEM0YMIIEGCS3SGW4R")
sk = os.environ.get("HUAWEICLOUD_SDK_SK", "ueyWtMjX5o9KBbQkFKPWx14Jkiqbq4VQ2EQjacfn")

# Region name resolved through VpcRegion.value_of() when a client is built.
g_region = "cn-north-4"
def api_list_vpc(name):
    """Resolve a VPC name to its id.

    Lists the VPCs returned by ``list_vpcs`` for the module-level credentials
    and region, and returns the ``id`` of the first entry whose ``name``
    equals *name*.

    Args:
        name: VPC name to look for (exact match).

    Returns:
        The matching VPC id, or ``None`` when nothing matched or the listing
        call failed. A failure is logged rather than silently discarded.
    """
    import logging

    credentials = BasicCredentials(ak, sk)
    client = (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of(g_region))
        .build()
    )
    try:
        vpcs = client.list_vpcs(ListVpcsRequest()).to_dict()['vpcs']
        return next((vpc['id'] for vpc in vpcs if vpc['name'] == name), None)
    except Exception:
        # Callers treat None as "not found", so keep that contract, but leave
        # a trace of the real cause instead of swallowing it.
        logging.getLogger(__name__).exception(
            "listing VPCs failed while resolving name %r", name
        )
        return None
@app.post('/cloud_vpc/create_vpc')
def api_create_vpc(data: dict):
    """Create a VPC from a JSON body containing ``cidr`` and ``name``.

    Args:
        data: request body; must provide the keys ``cidr`` and ``name``.

    Returns:
        The SDK create response converted with ``to_dict()``.

    Raises:
        HTTPException: 400 when a required key is missing; the upstream
            status code when the SDK raises ``ClientRequestException``;
            500 for any other failure.
    """
    from fastapi import HTTPException

    missing = [key for key in ('cidr', 'name') if key not in data]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"missing required field(s): {', '.join(missing)}",
        )

    credentials = BasicCredentials(ak, sk)
    client = (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of(g_region))
        .build()
    )
    request = CreateVpcRequest()
    request.body = CreateVpcRequestBody(
        vpc=CreateVpcOption(cidr=data['cidr'], name=data['name'])
    )
    try:
        response = client.create_vpc(request)
    except exceptions.ClientRequestException as e:
        # Surface the upstream error as an HTTP error instead of returning the
        # exception object as if it were a successful payload.
        raise HTTPException(
            status_code=e.status_code or 502,
            detail={
                'request_id': e.request_id,
                'error_code': e.error_code,
                'error_msg': e.error_msg,
            },
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    # Plain dict, matching what the update/delete handlers return.
    return response.to_dict()
@app.get('/cloud_vpc/vpc/{vpc_name}')
def api_get_vpc(vpc_name: str):
    """Return the details of the VPC whose name is *vpc_name*.

    The name is first resolved to an id through ``api_list_vpc``; the id is
    then passed to ``show_vpc``.

    Args:
        vpc_name: VPC name taken from the URL path.

    Returns:
        The SDK show response converted with ``to_dict()``.

    Raises:
        HTTPException: 404 when the name cannot be resolved to an id; the
            upstream status code on ``ClientRequestException``; 500 for any
            other failure.
    """
    from fastapi import HTTPException

    vpc_id = api_list_vpc(vpc_name)
    if vpc_id is None:
        # Do not send a request with vpc_id=None upstream.
        raise HTTPException(status_code=404, detail=f"VPC {vpc_name!r} not found")

    credentials = BasicCredentials(ak, sk)
    client = (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of(g_region))
        .build()
    )
    request = ShowVpcRequest()
    request.vpc_id = vpc_id
    try:
        response = client.show_vpc(request)
    except exceptions.ClientRequestException as e:
        raise HTTPException(
            status_code=e.status_code or 502,
            detail={
                'request_id': e.request_id,
                'error_code': e.error_code,
                'error_msg': e.error_msg,
            },
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return response.to_dict()
@app.get('/cloud_vpc/vpc')
def api_get_list_vpc():
    """Return the result of ``list_vpcs`` as a plain dict.

    Returns:
        The SDK list response converted with ``to_dict()``.

    Raises:
        HTTPException: the upstream status code on
            ``ClientRequestException``; 500 for any other failure.
    """
    from fastapi import HTTPException

    credentials = BasicCredentials(ak, sk)
    client = (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of(g_region))
        .build()
    )
    try:
        return client.list_vpcs(ListVpcsRequest()).to_dict()
    except exceptions.ClientRequestException as e:
        # Report failures as HTTP errors rather than returning the exception
        # object as a response body.
        raise HTTPException(
            status_code=e.status_code or 502,
            detail={
                'request_id': e.request_id,
                'error_code': e.error_code,
                'error_msg': e.error_msg,
            },
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.put('/cloud_vpc/update_vpc')
def api_update_vpc(data: dict):
    """Rename a VPC.

    Args:
        data: request body; must provide ``old_name`` (current VPC name,
            resolved to an id through ``api_list_vpc``) and ``new_name``.

    Returns:
        The SDK update response converted with ``to_dict()``.

    Raises:
        HTTPException: 400 when a required key is missing; 404 when
            ``old_name`` cannot be resolved; the upstream status code on
            ``ClientRequestException``; 500 for any other failure.
    """
    from fastapi import HTTPException

    missing = [key for key in ('old_name', 'new_name') if key not in data]
    if missing:
        raise HTTPException(
            status_code=400,
            detail=f"missing required field(s): {', '.join(missing)}",
        )

    vpc_id = api_list_vpc(data['old_name'])
    if vpc_id is None:
        raise HTTPException(
            status_code=404, detail=f"VPC {data['old_name']!r} not found"
        )

    credentials = BasicCredentials(ak, sk)
    client = (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of(g_region))
        .build()
    )
    request = UpdateVpcRequest()
    request.vpc_id = vpc_id
    request.body = UpdateVpcRequestBody(vpc=UpdateVpcOption(name=data['new_name']))
    try:
        response = client.update_vpc(request)
    except exceptions.ClientRequestException as e:
        raise HTTPException(
            status_code=e.status_code or 502,
            detail={
                'request_id': e.request_id,
                'error_code': e.error_code,
                'error_msg': e.error_msg,
            },
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return response.to_dict()
@app.delete('/cloud_vpc/delete_vpc')
def api_delete_vpc(data: dict):
    """Delete the VPC named by ``data['vpc_name']``.

    Args:
        data: request body; must provide ``vpc_name``, which is resolved to
            an id through ``api_list_vpc``.

    Returns:
        The SDK delete response converted with ``to_dict()``.

    Raises:
        HTTPException: 400 when ``vpc_name`` is missing; 404 when the name
            cannot be resolved; the upstream status code on
            ``ClientRequestException``; 500 for any other failure.
    """
    from fastapi import HTTPException

    if 'vpc_name' not in data:
        raise HTTPException(
            status_code=400, detail="missing required field(s): vpc_name"
        )

    vpc_id = api_list_vpc(data['vpc_name'])
    if vpc_id is None:
        # Avoid issuing a delete with vpc_id=None.
        raise HTTPException(
            status_code=404, detail=f"VPC {data['vpc_name']!r} not found"
        )

    credentials = BasicCredentials(ak, sk)
    client = (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of(g_region))
        .build()
    )
    request = DeleteVpcRequest()
    request.vpc_id = vpc_id
    try:
        response = client.delete_vpc(request)
    except exceptions.ClientRequestException as e:
        raise HTTPException(
            status_code=e.status_code or 502,
            detail={
                'request_id': e.request_id,
                'error_code': e.error_code,
                'error_msg': e.error_msg,
            },
        ) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return response.to_dict()
if __name__ == '__main__':
    # Start the service with uvicorn when the file is executed directly.
    # The import string 'main:app' refers to a module named `main`.
    server_options = {
        'app': 'main:app',
        'host': '0.0.0.0',
        'port': 7045,
        'reload': True,
    }
    uvicorn.run(**server_options)
----vpc
import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
# Recreate the VPC "chinaskills_vpc": delete it if present, create it again,
# then print its details.
# NOTE(review): the source had lost its indentation; the nesting below is
# reconstructed from the statement order -- confirm against the original.

# Credentials are read from the environment; an unset variable yields the
# same empty string the script previously hard-coded.
ak = os.environ.get("HUAWEICLOUD_SDK_AK", "")
sk = os.environ.get("HUAWEICLOUD_SDK_SK", "")
projectId = "39b485b4efc24a3f9a6f28f7b47f4396"

credentials = BasicCredentials(ak, sk, projectId)
client = (
    VpcClient.new_builder()
    .with_credentials(credentials)
    .with_region(VpcRegion.value_of("cn-north-4"))
    .build()
)

try:
    # Step 1: delete every existing VPC carrying the target name.
    for vpc in client.list_vpcs(ListVpcsRequest()).vpcs:
        if vpc.name == "chinaskills_vpc":
            request_del = DeleteVpcRequest()
            request_del.vpc_id = vpc.id
            client.delete_vpc(request_del)

    # Step 2: create the VPC.
    request_create = CreateVpcRequest()
    request_create.body = CreateVpcRequestBody(
        vpc=CreateVpcOption(
            cidr="10.0.1.0/24",
            name="chinaskills_vpc",
        )
    )
    client.create_vpc(request_create)

    # Step 3: list again and print the details of the VPC with that name.
    for vpc in client.list_vpcs(ListVpcsRequest()).vpcs:
        if vpc.name == "chinaskills_vpc":
            request_read = ShowVpcRequest()
            request_read.vpc_id = vpc.id
            response = client.show_vpc(request_read)
            print(response)
except exceptions.ClientRequestException as e:
    print(e.status_code)
    print(e.request_id)
    print(e.error_code)
    print(e.error_msg)
---vpc-sub
# coding: utf-8
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
import time
def get_token():
    """Build and return a VPC client for region ``cn-north-4``.

    Despite its name, this function does not return a token: it returns the
    object produced by ``VpcClient.new_builder()...build()``.

    The access key and secret key are read from the environment variables
    ``HUAWEICLOUD_SDK_AK`` / ``HUAWEICLOUD_SDK_SK`` when set.

    Returns:
        The built VPC client.
    """
    import os

    # SECURITY: the literal fallbacks only preserve the previous behaviour
    # when the variables are unset; they have been published in source and
    # should be rotated and then removed.
    ak = os.environ.get("HUAWEICLOUD_SDK_AK", "58PXSSOET6Y2YWEYJQ4F")
    sk = os.environ.get("HUAWEICLOUD_SDK_SK", "8gUvuu7aUEtDhhptZFPzzwRpICo6I9eza07GelGp")
    projectId = "39b485b4efc24a3f9a6f28f7b47f4396"
    credentials = BasicCredentials(ak, sk, projectId)
    return (
        VpcClient.new_builder()
        .with_credentials(credentials)
        .with_region(VpcRegion.value_of("cn-north-4"))
        .build()
    )
class subnet:
    """Subnet operations performed through a VPC client.

    Every method uses the client passed to the constructor, so the class does
    not depend on a module-level ``client`` variable being defined.
    """

    def __init__(self, client):
        """Store *client*, the VPC client used by all other methods."""
        self.client = client

    def listvpc(self, name):
        """Return the id of the first listed VPC named *name*, or ``None``."""
        vpcs = self.client.list_vpcs(ListVpcsRequest()).vpcs
        return next((vpc.id for vpc in vpcs if vpc.name == name), None)

    def create(self):
        """Create ``chinaskills_subnet`` (10.0.1.0/26) in ``chinaskills_vpc``.

        Prints the resolved VPC id and returns the create response.
        """
        vpc_id = self.listvpc('chinaskills_vpc')
        print(vpc_id)
        request = CreateSubnetRequest()
        request.body = CreateSubnetRequestBody(
            subnet=CreateSubnetOption(
                availability_zone="cn-north-4a",
                name="chinaskills_subnet",
                cidr="10.0.1.0/26",
                vpc_id=vpc_id,
                gateway_ip="10.0.1.1",
            )
        )
        return self.client.create_subnet(request)

    def get_id(self, name):
        """Return the id of the first listed subnet named *name*, or ``None``."""
        subnets = self.client.list_subnets(ListSubnetsRequest()).subnets
        return next((item.id for item in subnets if item.name == name), None)

    def delete(self, vpc_name, sub_name):
        """Delete subnet *sub_name* using the id of VPC *vpc_name*.

        Returns the delete response, or ``None`` (after printing a notice)
        when either name cannot be resolved, so that no request carrying a
        ``None`` id is sent.
        """
        sub_id = self.get_id(sub_name)
        vpc_id = self.listvpc(vpc_name)
        if sub_id is None or vpc_id is None:
            print(
                f"skip delete: subnet {sub_name!r} -> {sub_id}, "
                f"vpc {vpc_name!r} -> {vpc_id}"
            )
            return None
        request = DeleteSubnetRequest()
        request.vpc_id = vpc_id
        request.subnet_id = sub_id
        return self.client.delete_subnet(request)

    def get(self, name):
        """Print the id and the details of subnet *name*; return the response."""
        subnet_id = self.get_id(name)
        print(subnet_id)
        request = ShowSubnetRequest()
        request.subnet_id = subnet_id
        response = self.client.show_subnet(request)
        print(response)
        return response
if __name__ == "__main__":
    # Delete the subnet, recreate it, then print its details, pausing between
    # the calls.
    # NOTE(review): delete() is given 'vpc-default' while create() places the
    # subnet in 'chinaskills_vpc' -- confirm the VPC name used for deletion.
    client = get_token()
    manager = subnet(client)

    manager.delete('vpc-default', 'chinaskills_subnet')
    time.sleep(3)

    manager.create()
    time.sleep(2)

    manager.get('chinaskills_subnet')
---as_group
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkas.v1.region.as_region import AsRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkas.v1 import *
if __name__ == "__main__":
    # Recreate the scaling group "chinaskills_as_group": delete the listed
    # scaling groups, create the group, then print its details.
    # NOTE(review): the source had lost its indentation; the nesting below is
    # reconstructed from the statement order -- confirm against the original.
    import os

    # SECURITY: the literal fallbacks only preserve the previous behaviour
    # when the variables are unset; they have been published in source and
    # should be rotated and then removed.
    ak = os.environ.get("HUAWEICLOUD_SDK_AK", "58PXSSOET6Y2YWEYJQ4F")
    sk = os.environ.get("HUAWEICLOUD_SDK_SK", "8gUvuu7aUEtDhhptZFPzzwRpICo6I9eza07GelGp")
    credentials = BasicCredentials(ak, sk)
    client = (
        AsClient.new_builder()
        .with_credentials(credentials)
        .with_region(AsRegion.value_of("cn-north-4"))
        .build()
    )
    try:
        # Query the existing scaling groups.
        listing = client.list_scaling_groups(ListScalingGroupsRequest()).to_dict()
        # NOTE(review): every listed group is deleted, not only the one named
        # "chinaskills_as_group" -- confirm this is intended.
        for group in listing['scaling_groups'] or []:
            # Delete
            request = DeleteScalingGroupRequest()
            request.scaling_group_id = group['scaling_group_id']
            response = client.delete_scaling_group(request).to_dict()
            print(response)

        # Create
        request = CreateScalingGroupRequest()
        listSecurityGroupsbody = [
            SecurityGroup(
                id="10a7d515-8fc5-4b89-a9e0-cd76cd24d4d3"
            )
        ]
        listNetworksbody = [
            Networks(
                id="60fac206-a107-485e-8a54-79216547e54a"
            )
        ]
        listAvailableZonesbody = [
            "cn-north-4a"
        ]
        request.body = CreateScalingGroupOption(
            vpc_id="eafce6b0-0ee0-4e1e-98f4-6e8fe4c96781",
            security_groups=listSecurityGroupsbody,
            networks=listNetworksbody,
            available_zones=listAvailableZonesbody,
            max_instance_number=3,
            min_instance_number=1,
            desire_instance_number=2,
            scaling_configuration_id="28a4a873-4788-4055-b371-665b836a19cd",
            delete_volume=True,
            delete_publicip=True,
            scaling_group_name="chinaskills_as_group"
        )
        response = client.create_scaling_group(request).to_dict()
        print(response)
        scaling_group_id = response['scaling_group_id']

        # Query the group that was just created.
        request = ShowScalingGroupRequest()
        request.scaling_group_id = scaling_group_id
        response = client.show_scaling_group(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
- 点赞
- 收藏
- 关注作者
评论(0)