使用python调用api案例
requirements
bcrypt==3.2.2
certifi==2022.6.15
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==37.0.3
huaweicloudsdkcore==3.0.94
huaweicloudsdkecs==3.0.94
huaweicloudsdkeip==3.0.94
huaweicloudsdkvpc==3.0.94
idna==3.3
paramiko==2.11.0
pycparser==2.21
PyNaCl==1.5.0
PyYAML==6.0
requests==2.28.0
requests-futures==1.0.0
requests-toolbelt==0.9.1
simplejson==3.17.0
six==1.16.0
urllib3==1.26.9
python
# coding: utf-8
# Detect eIP and automatically configure SNAT
# python3 -m venv .venv
# source .venv/bin/activate
# python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
# pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple
# pip install -r requirement.txt
# 非洲-约翰内斯堡 af-south-1
# 华北-北京四 cn-north-4
# 华北-北京一 cn-north-1
# 华北-乌兰察布一 cn-north-9
# 华东-上海二 cn-east-2
# 华东-上海一 cn-east-3
# 华南-广州 cn-south-1
# 华南-广州-友好用户环境 cn-south-4
# 华南-深圳 cn-south-2
# 拉美-墨西哥城一 na-mexico-1
# 拉美-圣地亚哥 la-south-2
# 欧洲-巴黎 eu-west-0
# 西南-贵阳一 cn-southwest-2
# 亚太-曼谷 ap-southeast-2
# 亚太-新加坡 ap-southeast-3
# 中国-香港 ap-southeast-1
import json
import time
import paramiko
from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkeip.v3.region.eip_region import EipRegion
from huaweicloudsdkeip.v3 import *
from huaweicloudsdkecs.v2.region.ecs_region import EcsRegion
from huaweicloudsdkecs.v2 import *
from huaweicloudsdkvpc.v2.region.vpc_region import VpcRegion
from huaweicloudsdkvpc.v2 import *
# 获取EIP的详细信息
def get_eip_details():
credentials = BasicCredentials(ak, sk)
client = EipClient.new_builder() \
.with_credentials(credentials) \
.with_region(EipRegion.value_of(region)) \
.build()
try:
request = ShowPublicipRequest()
request.publicip_id = eip_id
response = client.show_publicip(request)
response = json.loads(str(response))
return response
except:
return False
# 根据实例ID和网卡ID获取私网ip
def get_private_ip(parameters):
credentials = BasicCredentials(ak, sk)
client = EcsClient.new_builder() \
.with_credentials(credentials) \
.with_region(EcsRegion.value_of("cn-east-3")) \
.build()
try:
request = ListServerInterfacesRequest()
request.server_id = parameters["device_id"]
response = client.list_server_interfaces(request)
response = json.loads(str(response))
for i in range(len(response["interfaceAttachments"])):
if response["interfaceAttachments"][i]["port_id"] == parameters["port_id"]:
return response["interfaceAttachments"][i]["fixed_ips"][0]["ip_address"]
return False
except:
return False
# ssh登录器
def ssh_client(ip, cli):
# 建立一个sshclient对象
ssh = paramiko.SSHClient()
# 允许将信任的主机自动加入到host_allow 列表,此方法必须放在connect方法的前面
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# 调用connect方法连接服务器
ssh.connect(hostname=ip, port=22, username=user, password=passwd)
# 执行命令
ssh.exec_command(cli)
ssh.close()
# 更新路由表路由
def update_route_table(news):
credentials = BasicCredentials(ak, sk)
client = VpcClient.new_builder() \
.with_credentials(credentials) \
.with_region(VpcRegion.value_of(region)) \
.build()
try:
request = UpdateRouteTableRequest()
request.routetable_id = routetable_id
list_update_route_table_req_routes_routes = [{"type": "ecs", "destination": "0.0.0.0/0",
"nexthop": news["device_id"]}]
list_update_route_table_req_routes_routetable = {
"mod": list_update_route_table_req_routes_routes
}
rotatableUpdateRouteTableReq = UpdateRouteTableReq(
routes=list_update_route_table_req_routes_routetable
)
request.body = UpdateRoutetableReqBody(
routetable=rotatableUpdateRouteTableReq
)
response = client.update_route_table(request)
except exceptions.ClientRequestException as e:
print(e.status_code)
print(e.request_id)
print(e.error_code)
print(e.error_msg)
return False
# 设置snat
def snat_rule(defaults, news):
del_rule_ip = get_private_ip(defaults)
add_rule_ip = get_private_ip(news)
add_cli = "echo net.ipv4.ip_forward = 1 >> /etc/sysctl.conf && sysctl -p && iptables -t nat -A POSTROUTING -s {} " \
"-j SNAT --to {} ".format(subnet, add_rule_ip)
del_cli = "iptables -t nat -F POSTROUTING && sed -i '$d' /etc/sysctl.conf && echo net.ipv4.ip_forward = 0 >> " \
"/etc/sysctl.conf && sysctl -p && sed -i '$d' /etc/sysctl.conf "
ssh_client(del_rule_ip, del_cli)
ssh_client(add_rule_ip, add_cli)
update_route_table(news)
print("{}\t更新路由".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
# 检查eip的绑定信息
def detection():
while True:
new_port_id = {}
global default_port_id
while True:
eip = get_eip_details()
if not eip:
print("{}\tEIP详细信息读取失败".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
time.sleep(5)
else:
break
if eip["publicip"]["status"] == "ACTIVE":
new_port_id["port_id"] = eip["publicip"]["vnic"]["port_id"]
new_port_id["device_id"] = eip["publicip"]["vnic"]["device_id"]
if default_port_id:
if default_port_id["port_id"] != new_port_id["port_id"]:
print("{}\tEIP发生切换".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
snat_rule(default_port_id, new_port_id)
default_port_id = new_port_id
else:
print("{}\tEIP无变动".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
else:
default_port_id = new_port_id
else:
print("{}\tEIP没有绑定".format(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())))
time.sleep(time_sleep)
if __name__ == "__main__":
# 设置账户信息
ak = "B5O5XN7UESKTDWOIZT0N"
sk = "HZkdDpPCgFMBsKDuZvLNm3csOOWQqKqQc5BCx21R"
region = "cn-east-3"
# 设置监控的eip
eip_id = "319782ac-a7ed-4d06-acbc-95f9f002c53b"
# 设置服务器统一用户密码
user = "root"
passwd = "zhenxing-100"
# 设置路由网段
subnet = "192.168.0.0/24"
routetable_id = "3d03fd3f-82af-4683-bfd8-6f352750d71a"
# 设置脚本循环时间
time_sleep = 3
default_port_id = {}
detection()
- 点赞
- 收藏
- 关注作者
评论(0)