yuyuyuyu2

举报
yd_219475889 发表于 2024/12/13 18:55:11 2024/12/13
【摘要】 def createPairKey(): try: # create request = NovaCreateKeypairRequest() keypairbody = NovaCreateKeypairOption( name="chinaskills_keypair" ) request....
def createPairKey():
    try:
        # create
        request = NovaCreateKeypairRequest()
        keypairbody = NovaCreateKeypairOption(
            name="chinaskills_keypair"
        )
        request.body = NovaCreateKeypairRequestBody(
            keypair=keypairbody
        )
        response = client.nova_create_keypair(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


def deletePairKey():
    try:
        # delete
        request = NovaDeleteKeypairRequest()
        request.keypair_name = "chinaskills_keypair"
        response = client.nova_delete_keypair(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


def getKeyByName():
    try:
        request = NovaShowKeypairRequest()
        request.keypair_name = "chinaskills_keypair"
        response = client.nova_show_keypair(request)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
def create_volume():
    try:
        request = CreateVolumeRequest()
        volumebody = CreateVolumeOption(
            availability_zone="af-north-1a",
            multiattach=True,
            name="chinaskills_volume ",
            size=100,
            volume_type="SSD"
        )
        request.body = CreateVolumeRequestBody(
            volume=volumebody
        )
        response = client.create_volume(request)
        print(response)
        print('volume create')
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


def delete_volume(volume_id):
    try:
        request = DeleteVolumeRequest()
        request.volume_id = volume_id
        response = client.delete_volume(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


def get_volume_id(name):
    try:
        request = ListVolumesRequest()
        request.name = name
        response = client.list_volumes(request)
        print(response)
        if response.to_dict()['count'] >= 1:
            print('volume exist')
            dict_volumes = response.to_dict()['volumes']
            id = dict_volumes[0]['id']
            return id
        else:
            return ' '

    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
def createserver(name,imageid):
    try:
        request = CreateServersRequest()
        rootVolumeServer = PrePaidServerRootVolume(
            volumetype="SSD"
        )
        listNicsServer = [
            PrePaidServerNic(
                subnet_id="ecf996a0-bf3e-4521-bbfd-3f70963cc23b"
            )
        ]
        serverbody = PrePaidServer(
            image_ref=imageid,
            flavor_ref="ac7.2xlarge.2",
            name=name,
            vpcid="518f3932-5828-4da4-8eab-0c8bf2e42dda",
            nics=listNicsServer,
            root_volume=rootVolumeServer
        )
        request.body = CreateServersRequestBody(
            server=serverbody
        )
        response = client.create_servers(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

def getserverbyname(name):
    try:
        request = ListServersDetailsRequest()
        request.name = name
        response = client.list_servers_details(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

def getAllServers():
    try:
        request = ListServersDetailsRequest()
        response = client.list_servers_details(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

def getServersIdByName(name):
    try:
        request = ListCloudServersRequest()
        request.name = name
        response = client.list_cloud_servers(request)
        print(response.to_dict()['servers'])
        return response.to_dict()['servers'][0]['id']
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

def deleteServer(serverid):
    try:
        request = DeleteServersRequest()
        listServersbody = [
            ServerId(
                id=serverid
            )
        ]
        request.body = DeleteServersRequestBody(
            servers=listServersbody
        )
        response = client.delete_servers(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

if __name__ == "__main__":
    # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak = "DEWQPLKHLDZKHBKRKTEM"
    sk = "Ww6lhO4rv0KplgLLCpCSgMqAwsjhpRN6XIN2K9xy"

    credentials = BasicCredentials(ak, sk)

    client = EcsClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(EcsRegion.value_of("cn-north-9")) \
        .build()
    # createserver('test','6288ab88-3b7c-4320-9554-ad68a53eebb6')
    # 这里写命令的映射关系,和openstack的provier一样103行之前所有的代码都是从华为云的apiexplorer里找出来的
    parser = argparse.ArgumentParser(description='ECS Manager for Huawei Cloud')
    subparsers = parser.add_subparsers(dest='command')

    # Create ECS
    parser_create = subparsers.add_parser('create', help='Create an ECS instance')
    parser_create.add_argument('-i', '--input', type=json.loads, required=True,help='JSON formatted input for ECS details')

    # Get ECS by name
    parser_get = subparsers.add_parser('get', help='Get details of an ECS instance by name')
    parser_get.add_argument('-n', '--name', type=str, required=True, help='Name of the ECS instance')
    parser_get.add_argument('-o', '--output', type=str, help='Output file path (JSON format)')

    # Get all ECS instances
    parser_getall = subparsers.add_parser('getall', help='Get details of all ECS instances')
    parser_getall.add_argument('-o', '--output', type=str, help='Output file path (YAML format)')

    # Delete ECS by name
    parser_delete = subparsers.add_parser('delete', help='Delete an ECS instance by name')
    parser_delete.add_argument('-n', '--name', type=str, required=True, help='Name of the ECS instance')

    args = parser.parse_args()

    if args.command == 'create':
        server = createserver(args.input['name'],args.input['image'])
        if server:
            server = getserverbyname(args.input['name'])
            print(json.dumps(server.to_dict(), indent=2))

    elif args.command == 'get':
        server = getserverbyname(args.name)
        if server:
            server_info = server.to_dict()
            if args.output:
                print('write file')
                with open(args.output, 'w') as f:
                    json.dump(server_info, f, indent=2)
            else:
                print(json.dumps(server_info, indent=2))
        else:
            print(f"ECS instance with name {args.name} not found.")

    elif args.command == 'getall':
        print('get all servers')
        servers = getAllServers()
        servers_info = [server.to_dict() for server in servers]
        if args.output:
            with open(args.output, 'w') as f:
                yaml.dump(servers_info, f, default_flow_style=False)
        else:
            print(yaml.dump(servers_info, default_flow_style=False))

    elif args.command == 'delete':
        id = getserverbyname(args.name)
        deleteServer(id)

    else:
        parser.print_help()
# coding: utf-8
from fastapi import FastAPI, HTTPException, Body, Path
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *


# The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
# In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
ak = "DEWQPLKHLDZKHBKRKTEM"
sk = "Ww6lhO4rv0KplgLLCpCSgMqAwsjhpRN6XIN2K9xy"

credentials = BasicCredentials(ak, sk)

client = VpcClient.new_builder() \
    .with_credentials(credentials) \
    .with_region(VpcRegion.value_of("cn-east-3")) \
    .build()

# 初始化 FastAPI 应用
app = FastAPI()

def getVPCIdByName(name):
    try:
        request = ListVpcsRequest()
        response = client.list_vpcs(request)
        for vpc in response.to_dict()['vpcs']:
            if vpc['name'] == name:
                return vpc['id']
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)

# 封装创建 VPC 接口
@app.post("/cloud_vpc/create_vpc/")
async def create_vpc(vpc_data: dict = Body(...)):
    name = vpc_data.get("name")
    cidr = vpc_data.get("cidr")

    if not name or not cidr:
        raise HTTPException(status_code=400, detail="VPC 名称和 CIDR 不能为空")
    try:
        request = CreateVpcRequest()
        vpcbody = CreateVpcOption(
            cidr=cidr,
            name=name
        )
        request.body = CreateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.create_vpc(request)
        print(response)
        return response.to_dict()
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


# 封装单个 VPC 查询接口
@app.get("/cloud_vpc/vpc/{vpc_name}")
async def get_vpc(vpc_name: str = Path(...)):
    try:
        request = ListVpcsRequest()
        response = client.list_vpcs(request)
        for vpc in response.to_dict()['vpcs']:
            if vpc['name'] == vpc_name:
                return vpc
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


# 封装服务查询接口
@app.get("/cloud_vpc/vpc/")
async def list_vpc():
    try:
        request = ListVpcsRequest()
        response = client.list_vpcs(request)
        print(response)
        return response.to_dict()
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


# 封装更新 VPC 名称接口
@app.put("/cloud_vpc/update_vpc/")
async def update_vpc(update_data: dict = Body(...)):
    new_name = update_data.get("new_name")
    old_name = update_data.get("old_name")
    try:
        request = UpdateVpcRequest()
        request.vpc_id = getVPCIdByName(old_name)
        vpcbody = UpdateVpcOption(
            name= new_name
        )
        request.body = UpdateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.update_vpc(request)
        print(response)
        return response.to_dict()
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)



# 封装删除 VPC 接口
@app.delete("/cloud_vpc/delete_vpc/")
async def delete_vpc(vpc_data: dict = Body(...)):
    vpc_name = vpc_data.get("vpc_name")
    id = getVPCIdByName(vpc_name)
    try:
        print('delete id = ', id)
        request = DeleteVpcRequest()
        request.vpc_id = id
        response = client.delete_vpc(request)
        print(response)
        return response.to_dict()
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)


# 启动 FastAPI 应用
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7045)

curl  -X   DELETE  http://192.168.100.10:7045/cloud_vpc/delete_vpc/ -H "Content-type: application/json"  -d '{"vpc_name":"test123"}'
 
curl  -X  POST http://192.168.100.10:7045/cloud_vpc/create_vpc/ -H "Content-type: application/json"  -d '{"name":"test123","cidr":"192.168.0.0/16"}'

curl  -X GET http://192.168.100.10:7045/cloud_vpc/vpc/vpcname

curl  -X  PUT  http://192.168.100.10:7045/cloud_vpc/update_vpc/ -H "Content-type: application/json"  -d '{"name":"test123","cidr":"192.168.0.0/16"}'

pip install fastapi uvicorn
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。