华为云智能社区a
【摘要】 123
#!/bin/bash
rm -rf /etc/yum.repos.d/*
cat << EOF > /etc/yum.repos.d/centos.repo
[os]
name=Qcloud centos os - \$basearch
baseurl=http://mirrors.cloud.tencent.com/centos/\$releasever/os/\$basearch/
enabled=1
gpgcheck=1
gpgkey=http://mirrors.cloud.tencent.com/centos/RPM-GPG-KEY-CentOS-7
[updates]
name=Qcloud centos updates - \$basearch
baseurl=http://mirrors.cloud.tencent.com/centos/\$releasever/updates/\$basearch/
enabled=1
gpgcheck=1
gpgkey=http://mirrors.cloud.tencent.com/centos/RPM-GPG-KEY-CentOS-7
[extras]
name=Qcloud centos extras - \$basearch
baseurl=http://mirrors.cloud.tencent.com/centos/\$releasever/extras/\$basearch/
enabled=1
gpgcheck=1
gpgkey=http://mirrors.cloud.tencent.com/centos/RPM-GPG-KEY-CentOS-7
[epel]
name=EPEL for redhat/centos \$releasever - \$basearch
baseurl=http://mirrors.cloud.tencent.com/epel/\$releasever/\$basearch/
failovermethod=priority
enabled=1
gpgcheck=1
gpgkey=http://mirrors.cloud.tencent.com/epel/RPM-GPG-KEY-EPEL-7
EOF
yum install python3 -y
pip3 install -i https://mirrors.cloud.tencent.com/pypi/simple --upgrade pip
pip3 config set global.index-url https://mirrors.cloud.tencent.com/pypi/simple
pip3 install huaweicloudsdkcore huaweicloudsdkecs huaweicloudsdkvpc huaweicloudsdkswr huaweicloudsdkcce huaweicloudsdkrds huaweicloudsdkims uvicorn fastapi
from fastapi import FastAPI, Query, Body
from pydantic import BaseModel
import uvicorn
app = FastAPI()
class ItemModel(BaseModel):
name: str
age: int
@app.get("/items/{item_id}")
async def read_item(item_id: int, query: str = None):
return {"item_id": item_id, "query": query}
@app.get("/hello")
async def hello_world():
return {"Hello": "World"}
@app.put("/items/{item_id}")
async def update_item(item_id: int, item: ItemModel = Body(...)):
return {"item_name": item.name, "item_id": item_id}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8010)
# coding: utf-8
import os
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkevs.v2.region.evs_region import EvsRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkevs.v2 import *
if __name__ == "__main__":
# 从环境变量中获取 AK 和 SK
ak = ""
sk = ""
if not ak or not sk:
print("请先设置环境变量 CLOUD_SDK_AK 和 CLOUD_SDK_SK")
exit(1)
credentials = BasicCredentials(ak, sk)
client = EvsClient.new_builder() \
.with_credentials(credentials) \
.with_region(EvsRegion.value_of("cn-north-4")) \
.build()
try:
# 列出所有卷
request = ListVolumesRequest()
response = client.list_volumes(request)
# 查找名为 chinaskills_volume 的卷
volume_id_to_delete = None
for volume in response.volumes:
if volume.name == "chinaskills_volume":
volume_id_to_delete = volume.id
print(f"找到卷: {volume.name},ID: {volume.id}")
break
# 如果找到对应的卷 ID,则删除该卷
if volume_id_to_delete:
delete_request = DeleteVolumeRequest(volume_id_to_delete)
client.delete_volume(delete_request)
print(f"卷 {volume_id_to_delete} 已删除")
else:
print("未找到名为 chinaskills_volume 的卷,无需删除")
request = CreateVolumeRequest()
listMetadataVolume = {
"__system__cmkid": "4f765277-a00e-4c3a-81f8-2cbaa0856e18",
"__system__encrypted]": "1"
}
volumebody = CreateVolumeOption(
availability_zone="cn-north-4a",
metadata=listMetadataVolume,
multiattach=True,
name="chinaskills_volume",
size=100,
volume_type="SSD"
)
request.body = CreateVolumeRequestBody(
volume=volumebody
)
response = client.create_volume(request)
print(response)
except exceptions.ClientRequestException as e:
print(f"HTTP状态码: {e.status_code}")
print(f"请求ID: {e.request_id}")
print(f"错误码: {e.error_code}")
print(f"错误信息: {e.error_msg}")
# coding: utf-8
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkps.v3.region.kps_region import KpsRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkps.v3 import *
credentials = BasicCredentials("", "") \
client = KpsClient.new_builder() \
.with_credentials(credentials) \
.with_region(KpsRegion.value_of("cn-north-4")) \
.build()
def create_key(name):
request = ListKeypairsRequest()
response = client.list_keypairs(request).to_json_object()
for i in response['keypairs']:
if i['keypair']['name'] == name:
request = DeleteKeypairRequest()
request.keypair_name = name
response = client.delete_keypair(request)
try:
request = CreateKeypairRequest()
encryptionKeyProtection = Encryption(
type="default"
)
keyProtectionKeypair = KeyProtection(
encryption=encryptionKeyProtection
)
keypairbody = CreateKeypairAction(
name=name,
key_protection=keyProtectionKeypair
)
request.body = CreateKeypairRequestBody(
keypair=keypairbody
)
response = client.create_keypair(request)
print(response)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
if __name__ == "__main__":
create_key('chinaskills_keypair')
import os
import json
import time
import argparse
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkecs.v2.region.ecs_region import EcsRegion
from huaweicloudsdkecs.v2 import *
ak = ""
sk = ""
cloud_vpc_id = ""
cloud_subnet_id = ""
credentials = BasicCredentials(ak, sk)
client = EcsClient.new_builder() \
.with_credentials(credentials) \
.with_region(EcsRegion.value_of("cn-north-4")) \
.build()
def create_ecs(name, image_id):
request = CreatePostPaidServersRequest()
rootVolumeServer = PostPaidServerRootVolume(volumetype="SSD")
listNicsServer = [PostPaidServerNic(subnet_id=cloud_subnet_id)]
serverbody = PostPaidServer(
flavor_ref="c7.large.2",
image_ref=image_id,
name=name,
nics=listNicsServer,
root_volume=rootVolumeServer,
vpcid=cloud_vpc_id
)
request.body = CreatePostPaidServersRequestBody(server=serverbody)
response = client.create_post_paid_servers(request)
print(response.to_dict())
def get(name, output_file=None):
max_retries = 5
wait_time = 5
for _ in range(max_retries):
request = ListServersDetailsRequest()
response = client.list_servers_details(request)
servers = response.to_dict().get('servers', [])
for server in servers:
if server['name'] == name:
print(f"服务器名称: {server['name']}, 服务器ID: {server['id']}")
server_dict = server.to_dict() if hasattr(server, 'to_dict') else dict(server)
if output_file:
try:
with open(output_file, 'w') as f:
json.dump(server_dict, f, indent=4, default=str)
print(f"服务器信息已保存到 {output_file}")
except Exception as e:
print(f"保存文件时发生错误: {e}")
return
print(f"未找到名称为 {name} 的服务器,等待 {wait_time} 秒后重试...")
time.sleep(wait_time)
print(f"未能找到名称为 {name} 的服务器,已达到最大重试次数。")
def getall(output_file=None):
request = ListServersDetailsRequest()
response = client.list_servers_details(request)
servers = response.to_dict().get('servers', [])
serializable_servers = []
for server in servers:
server_dict = server.to_dict() if hasattr(server, 'to_dict') else dict(server)
serializable_servers.append(server_dict)
if output_file:
try:
with open(output_file, 'w') as f:
json.dump(serializable_servers, f, indent=4, default=str)
print(f"所有服务器信息已保存到 {output_file}")
except Exception as e:
print(f"保存文件时发生错误: {e}")
else:
print(serializable_servers)
def delete(name):
request = ListServersDetailsRequest()
response = client.list_servers_details(request)
servers = response.to_dict().get('servers', [])
for server in servers:
if server['name'] == name:
print(f"服务器名称: {server['name']}, 服务器ID: {server['id']}")
request = DeleteServersRequest()
listServersbody = [ServerId(id=server['id'])]
request.body = DeleteServersRequestBody(servers=listServersbody)
response = client.delete_servers(request)
print(response)
return
print(f"没有找到名称为 {name} 的服务器。")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="ECS 云主机管理")
subparsers = parser.add_subparsers(dest="command")
create_parser = subparsers.add_parser("create")
create_parser.add_argument("-i", "--input", required=True, help="云主机名称和镜像 ID,JSON 格式")
get_parser = subparsers.add_parser("get")
get_parser.add_argument("-n", "--name", required=True, help="指定要查询的云主机名称")
get_parser.add_argument("-o", "--output", help="输出文件名,格式为 JSON")
getall_parser = subparsers.add_parser("getall")
getall_parser.add_argument("-o", "--output", help="输出文件名,格式为 JSON")
delete_parser = subparsers.add_parser("delete")
delete_parser.add_argument("-n", "--name", required=True, help="指定要删除的云主机名称")
args = parser.parse_args()
if args.command == "create":
input_data = json.loads(args.input)
create_ecs(input_data['name'], input_data['imagename'])
elif args.command == "get":
get(args.name, args.output)
elif args.command == "getall":
getall(args.output)
elif args.command == "delete":
delete(args.name)
from fastapi import FastAPI
import uvicorn
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkvpc.v2 import *
app = FastAPI()
ak = ""
sk = ""
g_region = "cn-north-4"
def api_list_vpc(name):
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(g_region)) \
.build()
try:
request = ListVpcsRequest()
response = list(map(lambda x: {'name': x['name'], 'id': x['id']}, client.list_vpcs(request).to_dict()['vpcs']))
for i in response:
if i['name'] == name:
print(i['id'])
return i['id']
return None
except Exception:
return None
@app.post('/cloud_vpc/create_vpc')
def api_create_vpc(data: dict):
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(g_region)) \
.build()
try:
request = CreateVpcRequest()
vpcbody = CreateVpcOption(
cidr=data['cidr'],
name=data['name']
)
request.body = CreateVpcRequestBody(
vpc=vpcbody
)
response = client.create_vpc(request)
print(response)
return response
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
return e
except Exception as e:
return e
@app.get('/cloud_vpc/vpc/{vpc_name}')
def api_get_vpc(vpc_name: str):
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(g_region)) \
.build()
try:
request = ShowVpcRequest()
request.vpc_id = api_list_vpc(vpc_name)
response = client.show_vpc(request)
print(response)
return response
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
return e
except Exception as e:
return e
@app.get('/cloud_vpc/vpc')
def api_get_list_vpc():
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(g_region)) \
.build()
try:
request = ListVpcsRequest()
response = client.list_vpcs(request).to_dict()
print(response)
return response
except Exception as e:
return e
@app.put('/cloud_vpc/update_vpc')
def api_update_vpc(data: dict):
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(g_region)) \
.build()
try:
request = UpdateVpcRequest()
request.vpc_id = api_list_vpc(data['old_name'])
vpcbody = UpdateVpcOption(
name=data['new_name']
)
request.body = UpdateVpcRequestBody(
vpc=vpcbody
)
response = client.update_vpc(request)
print(response)
return response.to_dict()
except Exception as e:
return e
@app.delete('/cloud_vpc/delete_vpc')
def api_delete_vpc(data: dict):
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(g_region)) \
.build()
try:
request = DeleteVpcRequest()
request.vpc_id = api_list_vpc(data['vpc_name'])
response = client.delete_vpc(request)
print(response)
return response.to_dict()
except Exception as e:
return e
if __name__ == '__main__':
uvicorn.run(app='main:app', host='0.0.0.0', port=7045, reload=True)
【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)