Python 抓取并解码原始数据包

举报
王瑞专家 发表于 2022/12/21 18:09:14 2022/12/21
【摘要】 应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。抓取原始数据包: Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码.需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的.import socketimport uuid# 获取本机MAC地...

应用Python支持的混杂模式,抓取流经网卡的数据包,并对IP以及ICMP数据包进行拆包,打印出我们所需要的字段信息。

抓取原始数据包: Python中默认的Socket模块就可以实现对原始数据包的解包操作,如下代码.

需要注意这段代码只能在Windows平台使用,因为我们需要开启网卡的IOCTL混杂模式,这是Win平台特有的.

import socket
import uuid

# 获取本机MAC地址
def GetHostMAC():
    mac=uuid.UUID(int = uuid.getnode()).hex[-12:]
    this_mac = ":".join([mac[e:e+2] for e in range(0,11,2)])
    this_name = socket.getfqdn(socket.gethostname())
    print("本机名: {} --> 本机MAC: {}".format(this_name,this_mac))

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 开始跟踪原始数据包
def SnifferIOSock(address):
    # 创建原始套接字,然后绑定在公开接口上
    socket_protocol = socket.IPPROTO_IP

    # 开启原始数据包模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))

    # 设置在捕获的数据包中包含IP头
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    # Windows平台需要设置IOCTL以启动混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 读取单个数据包
    while True:
        print(sniffer.recvfrom(65565))

    # 退出时,关闭混杂模式
    sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    GetHostMAC()
    address = GetHostAddress()
    print("本机IP: {}".format(address))
    SnifferIOSock(address)

解码IP数据包头: 解码方法同样运用的是上方的方法,只不过这里我们需要找到完整的IP地址的包头封装格式,然后根据特定的包头格式对数据包进行解包操作即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 输出协议和通信双方IP地址
            print("传输协议: {} --> 原地址: {} --> 传输到: {} --> 生存周期: {}".format(ip_header.protocol,ip_header.src_address,ip_header.dst_address,ip_header.this_ttl))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)

解码ICMP数据包头: 原理与解包IP头相同,但需要注意,由于ICMP头在IP头的下方,所以我们需要先解析出IP头数据包,然后根据IP头中的protocol_num判断如果是ICMP则将其传入ICMP结构做进一步解包即可.

import socket
import os
import struct
from ctypes import *

# 定义IP头部结构体
class IP(Structure):
    _fields_ = [
        ("ihl",            c_ubyte, 4),
        ("version",        c_ubyte, 4),
        ("tos",            c_ubyte),
        ("len",            c_ushort),
        ("id",            c_ushort),
        ("offset",        c_ushort),
        ("ttl",            c_ubyte),
        ("protocol_num",    c_ubyte),
        ("sum",            c_ushort),
        ("src",            c_ulong),      # linux 需要变为 c_uint32
        ("dst",            c_ulong)       # linux 需要变为 c_uint32
    ]
    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        # 定义协议序号与名称对应关系
        self.protocol_map = {1:"ICMP", 6:"TCP", 17:"UDP"}

        # 将数据包解包为地址
        self.src_address = socket.inet_ntoa(struct.pack("<L",self.src))
        self.dst_address = socket.inet_ntoa(struct.pack("<L",self.dst))
        self.this_ttl = self.ttl

        # self.src_address = socket.inet_ntoa(struct.pack("@I",self.src))
        # self.dst_address = socket.inet_ntoa(struct.pack("@I",self.dst))

        # 协议类型
        try:
            self.protocol = self.protocol_map[self.protocol_num]
        except:
            self.protocol = str(self.protocol_num)

# 定义ICMP结构包头
class ICMP(Structure):
    _fields_ = [
        ("type",            c_ubyte),
        ("code",            c_ubyte),
        ("checksum",        c_ushort),
        ("unused",          c_ushort),
        ("next_hop_mtu",    c_ushort)
    ]

    def __new__(self,socket_buffer=None):
        return self.from_buffer_copy(socket_buffer)

    def __init__(self, socket_buffer=None):
        self.icmp_type = self.type
        self.icmp_code = self.code
        self.icmp_checksum = self.checksum


# 获取本机IP地址
def GetHostAddress():
    try:
        sock=socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
        sock.connect(('8.8.8.8',80))
        address =sock.getsockname()[0]
    finally:
        return address
        address.close()

# 执行解包过程,并输出
def SnifferIPAddress(address):

    # 平台选择,nt代表Windows
    if os.name == "nt":
        socket_protocol = socket.IPPROTO_IP
    else:
        socket_protocol = socket.IPPROTO_ICMP

    # 开启原始套接字模式
    sniffer = socket.socket(socket.AF_INET,socket.SOCK_RAW,socket_protocol)
    sniffer.bind((address,0))
    sniffer.setsockopt(socket.IPPROTO_IP,socket.IP_HDRINCL,1)

    if os.name == "nt":
        sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_ON)

    # 循环接受数据包并解包
    try:
        while True:
            # 读取数据包
            raw_buffer = sniffer.recvfrom(65565)[0]

            # 将缓冲区的前20个字节按IP头进行解析
            ip_header = IP(raw_buffer[0:20])

            # 判断协议类型是否为ICMP
            if ip_header.protocol == "ICMP":

                # 计算ICMP包的起始位置
                offset = ip_header.ihl * 4
                buf = raw_buffer[offset:offset + sizeof(ICMP)]

                #解析ICMP数据
                icmp_header = ICMP(buf)
                print("原地址: {} --> 发送到: {} --> 解包协议: {} --> 解包代码: {} --> 校验和: {}".format(ip_header.src_address, ip_header.dst_address, icmp_header.icmp_type,icmp_header.icmp_code,icmp_header.icmp_checksum))

    # 如果按下Ctrl+C则退出
    except KeyboardInterrupt:
        # 关闭混杂模式
        if os.name == "nt":
            sniffer.ioctl(socket.SIO_RCVALL,socket.RCVALL_OFF)

if __name__ == "__main__":
    address = GetHostAddress()
    SnifferIPAddress(address)

我们运行上方的代码,然后在本机Ping测试一下www.lyshark.com此时即可看到,本机与对端的来往数据包.

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。