开发笔记2

举报
yd_235284610 发表于 2025/03/21 00:02:58 2025/03/21
【摘要】 开发笔记2

#!/usr/bin/env python3
from fastapi import FastAPI
import uvicorn
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *

app = FastAPI()
ak = "UCJEM0YMIIEGCS3SGW4R"
sk = "ueyWtMjX5o9KBbQkFKPWx14Jkiqbq4VQ2EQjacfn"
g_region = "cn-north-4"


def api_list_vpc(name):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()

    try:
        request = ListVpcsRequest()
        response = list(map(lambda x: {'name': x['name'], 'id': x['id']}, client.list_vpcs(request).to_dict()['vpcs']))
        for i in response:
            if i['name'] == name:
                print(i['id'])
                return i['id']
        return None
    except Exception:
        return None


@app.post('/cloud_vpc/create_vpc')
def api_create_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = CreateVpcRequest()
        vpcbody = CreateVpcOption(
            cidr=data['cidr'],
            name=data['name']
        )
        request.body = CreateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.create_vpc(request)
        print(response)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        return e
    except Exception as e:
        return e


@app.get('/cloud_vpc/vpc/{vpc_name}')
def api_get_vpc(vpc_name: str):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = ShowVpcRequest()
        request.vpc_id = api_list_vpc(vpc_name)
        response = client.show_vpc(request)
        print(response)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        return e
    except Exception as e:
        return e


@app.get('/cloud_vpc/vpc')
def api_get_list_vpc():
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = ListVpcsRequest()
        response = client.list_vpcs(request).to_dict()
        print(response)
        return response
    except Exception as e:
        return e


@app.put('/cloud_vpc/update_vpc')
def api_update_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = UpdateVpcRequest()
        request.vpc_id = api_list_vpc(data['old_name'])
        vpcbody = UpdateVpcOption(
            name=data['new_name']
        )
        request.body = UpdateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.update_vpc(request)
        print(response)
        return response.to_dict()
    except Exception as e:
        return e


@app.delete('/cloud_vpc/delete_vpc')
def api_delete_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()

    try:
        request = DeleteVpcRequest()
        request.vpc_id = api_list_vpc(data['vpc_name'])
        response = client.delete_vpc(request)
        print(response)
        return response.to_dict()
    except Exception as e:
        return e


if __name__ == '__main__':
    uvicorn.run(app='main:app', host='0.0.0.0', port=7045, reload=True)



----vpc

import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
ak = ""
sk = ""
projectId = "39b485b4efc24a3f9a6f28f7b47f4396"

credentials = BasicCredentials(ak, sk, projectId)

client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of("cn-north-4")) \
        .build()

try:
    request_list = ListVpcsRequest()
    response_list = client.list_vpcs(request_list).vpcs
    #print(response_list)
    for vpc in response_list:
        if vpc.name == "chinaskills_vpc":
            request_del = DeleteVpcRequest()
            request_del.vpc_id = vpc.id
            response = client.delete_vpc(request_del)

    request_create = CreateVpcRequest()
    vpcbody = CreateVpcOption(
        cidr="10.0.1.0/24",
        name="chinaskills_vpc"
    )
    request_create.body = CreateVpcRequestBody(
        vpc=vpcbody
    )
    response_create = client.create_vpc(request_create)
    #    print(response_create)
    request_list = ListVpcsRequest()
    response_list = client.list_vpcs(request_list).vpcs
    for vpc in response_list:
        if vpc.name == "chinaskills_vpc":
            request_read = ShowVpcRequest()
            request_read.vpc_id = vpc.id
            response = client.show_vpc(request_read)
            print(response)


except exceptions.ClientRequestException as e:
    print(e.status_code)
    print(e.request_id)
    print(e.error_code)
    print(e.error_msg)


---vpc-sub

# coding: utf-8

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
import time
def get_token():
    ak = "58PXSSOET6Y2YWEYJQ4F"
    sk = "8gUvuu7aUEtDhhptZFPzzwRpICo6I9eza07GelGp"
    projectId = "39b485b4efc24a3f9a6f28f7b47f4396"
    credentials = BasicCredentials(ak, sk, projectId)

    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of("cn-north-4")) \
        .build()
    return client


class subnet:
    def __init__(self, client):
        self.client = client

    def listvpc(self, name):
        client = self.client
        request = ListVpcsRequest()
        response = client.list_vpcs(request)
        a = response.vpcs
        for i in a:
            if i.name == name:
                return i.id

    def create(self):
        client = self.client
        id = self.listvpc('chinaskills_vpc')
        print(id)
        request = CreateSubnetRequest()
        subnetbody = CreateSubnetOption(
            availability_zone="cn-north-4a",
            name="chinaskills_subnet",
            cidr="10.0.1.0/26",
            vpc_id=id,
            gateway_ip="10.0.1.1",

        )
        request.body = CreateSubnetRequestBody(
            subnet=subnetbody
        )
        response = client.create_subnet(request)

    def get_id(self, name):
        request = ListSubnetsRequest()
        response = client.list_subnets(request)
        a = response.subnets
        for i in a:
            if i.name == name:
                return i.id

    def delete(self, vpc_name, sub_name):
        sub_id = self.get_id(sub_name)
        vpc_id = self.listvpc(vpc_name)
        request = DeleteSubnetRequest()
        request.vpc_id = vpc_id
        request.subnet_id = sub_id
        response = client.delete_subnet(request)

    def get(self, name):
        id = self.get_id(name)
        print(id)
        request = ShowSubnetRequest()
        request.subnet_id = id
        response = client.show_subnet(request)
        print(response)


if __name__ == "__main__":
    client = get_token()
    a = subnet(client)
    a.delete('vpc-default', 'chinaskills_subnet')
    time.sleep(3)
    a.create()
    time.sleep(2)
    a.get('chinaskills_subnet')

---as_group

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkas.v1.region.as_region import AsRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkas.v1 import *

if __name__ == "__main__":
    ak = "58PXSSOET6Y2YWEYJQ4F"
    sk = "8gUvuu7aUEtDhhptZFPzzwRpICo6I9eza07GelGp"

    credentials = BasicCredentials(ak, sk)
    client = AsClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(AsRegion.value_of("cn-north-4")) \
        .build()

    try:
        # 查询
        request = ListScalingGroupsRequest()
        response = client.list_scaling_groups(request).to_dict()
        if len(response['scaling_groups']) > 0:
            for i in response['scaling_groups']:
                # 删除
                scaling_group_id = i['scaling_group_id']
                request = DeleteScalingGroupRequest()
                request.scaling_group_id = scaling_group_id
                response = client.delete_scaling_group(request).to_dict()
                print(response)

        # 创建
        request = CreateScalingGroupRequest()
        listSecurityGroupsbody = [
            SecurityGroup(
                id="10a7d515-8fc5-4b89-a9e0-cd76cd24d4d3"
            )
        ]
        listNetworksbody = [
            Networks(
                id="60fac206-a107-485e-8a54-79216547e54a"
            )
        ]
        listAvailableZonesbody = [
            "cn-north-4a"
        ]
        request.body = CreateScalingGroupOption(
            vpc_id="eafce6b0-0ee0-4e1e-98f4-6e8fe4c96781",
            security_groups=listSecurityGroupsbody,
            networks=listNetworksbody,
            available_zones=listAvailableZonesbody,
            max_instance_number=3,
            min_instance_number=1,
            desire_instance_number=2,
            scaling_configuration_id="28a4a873-4788-4055-b371-665b836a19cd",
            delete_volume=True,
            delete_publicip=True,
            scaling_group_name="chinaskills_as_group"
        )
        response = client.create_scaling_group(request).to_dict()
        print(response)
        scaling_group_id = response['scaling_group_id']
        # 查询
        request = ShowScalingGroupRequest()
        request.scaling_group_id = scaling_group_id
        response = client.show_scaling_group(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。