基于千兆以太网的FPGA频谱分析,上位机部分

举报
yd_211085766 发表于 2023/10/20 14:40:49 2023/10/20
【摘要】 基于千兆以太网的FPGA的频谱仪显示,上位机的难点显然不在于FFT的频谱分析,如何实时获取数据,与FPGA进行对接成为主要的难点。

一、背景
        该项目原课题为基于千兆以太网的FPGA的频谱仪显示,上位机的难点显然不在于FFT的频谱分析,如何实时获取数据,与FPGA进行对接成为主要的难点。

程序语言:python

环境:Anaconda envs:python3.7

平台:Pycharm; Qt designer

参考平台:Wireshark

二、设计原理
        首先设计信号监听函数,若有数据输入,则接口正确;若无数据输入则继续监听直到传入数据。接收到数据后根据控件可打开线程,首先是线程一,实时监听数据并将数据存入数组,并附有一个时间轴数组与之对应。主线程为作图函数,首先对接受的数据进行FFT变换,之后对信号与频谱作图并实时刷新。线程二为控制绘图线程,点击控件后会改变绘制条件来结束绘制。逻辑图如下:

三、具体设计
1.基于sniff函数实时抓包以太网网口的数据
        首先要安装python抓包的核心库:scapy,之后核心是实时监听网口抓包数据,用到的是sniff函数,它可以用于接收和发包,便于上位机自检。抓包函数只需下面两行:

while 1:
    sniff(filter="udp", prn=pack_callback, iface=IFACES.dev_from_index(11), count=1)
#其中11为以太网的网口,每次接收一个包,prn后的是自己定义的回调函数
        其中回调函数可定义,因为基于千兆以太网的FPGA传来的是UDP包,设置过滤为udp,我们需要从抓到的udp包中获得它的数据部分,于是我们可定义以下回调函数:

    def pack_callback(packet):
        p = packet
 
        # 如果UDP包中含有数据层,则把Raw曾的数据提取出来
        if 'Raw' in p:
            data = p.getlayer(Raw)
            for each in data.load:
                tim = np.append(tim, ttemp / (bagsize *sample_bag))
                # datax为数据数组
                datax = np.append(datax, int(each))
                # ttemp为所有数据的个数
                ttemp += 1  # 数据个数
                # bagsize为一组显示单元,控制数组长度,避免数组无限长导致的开销加大
                if ttemp > bagsize:
                    self.tim = np.delete(self.tim, [0])
                    self.datax = np.delete(self.datax, [0])

        tim为时间数组,用于与datax的数据对应,需要将时间归一化。其次需要及时删掉之前的数据防止数组无限大。其中bagsize为一个包中数据部分大小,此处为512,sample_bag为一秒抓包个数,此处为10。这两个参数需要与发包方对接修改。

        sniff同时可以自发包自抓包来进行自检,发送一个八个0和八个8循环的方波,长度为512,0.1秒发一个包,注意网口要和抓包那边的网口一致并且存在。发包函数如下定义:

 
L=[0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8,0,0,0,0,0,0,0,0,8,8,8,8,8,8,8,8]
L3=L[0:512]
line=str(L3)
de=b''.join([bytes([int(i)]) for i in line[1:-1].split(',')])
# # 发送端IP地址不是本机ip地址   目的端IP地址不详      传输层的TCP并未指明数据包类型:syn fin ack 窗口大小 数据包如果分片,要指明序号
pkt = Ether(dst="22:33:44:55:66:77:88")/IP(src='1.1.1.200', dst='2.2.2.200')/UDP(sport=100, dport=200)/de
# # 间隔0.1秒发送一次   总共发送1000次   发送网卡口:11口
sendp(pkt, inter=0.1, count=1000, iface=IFACES.dev_from_index(11))
        当时将网口设置为本地连接时没有任何问题,但是改为以太网的网口会出现错误,具体报错也不记得了,但是解决方法是将npcap退回至1.3.1,希望可以有所帮助。 

关于sniff函数的具体用法我参照了这篇文章:Scapy 嗅探Python仿真实验(GUI界面)--2.2使用sniff监听网口抓包_谁月的博客-CSDN博客

2.FFT作图

        在得到实时抓包的数据后,直接调用scipy的fft函数,得到的是一个复数,其中包括了它的幅度、频率等信息,不过需要手动处理(python fft的文章太多了,就不展开了)。主要问题如下:因为需要一边实时抓包一边作图,就需要设置线程。将抓包函数可定义为线程与控件结合,一开始我把matplotlib的作图函数放在线程下导致报错,原因在于matplotlib的相关函数必须放在主线程下。因为要与pyqt5的界面交互显示,用到了下面这些东西:

from matplotlib.pylab import mpl
from matplotlib.backends.qt_compat import QtWidgets
from matplotlib.backends.backend_qtagg import (FigureCanvas, NavigationToolbar2QT as NavigationToolbar)
from matplotlib.figure import Figure
        之后便是动态图与UI嵌入设计,从matplotlib中学到具体方法,同时也需要设计回调函数与刷新速率,刷新速率应该与抓包速率相同。回调函数只需要表示你想要画的东西,因为里面的参数在实时修改,所以会进行图像刷新。详见这篇:Embedding in Qt — Matplotlib 3.7.1 documentation

         其中刷新速率如下定义:

timer1 = dynamic_canvas.new_timer(1000/sample_bag))
        这方面就不多展开了,只需要将上面那篇文章显示与于CentralWidget改为listview便可以进行交互的设计了。ui设计如下:

四、结语 
        这是在本科第一次做的联合项目,文章介绍了上位机的核心思路,当时拿到课题真的是一无所知,最后还是一步一步做出来了,我把我当时遇到的瓶颈的解决思路向大家分享出来了,剩下的都是简单的码农该干的事了(bushi。希望这篇文章对大家有所帮助。

【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。