自动升级
【摘要】
自动升级
# coding=utf-8""" @Project :pachong-master @File :common.py @Author :gaojs @Date :2022/7/15 14:01 @Blogs : https://www.gaojs.com.cn"""i...
自动升级
-
# coding=utf-8
-
"""
-
@Project :pachong-master
-
@File :common.py
-
@Author :gaojs
-
@Date :2022/7/15 14:01
-
@Blogs : https://www.gaojs.com.cn
-
"""
-
import sys
-
import logging
-
import time
-
import paramiko
-
from time import sleep
-
import re
-
import requests
-
import schedule
-
from faker import Factory
-
from selenium import webdriver
-
from selenium.webdriver.chrome.options import Options
-
-
-
# Set the interpreter's maximum recursion depth to 5000.
# NOTE(review): nothing visible in this file recurses — confirm this is still needed.
sys.setrecursionlimit(5000)
-
-
-
class CLI:
    """SSH command-line helper for an AG device, including NetIAG auto-upgrade.

    Call :meth:`ssh_ag` first: it opens the session and sets the ``ssh``,
    ``channel``, ``fin`` (session log file) and ``logging`` attributes that
    every other method relies on.
    """

    # Directory listing on the build server that hosts the NetIAG images.
    BUILDS_URL = 'http://192.168.120.204/builds/netIAG/'

    def ssh_ag(self, hostname='192.168.120.6', port=22, username='array', password='admin'):
        """Open an SSH shell on the AG and send the initial setup commands.

        Everything received on the shell is appended to ``log.txt`` (the file
        is truncated on each call).

        :param hostname: management address of the AG.
        :param port: SSH port.
        :param username: SSH login name.
        :param password: SSH password.
        :return: None
        """
        # The device output is decoded as UTF-8, so write the log as UTF-8 too
        # instead of depending on the platform's default encoding.
        self.fin = open('log.txt', 'w', encoding='utf-8')
        self.ssh = paramiko.SSHClient()
        self.logging = logging
        # Accept hosts that are not listed in known_hosts.
        self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
            sleep(5)
            channel = self.ssh.invoke_shell()
            self.channel = channel
            channel.settimeout(5)
            # Consume and log whatever the shell printed on login.
            output = channel.recv(2048).decode('utf-8', errors='replace')
            time.sleep(1)
            self.fin.write(output)
            # NOTE(review): presumably these enter privileged/config mode (the
            # empty line answering a blank enable password) and turn off
            # output paging — confirm against the device CLI manual.
            self.cli_cmd('enable')
            self.cli_cmd('')
            self.cli_cmd('config ter')
            self.cli_cmd('no page')
        except Exception:
            # Do not leak the log file / connection when setup fails.
            self.close()
            raise

    def close(self):
        """Close the shell channel, the SSH connection and the log file, if opened."""
        for name in ('channel', 'ssh', 'fin'):
            resource = getattr(self, name, None)
            if resource is not None:
                resource.close()

    def print_step(self):
        """Read up to 2048 bytes from the shell and print them.

        :return: None
        """
        result = self.channel.recv(2048)
        print(result.decode())

    def cli_cmd(self, cli, prompt=r"[#\$]", timeout=3):
        """Send one command line and collect the output until the prompt shows up.

        :param cli: command text; a newline is appended before sending.
        :param prompt: regular expression that marks the end of the output.
        :param timeout: maximum number of seconds to wait for ``prompt``.
        :return: the text received (possibly incomplete if the wait timed out).
        """
        self.logging.info("AG execute command in enable mode: %s", cli)
        self.channel.send(cli + '\n')
        return self.read_until(prompt, timeout)

    def quit_enable(self):
        """Send ``switch global`` followed by ``exit``.

        :return: None
        """
        self.cli_cmd('switch global')
        self.cli_cmd('exit')

    def clear_log(self):
        """Wait two seconds, then send ``clear log b``.

        :return: None
        """
        sleep(2)
        self.cli_cmd('clear log b')

    def read_until(self, expected, timeout=10):
        """Read from the shell until ``expected`` matches or time runs out.

        Every piece of text received is also written to the session log file.

        :param expected: regular expression searched for in the accumulated output.
        :param timeout: maximum number of seconds to keep reading.
        :return: all text received so far, whether or not ``expected`` matched.
        """
        import codecs
        import socket

        pattern = re.compile(expected)
        # Incremental decoding keeps a multi-byte character that is split
        # across two recv() calls intact instead of losing the whole chunk.
        decoder = codecs.getincrementaldecoder('utf-8')(errors='replace')
        deadline = time.time() + timeout
        output = ''
        while time.time() < deadline:
            try:
                raw = self.channel.recv(1024)
                # A zero-length read means the channel has been closed.
                closed = not raw
            except socket.timeout:
                # Nothing arrived within the channel timeout; keep waiting.
                raw, closed = b'', False
            text = decoder.decode(raw, final=closed)
            self.fin.write(text)
            self.fin.flush()
            output += text
            # Stop on a match, or when no more data can ever arrive.
            if closed or pattern.search(output):
                break
        return output

    def switch_vsite(self, vsite):
        """Switch to a virtual site by sending ``sw <vsite>``.

        :param vsite: name of the virtual site.
        :return: None
        """
        self.cli_cmd(f'sw {vsite}')

    def get_sn(self):
        """Read the serial number from the ``show version`` output.

        :return: the text after the colon of the ``Serial Number :`` line,
            i.e. the 31-character serial preceded by one space.
        :raises IndexError: if the output contains no serial number.
        """
        resp = self.cli_cmd('show version')
        matches = re.findall('Serial Number : [A-Z0-9]{31}', resp)
        if not matches:
            raise IndexError('no serial number found in "show version" output')
        return matches[0].split(':')[1]

    def create_test_password(self, sn, timeout=30):
        """Request the password of the ``test`` user for a serial number.

        :param sn: serial number, posted as the ``serial`` form field.
        :param timeout: seconds to wait for the password service before giving up.
        :return: the password extracted from the response page.
        :raises IndexError: if the response contains no password.
        """
        url = 'http://10.3.0.50/cgi-bin/passwd_res'
        headers = {'User-Agent': Factory.create().user_agent()}
        data = {'serial': sn}
        # Without a timeout an unreachable service would block forever.
        rsp = requests.post(url=url, headers=headers, data=data, timeout=timeout)
        matches = re.findall('password: (.*?)</pre>', rsp.text)
        if not matches:
            raise IndexError('no password found in the password service response')
        passwd = matches[0]
        print(passwd)
        return passwd

    def update_ag(self, latest_builds, wait_seconds=1800):
        """Start a system update from the build server and wait for it.

        :param latest_builds: file name of the build under the builds directory.
        :param wait_seconds: how long to sleep after issuing the update command.
        :return: None
        """
        self.cli_cmd('config terminal')
        self.cli_cmd(f'system update "{self.BUILDS_URL}{latest_builds}"')
        time.sleep(wait_seconds)

    def get_current_release(self):
        """Return the release running on the AG, with dots turned into underscores.

        :return: e.g. ``Rel_NetIAG_9_4_0_12`` for ``Rel.NetIAG.9.4.0.12``.
        :raises IndexError: if ``show version`` shows no release string.
        """
        resp = self.cli_cmd('show version')
        matches = re.findall(r'Rel.NetIAG.[0-9].[0-9].[0-9].[0-9][0-9]', resp)
        if not matches:
            raise IndexError('no release string found in "show version" output')
        curr_release = matches[0].replace('.', '_')
        print(f'***************** 您当前AG的版本是 {curr_release} ***********************')
        return curr_release

    def get_latest_release(self):
        """Look up the newest build name on the build server listing page.

        Opens the page in headless Chrome and reads the text of the second
        cell of the next-to-last table row.

        :return: the text of that cell.
        """
        chrome_options = Options()
        chrome_options.add_argument('--headless')
        chrome_options.add_argument('--disable-gpu')
        driver = webdriver.Chrome(options=chrome_options)
        try:
            driver.get(self.BUILDS_URL)
            driver.maximize_window()
            # find_element('xpath', ...) replaces find_element_by_xpath, which
            # newer Selenium releases no longer provide.
            latest_release = driver.find_element('xpath', '//tbody/tr[last()-1]/td[2]').text
        finally:
            # Always shut the browser down so scheduled runs do not pile up
            # Chrome processes.
            driver.quit()
        print(f'***************** 204 server上最新的netiag版本是 {latest_release} ***********************')
        return latest_release

    def auto_update_netiag(self, wait_seconds=1200):
        """Upgrade the AG when the build server offers a different build.

        :param wait_seconds: how long to sleep after starting an upgrade.
        :return: None
        """
        server_latest_release = self.get_latest_release()
        curr_release = self.get_current_release()
        current_release_name = 'nodebug-' + curr_release + '.click'
        if not server_latest_release:
            # An empty name would make the update URL point at the directory.
            self.logging.warning("No build name found on the build server; skipping upgrade")
            return
        # Upgrade only when the newest build differs from the running one.
        if server_latest_release != current_release_name:
            print(current_release_name)
            self.cli_cmd(f'system update "{self.BUILDS_URL}{server_latest_release}"')
            self.cli_cmd('YES')
            print(f'**************************** 正在升级 {server_latest_release} 版本的netiag,请耐心等候 *****************************')
            time.sleep(wait_seconds)
        print(f'**************** 当前版本 {server_latest_release} 已经是最新版本 **********************')
-
-
-
# Scheduled job: run the NetIAG auto-upgrade every evening at 19:00.
test = CLI()
test.ssh_ag(hostname='192.168.120.7')
daily_job = schedule.every().day.at("19:00")
daily_job.do(test.auto_update_netiag)

# Poll the scheduler once per second, forever, so the job runs when it is due.
while True:
    schedule.run_pending()
    time.sleep(1)
-
-
# if __name__ == '__main__':
-
# """
-
# 自动获取test密码
-
# """
-
# # ag_ip = input('请输入您AG管理IP:')
-
# # test = CLI()
-
# # test.ssh_ag(hostname=ag_ip)
-
# # sn_number = test.get_sn()
-
# # passwd = test.create_test_password(sn_number)
-
# # print(f'*********************** 您的test账户密码是 {passwd} ************************\n')
文章来源: blog.csdn.net,作者:懿曲折扇情,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_41332844/article/details/126837451
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)