Python代码里使用paramiko连接远程服务器执行Shell命令
需求
在自动化测试场景里, 有时需要在代码里获取远程服务器的某些数据, 或执行一些查询命令,如获取Linux系统版本号 \ 获取CPU及内存的占用等, 本章记录一下使用paramiko模块SSH连接服务器的方法
1. 先安装paramiko库
pip3 install paramiko
2. 代码
#!/usr/bin/env python
# coding=utf-8
"""
# :author: Terry Li
# :url: https://blog.csdn.net/qq_42183962
# :copyright: © 2020-present Terry Li
# :motto: I believe that the God rewards the diligent.
"""
import paramiko
class cfg:
host = "192.168.2.2"
user = "root"
password = "123456"
class sshChannel:
def __init__(self, cfg_obj, timeout_sec=15, port=22):
self._cfg = cfg_obj
self.ssh_connect_timeout = timeout_sec
self.port = port
self.ssh_cli = None
def __enter__(self):
try:
self.connecting_server_with_SSH2()
except paramiko.ssh_exception.SSHException:
print("连接{}失败, 请检查配置或重试".format(self._cfg.host))
self.ssh_cli.close()
else:
return self
def __exit__(self, tp, value, trace):
self.ssh_cli.close()
def connecting_server_with_SSH2(self):
self.ssh_cli = paramiko.SSHClient()
self.ssh_cli.load_system_host_keys()
key = paramiko.AutoAddPolicy()
self.ssh_cli.set_missing_host_key_policy(key)
self.ssh_cli.connect(self._cfg.host, port=self.port, username=self._cfg.user, password=self._cfg.password,
timeout=self.ssh_connect_timeout)
def execute_cmd(self, cmd):
"""
:param cmd: 单个命令
:return: 服务器的输出信息
"""
stdin, stdout, stderr = self.ssh_cli.exec_command(cmd)
return stdout.read().decode('utf-8')
def execute_cmd_list(self, cmd_list):
"""
:param cmd_list: 以列表的形式传入命令
:return: 以列表的形式返回服务器的输出信息,与命令列表的顺序一一对应
"""大连专业人流医院 http://www.dlrlyy.net/
out_list = list(map(self.execute_cmd, cmd_list))
return out_list
def test_get_sys_version(self):
sys_version = self.execute_cmd("lsb_release -rd")
print(sys_version)
def test_get_sys_disk_free_and_memory_free(self):
sys_info = self.execute_cmd_list(["df -h -BG /", "free -m"])
print(sys_info)
if __name__ == '__main__':
with sshChannel(cfg) as my_server:
# 自测
my_server.test_get_sys_version()
my_server.test_get_sys_disk_free_and_memory_free()
# 调用演示
pwd = my_server.execute_cmd("pwd")
print(pwd)
- 点赞
- 收藏
- 关注作者
评论(0)