ssh操作及接受响应,自动获取test密码
【摘要】
一、ssh操作及接受响应
common.py
# coding=utf-8""" @Project :pachong-master @File :common.py @Author :gaojs @Date :2022/7/9 14:01 @Blogs : https://...
一、ssh操作及接受响应
-
# coding=utf-8
-
"""
-
@Project :pachong-master
-
@File :common.py
-
@Author :gaojs
-
@Date :2022/7/9 14:01
-
@Blogs : https://www.gaojs.com.cn
-
"""
-
import sys
-
-
import logging
-
import time
-
-
import paramiko
-
from time import sleep
-
import re
-
-
import requests
-
from faker import Factory
-
-
-
sys.setrecursionlimit(5000)
-
-
-
class CLI:
-
-
def ssh_ag(self, hostname='192.168.120.209', port=22, username='array', password='admin'):
-
"""
-
-
:param self:
-
:return:
-
"""
-
# 创建ssh对象
-
self.fin = open('log.txt', 'w')
-
self.ssh = paramiko.SSHClient()
-
self.logging = logging
-
# 允许连接不在know_hosts文件中的主机
-
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
-
# 连接AG
-
self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
-
sleep(5)
-
channel = self.ssh.invoke_shell()
-
self.channel = channel
-
channel.settimeout(5)
-
output = channel.recv(2048).decode('utf-8')
-
time.sleep(1)
-
self.fin.write(output)
-
self.cli_cmd('enable')
-
self.cli_cmd('')
-
self.cli_cmd('config ter')
-
self.cli_cmd('no page')
-
-
def print_step(self):
-
"""
-
-
:return:
-
"""
-
result = self.channel.recv(2048)
-
print(result.decode())
-
-
def cli_cmd(self, cli, prompt="[#\$]", timeout=3):
-
"""
-
-
:param cli:
-
:return:
-
"""
-
output = ''
-
self.logging.info("AG execute command in enable mode: " + cli)
-
self.channel.send(cli + '\n')
-
output = self.read_until(prompt, timeout)
-
return output
-
-
def quit_enable(self):
-
"""
-
-
:return:
-
"""
-
self.cli_cmd('switch global')
-
self.cli_cmd('exit')
-
-
def clear_log(self):
-
"""
-
-
:return:
-
"""
-
sleep(2)
-
self.cli_cmd('clear log b')
-
-
def read_until(self, expected, timeout=10):
-
"""
-
等待回显:这个方法是从同事那里偷来的哈哈哈
-
:return:
-
"""
-
output = ''
-
regexp = re.compile(expected)
-
max_time = time.time() + timeout
-
i = 0
-
while time.time() < max_time:
-
i = i + 1
-
tmp = ""
-
try:
-
tmp = self.channel.recv(1024).decode('utf-8')
-
except:
-
pass
-
self.fin.write(tmp)
-
self.fin.flush()
-
output += tmp
-
if regexp.search(output):
-
return output
-
return output
-
-
def switch_vsite(self, vsite):
-
"""
-
切换虚拟站点
-
:param vsite:
-
:return:
-
"""
-
self.cli_cmd('sw %s' % vsite)
-
-
def get_sn(self):
-
"""
-
获取sn码
-
:return:
-
"""
-
# with open('log.txt', mode='r') as fin:
-
# resp = fin.read()
-
resp = self.cli_cmd('show version')
-
result = re.compile('Serial Number : [A-Z0-9]{31}')
-
sn = result.findall(resp)[0]
-
sn_number = sn.split(':')[1]
-
# print(sn_number)
-
return sn_number
-
-
def create_test_password(self, sn):
-
"""
-
生成test用户密码
-
"""
-
url = 'http://10.3.0.50/cgi-bin/passwd_res'
-
f = Factory.create()
-
ua = f.user_agent()
-
headers = {
-
'User-Agent': ua
-
}
-
data = {
-
'serial': sn
-
}
-
rsp = requests.post(url=url, headers=headers, data=data)
-
passwd = re.findall('password: (.*?)</pre>', rsp.text)[0]
-
print(passwd)
-
return passwd
-
-
-
if __name__ == '__main__':
-
"""
-
自动获取test密码
-
"""
-
ag_ip = input('请输入您AG管理IP:')
-
test = CLI()
-
test.ssh_ag(hostname=ag_ip)
-
sn_number = test.get_sn()
-
passwd = test.create_test_password(sn_number)
-
print(f'*********************** 您的test账户密码是 {passwd} ************************\n')
2.云服务器操作
-
# coding=utf-8
-
"""
-
@Project :pachong-master
-
@File :test01.py
-
@Author :gaojs
-
@Date :2022/7/9 15:40
-
@Blogs : https://www.gaojs.com.cn
-
"""
-
-
import sys
-
-
import logging
-
import time
-
-
import paramiko
-
from time import sleep
-
import re
-
-
-
sys.setrecursionlimit(5000)
-
-
-
class CLI:
-
-
def ssh_ag(self, hostname='101.4x.39.xxx', port=22, username='root', password='xxxxxx'):
-
"""
-
-
:param self:
-
:return:
-
"""
-
# 创建ssh对象
-
self.fin = open('log.txt', 'w+')
-
self.ssh = paramiko.SSHClient()
-
self.logging = logging
-
# 允许连接不在know_hosts文件中的主机
-
self.ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
-
# 连接AG
-
self.ssh.connect(hostname=hostname, port=port, username=username, password=password)
-
sleep(5)
-
channel = self.ssh.invoke_shell()
-
self.channel = channel
-
channel.settimeout(5)
-
output = channel.recv(2048).decode('utf-8')
-
time.sleep(1)
-
self.fin.write(output)
-
-
def print_step(self):
-
"""
-
-
:return:
-
"""
-
result = self.channel.recv(2048)
-
print(result.decode())
-
-
def cli_cmd(self, cli, prompt="[#\$]", timeout=3):
-
"""
-
-
:param cli:
-
:return:
-
"""
-
output = ''
-
self.logging.info("cloud server execute command in enable mode: " + cli)
-
self.channel.send(cli + '\n')
-
output = self.read_until(prompt, timeout)
-
return output
-
-
def read_until(self, expected, timeout=10):
-
"""
-
等待回显
-
:return:
-
"""
-
output = ''
-
regexp = re.compile(expected)
-
max_time = time.time() + timeout
-
i = 0
-
while time.time() < max_time:
-
i = i + 1
-
tmp = ""
-
try:
-
tmp = self.channel.recv(1024).decode('utf-8')
-
except:
-
pass
-
self.fin.write(tmp)
-
self.fin.flush()
-
output += tmp
-
if regexp.search(output):
-
return output
-
return output
-
-
-
if __name__ == '__main__':
-
"""
-
自动获取test密码
-
"""
-
test = CLI()
-
test.ssh_ag()
-
test.cli_cmd('ll /opt')
-
test.cli_cmd('cd /var/www/html/')
-
test.cli_cmd('date')
-
test.cli_cmd('ls')
文章来源: blog.csdn.net,作者:懿曲折扇情,版权归原作者所有,如需转载,请联系作者。
原文链接:blog.csdn.net/qq_41332844/article/details/126837410
【版权声明】本文为华为云社区用户转载文章,如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)