network_fastapi

举报
玩个球球 发表于 2024/03/06 11:24:43 2024/03/06
【摘要】 Python库。(1)封装创建接口,路径为/cloud_vpc/create_vpc,参数格式为{"name": "","cidr": ""},name为VPC名称,cidr为虚拟私有云下可用子网的范围;方法为POST,返回数据类型为JSON类型。(2)封装单个VPC查询接口,路径为/cloud_vpc/vpc/{vpc_name},路径中{vpc_name}为待查询VPC;方法为GET,返...

Python库。
(1)封装创建接口,路径为/cloud_vpc/create_vpc,参数格式为{"name": "","cidr": ""},name为VPC名称,cidr为虚拟私有云下可用子网的范围;方法为POST,返回数据类型为JSON类型。
(2)封装单个VPC查询接口,路径为/cloud_vpc/vpc/{vpc_name},路径中{vpc_name}为待查询VPC;方法为GET,返回数据类型为JSON类型。
(3)封装服务查询接口,要求可以查询到所有VPC,路径为/cloud_vpc/vpc;方法为GET,返回的数据类型JSON类型的列表类型。
(4)封装更新VPN名称接口,路径为/cloud_vpc/update_vpc,参数格式为{"new_name": "","old_name": ""},new_name为VPC新名称,old_name为待删除的VPN名称;方法为PUT,返回JSON类型的更新后数据。
(5)封装删除接口,接口名为/cloud_vpc/delete_vpc,参数格式为{"vpc_name": ""},vpc_name为VPC新名称;方法为DELETE,返回JSON类型的删除后的数据。
功能编写完成后,手工启动Restful服务,再提交检测。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# @author YWLBTWTK
# @date 2023/9/18
from fastapi import FastAPI
import uvicorn
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *

app = FastAPI()
ak = ""
sk = ""
g_region = "cn-north-4"


def api_list_vpc(name):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()

    try:
        request = ListVpcsRequest()
        response = list(map(lambda x: {'name': x['name'], 'id': x['id']}, client.list_vpcs(request).to_dict()['vpcs']))
        for i in response:
            if i['name'] == name:
                print(i['id'])
                return i['id']
        return None
    except Exception:
        return None


@app.post('/cloud_vpc/create_vpc')
def api_create_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = CreateVpcRequest()
        vpcbody = CreateVpcOption(
            cidr=data['cidr'],
            name=data['name']
        )
        request.body = CreateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.create_vpc(request)
        print(response)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        return e
    except Exception as e:
        return e


@app.get('/cloud_vpc/vpc/{vpc_name}')
def api_get_vpc(vpc_name: str):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = ShowVpcRequest()
        request.vpc_id = api_list_vpc(vpc_name)
        response = client.show_vpc(request)
        print(response)
        return response
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
        return e
    except Exception as e:
        return e


@app.get('/cloud_vpc/vpc')
def api_get_list_vpc():
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = ListVpcsRequest()
        response = client.list_vpcs(request).to_dict()
        print(response)
        return response
    except Exception as e:
        return e


@app.put('/cloud_vpc/update_vpc')
def api_update_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()
    try:
        request = UpdateVpcRequest()
        request.vpc_id = api_list_vpc(data['old_name'])
        vpcbody = UpdateVpcOption(
            name=data['new_name']
        )
        request.body = UpdateVpcRequestBody(
            vpc=vpcbody
        )
        response = client.update_vpc(request)
        print(response)
        return response.to_dict()
    except Exception as e:
        return e


@app.delete('/cloud_vpc/delete_vpc')
def api_delete_vpc(data: dict):
    credentials = BasicCredentials(ak, sk)
    client = VpcClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(VpcRegion.value_of(g_region)) \
        .build()

    try:
        request = DeleteVpcRequest()
        request.vpc_id = api_list_vpc(data['vpc_name'])
        response = client.delete_vpc(request)
        print(response)
        return response.to_dict()
    except Exception as e:
        return e


if __name__ == '__main__':
    uvicorn.run(app='main:app', host='0.0.0.0', port=7045, reload=True)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。