Python 实现Ping命令状态检测

举报
王瑞专家 发表于 2022/12/21 17:51:51 2022/12/21
【摘要】 ping 是一种因特网包探索器,用于测试网络连接量的程序,Ping是工作在TCP/IP网络体系结构中应用层的一个服务命令,主要是向特定的目的主机发送 ICMP 请求报文,测试目的站是否可达及了解其有关状态,实现Ping方法的这段代码原始版本来源于网络,后经排版封装后实现了一些功能,放在这里收藏之用。第一步封装MyPing类,在pycharm下面创建一个MyPing.py文件,详细代码备注如下...

ping 是一种因特网包探索器,用于测试网络连接量的程序,Ping是工作在TCP/IP网络体系结构中应用层的一个服务命令,主要是向特定的目的主机发送 ICMP 请求报文,测试目的站是否可达及了解其有关状态,实现Ping方法的这段代码原始版本来源于网络,后经排版封装后实现了一些功能,放在这里收藏之用。

第一步封装MyPing类,在pycharm下面创建一个MyPing.py文件,详细代码备注如下。

import time, struct
import socket, select

class MyPing():
    # 发送原始套接字
    def raw_socket(self, dst_addr, imcp_packet):
        rawsocket = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
        send_request_ping_time = time.time()
        rawsocket.sendto(imcp_packet, (dst_addr, 80))
        return send_request_ping_time, rawsocket

    # 计算校验和
    def chesksum(self, data):
        n = len(data)
        m = n % 2
        sum = 0
        for i in range(0, n - m, 2):
            sum += (data[i]) + ((data[i + 1]) << 8)
            sum = (sum >> 16) + (sum & 0xffff)
        if m:
            sum += (data[-1])
            sum = (sum >> 16) + (sum & 0xffff)
        answer = ~sum & 0xffff
        answer = answer >> 8 | (answer << 8 & 0xff00)
        return answer

    # 通过域名获取主机地址
    def get_host_address(self,host):
        dst_addr = socket.gethostbyname(host)
        return dst_addr

    # 接受到数据包
    def request_ping(self, data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body):
        #  把字节打包成二进制数据
        imcp_packet = struct.pack('>BBHHH32s', data_type, data_code, data_checksum, data_ID, data_Sequence,payload_body)
        # 获取校验和
        icmp_chesksum = self.chesksum(imcp_packet)
        #  把校验和传入,再次打包
        imcp_packet = struct.pack('>BBHHH32s', data_type, data_code, icmp_chesksum, data_ID, data_Sequence,payload_body)
        return imcp_packet

    # 相应数据包,解包执行
    def reply_ping(self, send_request_ping_time, rawsocket, data_Sequence, timeout=3):
        while True:
            # 实例化select对象(非阻塞),可读,可写为空,异常为空,超时时间
            what_ready = select.select([rawsocket], [], [], timeout)
            # 等待时间 wait_for_time = (time.time() - started_select)
            wait_for_time = (time.time() - send_request_ping_time)
            # 没有返回可读的内容,判断超时
            if what_ready[0] == []:
                return -1
            # 记录接收时间
            time_received = time.time()
            # 设置接收的包的字节为1024
            received_packet, addr = rawsocket.recvfrom(1024)
            # 获取接收包的icmp头
            icmpHeader = received_packet[20:28]
            # 反转编码
            type, code, r_checksum, packet_id, sequence = struct.unpack(">BBHHH", icmpHeader)
            if type == 0 and sequence == data_Sequence:
                return time_received - send_request_ping_time
            # 数据包的超时时间判断
            timeout = timeout - wait_for_time
            if timeout <= 0:
                return -1

    # 向特定地址发送一条ping命令
    def send_ping(self, address):
        data_type = 8
        data_code = 0
        data_checksum = 0
        data_ID = 0
        data_Sequence = 1
        payload_body = b'abcdefghijklmnopqrstuvwabcdefghi'

        # 请求ping数据包的二进制转换
        icmp_packet = self.request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence, payload_body)
        # 连接套接字,并将数据发送到套接字
        send_request_ping_time, rawsocket = self.raw_socket(address, icmp_packet)
        # 数据包传输时间
        times = self.reply_ping(send_request_ping_time, rawsocket, data_Sequence)
        if times > 0:
            return_time = int(times * 1000)
            return return_time
        else:
            return -1

实现模仿Windows中的ping命令,代码如下:

from MyPing import *

if __name__ == '__main__':
    # 使用Ping方法
    host = "www.lyshark.com"
    ping = MyPing()

    sumtime, shorttime, longtime, avgtime = 0, 1000, 0, 0
    # 8回射请求 11超时 0回射应答
    data_type = 8
    data_code = 0
    # 检验和
    data_checksum = 0
    # ID
    data_ID = 0
    # 序号
    data_Sequence = 1
    # 可选的内容
    payload_body = b'abcdefghijklmnopqrstuvwabcdefghi'
    dst_addr = socket.gethostbyname(host)
    print("正在 Ping {0} [{1}] 具有 32 字节的数据:".format(host, dst_addr))
    # 发送3for i in range(0, 3):
        # 请求ping数据包的二进制转换
        icmp_packet = ping.request_ping(data_type, data_code, data_checksum, data_ID, data_Sequence + i, payload_body)
        # 连接套接字,并将数据发送到套接字
        send_request_ping_time, rawsocket = ping.raw_socket(dst_addr, icmp_packet)
        # 数据包传输时间
        times = ping.reply_ping(send_request_ping_time, rawsocket, data_Sequence + i)
        if times > 0:
            print("来自 {0} 的回复: 字节=32 时间={1}ms".format(dst_addr, int(times * 1000)))
            return_time = int(times * 1000)
            sumtime += return_time
            if return_time > longtime:
                longtime = return_time
            if return_time < shorttime:
                shorttime = return_time
            time.sleep(0.7)
        else:
            print("请求超时")

运行效果如下:

image.png

发送一条ping探测命令,send_ping()主要用于发送一个Ping包,后期我们可以实现一个主机存活探测器,主要调用代码如下:

from MyPing import *

if __name__ == "__main__":
    # 使用Ping方法
    ping = MyPing()

    address = ping.get_host_address("www.lyshark.com")
    print("域名地址: {}".format(address))

    # 对单台主机Ping
    ref = ping.send_ping(address)
    if( ref != -1):
        print("已经连通, 抖动值: {}".format(ref))

输出测试效果如下:

image.png

如上我们就可以实现对特定主机执行Ping检测了,接下来我们开始编写一个能够计算出特定范围内主机数的CalculationIP()方法,并通过开线程实现一个批量主机存活探测器.

from MyPing import *

def CalculationIP(Addr_Count):
    ret = []
    try:
        IP_Start = str(Addr_Count.split("/")[0]).split(".")
        IP_Heads = str(IP_Start[0] + "." + IP_Start[1] + "." + IP_Start[2] +".")
        IP_Start_Range = int(Addr_Count.split(".")[3].split("/")[0])
        IP_End_Range = int(Addr_Count.split("/")[1])
        for item in range(IP_Start_Range,IP_End_Range+1):
            ret.append(IP_Heads+str(item))
        return ret
    except Exception:
        return 0

if __name__ == "__main__":
    ret = CalculationIP("192.168.1.1/254")

    for each in ret:
        ping = MyPing()
        ref = ping.send_ping(each)
        print("地址: {} ---> 抖动值: {}".format(each,ref))

输出效果如下:

image.png


参考文献:https://blog.csdn.net/Small_Teenager/article/details/122123299

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。