Python代码里使用paramiko连接远程服务器执行Shell命令

举报
nineteens 发表于 2021/03/04 22:04:48 2021/03/04
【摘要】 Python代码里使用paramiko连接远程服务器执行Shell命令

  需求

  在自动化测试场景里, 有时需要在代码里获取远程服务器的某些数据, 或执行一些查询命令,如获取Linux系统版本号 \ 获取CPU及内存的占用等, 本章记录一下使用paramiko模块SSH连接服务器的方法

  1. 先安装paramiko库

  pip3 install paramiko

  2. 代码

  #!/usr/bin/env python

  # coding=utf-8

  """

  # :author: Terry Li

  # :url: https://blog.csdn.net/qq_42183962

  # :copyright: © 2020-present Terry Li

  # :motto: I believe that the God rewards the diligent.

  """

  import paramiko

  class cfg:

  host = "192.168.2.2"

  user = "root"

  password = "123456"

  class sshChannel:

  def __init__(self, cfg_obj, timeout_sec=15, port=22):

  self._cfg = cfg_obj

  self.ssh_connect_timeout = timeout_sec

  self.port = port

  self.ssh_cli = None

  def __enter__(self):

  try:

  self.connecting_server_with_SSH2()

  except paramiko.ssh_exception.SSHException:

  print("连接{}失败, 请检查配置或重试".format(self._cfg.host))

  self.ssh_cli.close()

  else:

  return self

  def __exit__(self, tp, value, trace):

  self.ssh_cli.close()

  def connecting_server_with_SSH2(self):

  self.ssh_cli = paramiko.SSHClient()

  self.ssh_cli.load_system_host_keys()

  key = paramiko.AutoAddPolicy()

  self.ssh_cli.set_missing_host_key_policy(key)

  self.ssh_cli.connect(self._cfg.host, port=self.port, username=self._cfg.user, password=self._cfg.password,

  timeout=self.ssh_connect_timeout)

  def execute_cmd(self, cmd):

  """

  :param cmd: 单个命令

  :return: 服务器的输出信息

  """

  stdin, stdout, stderr = self.ssh_cli.exec_command(cmd)

  return stdout.read().decode('utf-8')

  def execute_cmd_list(self, cmd_list):

  """

  :param cmd_list: 以列表的形式传入命令

  :return: 以列表的形式返回服务器的输出信息,与命令列表的顺序一一对应

  """大连专业人流医院 http://www.dlrlyy.net/

  out_list = list(map(self.execute_cmd, cmd_list))

  return out_list

  def test_get_sys_version(self):

  sys_version = self.execute_cmd("lsb_release -rd")

  print(sys_version)

  def test_get_sys_disk_free_and_memory_free(self):

  sys_info = self.execute_cmd_list(["df -h -BG /", "free -m"])

  print(sys_info)

  if __name__ == '__main__':

  with sshChannel(cfg) as my_server:

  # 自测

  my_server.test_get_sys_version()

  my_server.test_get_sys_disk_free_and_memory_free()

  # 调用演示

  pwd = my_server.execute_cmd("pwd")

  print(pwd)

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。