基于200I DK A2的--实时语音字幕生成器,视频文本摘录器

举报
yd_286753431 发表于 2023/12/19 04:48:44 2023/12/19
【摘要】 PS:巨量时间,不断踩坑,悲凉血泪史,总结成的小小见解。如果各位觉得写的还行,劳请各位老师们点赞收藏一波!!各位老师的鼓励是我坚持写作的动力!!我玩的就是真实!!!我玩的就是真实!!!我玩的就是真实!!!以下解决方案仅代表个人见解,如果有更好的方案,希望大家不吝提出,共同学习问:做这个设计的目的是什么?想要解决怎样的痛点?1.在学习的过程中,总会遇到一些视频并没有提供相应的字幕,导致理解上有...

PS:巨量时间,不断踩坑,悲凉血泪史,总结成的小小见解。

如果各位觉得写的还行,劳请各位老师们点赞收藏一波!!各位老师的鼓励是我坚持写作的动力!!

我玩的就是真实!!!我玩的就是真实!!!我玩的就是真实!!!


以下解决方案仅代表个人见解,如果有更好的方案,希望大家不吝提出,共同学习


问:做这个设计的目的是什么?想要解决怎样的痛点?

1.在学习的过程中,总会遇到一些视频并没有提供相应的字幕,导致理解上有些偏差

听课20分钟后,突然想起课程里提及的要点,但是记忆很模糊,想要去找到这个模糊的知识点巩固下

但是需要在原有视频中花较大时间去复原记忆点的位置


2.亦或是,实时网络会议过程,需要整理相应的文本摘要,或者是回顾老师,领导讲述的重点内容。

那么由于是实时视频,没有回放,没有录屏。无从下手


3.这个算是这个设计后续的升级版本,直接对英文进行在线翻译,解决全英语课堂的语言问题


问:怎么设计呢?设计思路是什么?

首先这个设计时基于昇腾Altas 200I DK A2 的speech-recognition实验,融入了语音侦听,socket传输功能

我的思路是,通过电脑声音录制中立体声混音特性,对电脑的Realtek High Definition Audio进行侦听

然后通过socket传输到Altas 200I DK A2, 通过生成模型实时推理,生成相应的字幕或是文本


问:本设计使用的流控是怎么样的呢?

个人觉得本设计使用的流控还是比较取巧,比较伪。并没有达到心目中真正的流翻译。

通过对音频进行pyaudio 进行 侦听,将实时固定chunk 以wav格式加密,然后通过socket传输到设备

再进行推理。所以会有多个wav 格式的数据传输至设备,使用ByteIO内存管理,不用写入内存

如果大家有更好的方法,可以多交流。


问:最终效果怎么样?


问:不足在哪里?

1.由于使用wenet,模型选择并非是大模型,可能会导致在部分专有名字上翻译不到位

2.如果存在背景音乐将会造成很大的干扰,需要独立人声后进行翻译更精准,可以用凤凰传奇的《奢香夫人》进行案例测试

3.当然对比微信语音翻译,在音乐文本翻译上确实弱一些。

4.如果可能,使用whisper模型进行测试可能会有更好效果


下面就贴具体代码,欢迎大家有更好解决方案,或者代码方案,能一起交流


问:电脑端的代码是怎么样的?


class Handle_Process(Process):
    def __init__(self,queue):
        super(Handle_Process, self).__init__()
        self.name = 'Handle_Process'
        self.thread = Event()
        self.queue = queue
        self.frames = []
        self.sk=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
        ip_port=('192.168.137.100',55)
        self.sk.connect(ip_port)
        
    def stop(self):
        self.thread.set()
    
    def check(self):
        return self.thread.is_set()
    
    def handler(self):
        try:
            if not self.queue.empty():
                data = self.queue.get()
                self.frames = self.tranfer2wav(data)
                self.frames = self.frames + bytes('END', encoding = "utf8")
                self.sk.send(self.frames)#将发送的数据进行编码d
        except Exception as e:
            print(e)

    def compute_fbank(self,waveform,
                    sample_rate,
                    num_mel_bins=80,
                    frame_length=25,
                    frame_shift=10,
                    dither=0.0):
        """提取filter bank音频特征"""
        AMPLIFY_FACTOR = 1 << 15
        waveform = waveform * AMPLIFY_FACTOR
        mat = kaldi.fbank(waveform,
                        num_mel_bins=num_mel_bins,
                        frame_length=frame_length,
                        frame_shift=frame_shift,
                        dither=dither,
                        energy_floor=0.0,
                        sample_frequency=sample_rate)
        return mat

    def resample(self,waveform, sample_rate, resample_rate=16000):
        waveform = torchaudio.transforms.Resample(
            orig_freq=sample_rate, new_freq=resample_rate)(waveform)
        return waveform, resample_rate


    def tranfer2wav(self,data):
        chunk_description = b'RIFF'
        sub_chunk2_size = len(data)
        chunk_size = sub_chunk2_size + 44 - 8
        wave_description = b'WAVE'
        fmt_subchunk = b'fmt '
        sub_chunk1_size = 16
        audio_format = 1
        num_channels = 1
        sample_rate = 16000
        bits_per_sample = 16
        byte_rate = int(sample_rate * num_channels * bits_per_sample / 8)
        block_align = int(num_channels * bits_per_sample / 8)
        data_subchunk = b'data'
        

        buf = []
        buf.append(chunk_description)
        buf.append(struct.pack('<i', chunk_size))
        buf.append(wave_description)
        buf.append(fmt_subchunk)
        buf.append(struct.pack('<i', sub_chunk1_size))
        buf.append(struct.pack('<h', audio_format))
        buf.append(struct.pack('<h', num_channels))
        buf.append(struct.pack('<i', sample_rate))
        buf.append(struct.pack('<i', byte_rate))
   

        buf.append(struct.pack('<h', block_align))
        buf.append(struct.pack('<h', bits_per_sample))
        buf.append(data_subchunk)
        buf.append(struct.pack('<i', sub_chunk2_size))
        buf.append(data)
        buf = b''.join(buf)
        return buf
    

    
    def run(self):
        while True:
            try:
                if self.check():
                    self.sk.close()
                    break
                self.handler()
                time.sleep(0.01)
            except Exception as e:
                print(e)复制


问:设备端的代码是怎么样的?


    def transcribe(self):
        """执行模型推理,将录音文件转为文本。"""
        try:
            tcp_server_socket= socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            print(e)
            tcp_server_socket.close()
            exit()

        address=('192.168.137.100',55)
        tcp_server_socket.bind(address)
        tcp_server_socket.listen(3)
        total_recv_data = b''
        while True: 
            try:
                client_socket, clientAddr=tcp_server_socket.accept()
            except Exception as e:
                print(e)
                client_socket.close()
                tcp_server_socket.close()
                exit()
                
            while True:
                try:
                    recv_data = client_socket.recv(1024)
                    #print(len(recv_data))
                except Exception as e:
                    print(e)
                    client_socket.close()
                    tcp_server_socket.close()
                    exit()
                    
                if recv_data:
                    if len(recv_data) < 1024 and recv_data[-3:] == bytes('END', encoding = "utf8"):
                        total_recv_data = total_recv_data + recv_data
                        total_recv_data = total_recv_data[:-3]
                       
                        waveform, sample_rate = torchaudio.load(io.BytesIO(total_recv_data))
                        feature = compute_fbank(waveform, sample_rate)
                        feats_pad = feature.numpy().astype(np.float32)
                        feats_lengths = np.array([feature.shape[0]]).astype(np.int32)                             
                        
                        feats_pad = feature.unsqueeze(0)
                        feats_pad = feats_pad.numpy().astype(np.float32)
                        output = self.model.infer([feats_pad, feats_lengths])
                        txt = self.post_process(output)
                        print(txt)
                        total_recv_data = b''
                        
                    else:
                        total_recv_data = total_recv_data + recv_data

                    
                else:
                    break
            client_socket.close()
        tcp_server_socket.close()  复制



具体代码详见附件,上文只是粘贴部分代码

希望大家能够多交流,提供更好的思路~~~

【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。