Ts下载
【摘要】 from tkinter import * import time import urllib.request import http.cookiejar import urllib.error import urllib.parse import re import socket import os from pathlib import Path from Cryptodome.Cipher import ...
from tkinter import *
import time
import urllib.request
import http.client
import http.cookiejar
import urllib.error
import urllib.parse
import re
import shutil
import socket
import threading
import os
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

from Cryptodome.Cipher import AES

LOG_LINE_NUM = 0


class MY_GUI():
    """Tkinter front end that downloads, decrypts and merges HLS ``.ts`` segments.

    The workflow driven by :meth:`runmain` is: fetch the m3u8 playlist, extract
    the segment list, download every segment with a thread pool, optionally
    AES-CBC decrypt the segments, and finally concatenate them into one
    ``<file_name>.mp4`` file.
    """

    # Constructor
    def __init__(self, init_window_name):
        """Store the root window and set up the HTTP opener and thread pool.

        Args:
            init_window_name: the Tk root window the widgets are attached to.
        """
        self.init_window_name = init_window_name
        self.playlist_url = None
        self.total = 0
        self.max_num = 250
        # NOTE(review): this header dict is defined but never attached to the
        # opener, so requests go out with urllib's default headers.
        self.header = {
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
            "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0",
            "Connection": "keep-alive",
        }
        self.cjar = http.cookiejar.CookieJar()
        self.cookie = urllib.request.HTTPCookieProcessor(self.cjar)
        self.opener = urllib.request.build_opener(self.cookie)
        urllib.request.install_opener(self.opener)
        self.pool = ThreadPoolExecutor(max_workers=10)
        # Number of segments finished so far; updated from worker threads.
        self.sum = 0
        self._sum_lock = threading.Lock()
        # A global socket timeout keeps stalled downloads from hanging forever.
        socket.setdefaulttimeout(20)

    def _log(self, message):
        """Print one progress/diagnostic message to stdout."""
        print(message)

    def download_file(self, url, target, max_retries=5):
        """Download *url* to *target*, retrying a bounded number of times.

        Args:
            url: address of the resource to fetch.
            target: local path the resource is written to.
            max_retries: how many additional attempts are made after the
                first one fails.

        Returns:
            True when the download succeeded, False when every attempt failed.
            On failure any partially written *target* is removed so that it is
            not mistaken for a finished segment later.
        """
        for attempt in range(max_retries + 1):
            try:
                urllib.request.urlretrieve(url, target)
                return True
            except (OSError, http.client.HTTPException):
                # OSError covers socket.timeout, URLError and connection
                # resets from the remote host.
                if attempt == max_retries:
                    break
                retry_no = attempt + 1
                unit = 'time' if retry_no == 1 else 'times'
                self._log(url + ' Reloading for %d %s' % (retry_no, unit))
        self._log('downloading failed! ' + url)
        try:
            os.remove(target)
        except OSError:
            pass  # nothing was written, or the file is already gone
        return False

    # Open the m3u8 playlist
    def open_web(self, url):
        """Return the raw bytes served at *url*, or None when the request fails."""
        try:
            with self.opener.open(url, timeout=3) as response:
                return response.read()
        except (urllib.error.URLError, socket.timeout) as e:
            self._log('open ' + url + ' error')
            if hasattr(e, 'code'):
                self._log(str(e.code))
            if hasattr(e, 'reason'):
                self._log(str(e.reason))
            return None

    # Step 1: parse the m3u8 playlist
    def get_available_IP(self):
        """Fetch ``self.m3u8_url`` and return the list of segment references.

        When ``self.url`` is empty, the absolute ``https`` lines of the playlist
        are returned; otherwise the segment names (without the ``.ts`` suffix)
        that follow an ``,`` line ending are returned. ``self.total`` is set to
        the number of entries found.

        Returns:
            A list of strings; empty when the playlist could not be fetched.
        """
        self._log('开始获取真实的url')
        raw = self.open_web(self.m3u8_url)
        if raw is None:
            self.total = 0
            return []
        data = raw.decode('utf-8')
        if self.url == "":
            tv_lists = re.findall(r'\n(https.*)\n', data)
        else:
            tv_lists = re.findall(r',\n(.*)\.ts\n', data)
        self.total = len(tv_lists)
        return tv_lists

    # Download one .ts segment
    def download_for_multi_process(self, ts, index=None):
        """Download a single segment into ``self.sourceFile``.

        Args:
            ts: one entry of the list returned by :meth:`get_available_IP`.
            index: position of the segment in the playlist, used for the
                zero-padded output file name. When omitted, the current
                progress counter is used (only safe for sequential calls).
        """
        if self.url == "":
            # Strip the prefix https://valipl-vip.cp12.wasu.tv
            ts = ts.partition("https://valipl-vip.cp12.wasu.tv")[2]
            downUrl = (
                "https://ip1914634141.mobgslb.tbcache.com" + ts
                + "&ali_redirect_domain=valipl-vip.cp12.wasu.tv"
                "&ali_redirect_ex_ftag=c9f6fb6d0507b94ee99946cf3b7597ebff25a66476d6eb4b"
                "&ali_redirect_ex_tmining_ts=1607678029"
                "&ali_redirect_ex_tmining_expire=3600"
                "&ali_redirect_ex_hot=100"
            )
        else:
            downUrl = self.url + ts + ".ts"

        if index is None:
            with self._sum_lock:
                index = self.sum
        # Naming by playlist index keeps files unique across worker threads and
        # makes the sorted file order equal to the playback order.
        downPath = os.path.join(self.sourceFile, "%05d" % index + '.ts')

        if os.path.isfile(downPath) and os.path.getsize(downPath) > 0:
            self._log("file already exist")
        else:
            self.download_file(downUrl, downPath)

        with self._sum_lock:
            self.sum = self.sum + 1
            finished = self.sum
        self._log("进度:" + str(finished) + "/" + str(self.total))

    # Run the downloads on the thread pool
    def download_with_multi_process(self, ts_list):
        """Download every entry of *ts_list* concurrently and wait for completion."""
        self._log('开始多线程下载')
        segments = list(ts_list)
        results = self.pool.map(
            self.download_for_multi_process, segments, range(len(segments))
        )
        # Consuming the iterator blocks until all workers are done and
        # re-raises any exception a worker hit.
        for _ in results:
            pass

    # Step 4: merge the .ts files
    def merge_ts_file_with_os(self):
        """Concatenate the ``.ts`` files in ``self.chdir`` and clean up.

        The segments are joined in sorted file-name order into
        ``self.merge_ts_path`` inside ``self.chdir``. Afterwards every ``.ts``
        file in ``self.chdir`` and in ``self.sourceFile`` is deleted.
        """
        self._log('开始合并')
        work_dir = Path(self.chdir)
        segments = sorted(p for p in work_dir.glob('*.ts') if p.is_file())
        if segments:
            with open(work_dir / self.merge_ts_path, 'wb') as merged:
                for segment in segments:
                    with open(segment, 'rb') as part:
                        shutil.copyfileobj(part, merged)
        else:
            self._log('no .ts files to merge')

        for directory in (work_dir, Path(self.sourceFile)):
            for leftover in directory.glob('*.ts'):
                if leftover.is_file():
                    leftover.unlink()
        self._log('合并完成')

    # Decrypt
    def decodeFile(self):
        """Decrypt the downloaded segments when a key is set, then merge them.

        With a non-empty ``self.key`` every ``.ts`` file in ``self.sourceFile``
        is AES-CBC decrypted into ``self.saveFile`` and the merge runs there;
        without a key the merge runs directly on ``self.sourceFile``.
        """
        key = self.key
        self.chdir = self.sourceFile
        if len(key):
            for name in sorted(os.listdir(self.sourceFile)):
                if not name.endswith('.ts'):
                    continue
                with open(os.path.join(self.sourceFile, name), 'rb') as encrypted:
                    ciphertext = encrypted.read()
                # CBC cipher objects carry state between calls, so each
                # independently encrypted segment needs a fresh one.
                # NOTE(review): the key doubles as the IV, as in the original
                # code — confirm against the playlist's EXT-X-KEY entry.
                cryptor = AES.new(key, AES.MODE_CBC, key)
                with open(os.path.join(self.saveFile, name), 'wb') as f:
                    f.write(cryptor.decrypt(ciphertext))
            self._log("解密完成")
            self.chdir = self.saveFile
        self.merge_ts_file_with_os()  # merge the .ts files

    # Build the window
    def set_init_window(self):
        """Create and lay out all labels, entry fields and buttons."""
        root = self.init_window_name
        root.title("ts文件下载工具_v2.0 by: 龙林基")  # window title
        root.geometry('500x600+10+10')

        # Labels and entry fields
        self.init_data_label = Label(root, text="请求数据")
        self.init_data_label.grid(row=0)

        self.request_m3u8_label = Label(root, text="m3u8_url:")
        self.request_m3u8_label.grid(row=2)
        self.request_m3u8_Text = Entry(root, width=55)
        self.request_m3u8_Text.grid(row=2, column=1, pady=5)

        self.request_separator_label = Label(root, text="separator:")
        self.request_separator_label.grid(row=3)
        self.request_separator_Text = Entry(root, width=55)  # separator input
        self.request_separator_Text.grid(row=3, column=1, pady=5)

        self.request_fileName_label = Label(root, text="file_name:")
        self.request_fileName_label.grid(row=4)
        self.request_fileName_Text = Entry(root, width=55)  # file name input
        self.request_fileName_Text.grid(row=4, column=1, pady=5)
        self.request_fileName_Text.insert(0, "fileName")

        self.request_key_label = Label(root, text="key:")
        self.request_key_label.grid(row=5)
        self.request_key_Text = Entry(root, width=55)  # key input
        self.request_key_Text.grid(row=5, column=1, pady=5)

        self.request_sourceFile_label = Label(root, text="sourceFile:")
        self.request_sourceFile_label.grid(row=6)
        self.request_sourceFile_Text = Entry(root, width=55)  # download dir input
        self.request_sourceFile_Text.grid(row=6, column=1, pady=5)
        self.request_sourceFile_Text.insert(0, "D:\\soft\\python\\project\\download\\dd\\")

        self.request_saveFile_label = Label(root, text="saveFile:")
        self.request_saveFile_label.grid(row=8)
        self.request_saveFile_Text = Entry(root, width=55)  # output dir input
        self.request_saveFile_Text.grid(row=8, column=1, pady=5)
        self.request_saveFile_Text.insert(0, "D:\\soft\\python\\project\\download\\decode\\")

        # Buttons (pass the bound method itself; adding () would call it now)
        self.submit_button = Button(root, text="submit", width=5, command=self.runmain)
        self.submit_button.grid(row=12, column=0, sticky=W, pady=5)
        self.reset_button = Button(root, text="reset", width=5, command=self.reset)
        self.reset_button.grid(row=12, column=1, sticky=W, pady=5)

    @staticmethod
    def _split_field(raw):
        """Split a comma-separated entry value; blank input yields an empty list."""
        text = str(raw)
        return text.split(",") if text.strip() else []

    # Main routine
    def runmain(self):
        """Read the form and process every playlist URL that was entered.

        The m3u8, file-name and key fields are comma-separated lists matched
        by position. A URL without a matching file name is saved under its
        zero-padded position; a URL without a matching key is not decrypted.
        """
        substr = str(self.request_separator_Text.get())
        m3u8s = self._split_field(self.request_m3u8_Text.get())
        names = self._split_field(self.request_fileName_Text.get())
        keys = self._split_field(self.request_key_Text.get())

        # Download / output directories
        self.sourceFile = str(self.request_sourceFile_Text.get()).split("\n")[0]
        self.saveFile = str(self.request_saveFile_Text.get()).split("\n")[0]
        # Make sure both directories exist
        Path(self.sourceFile).mkdir(parents=True, exist_ok=True)
        Path(self.saveFile).mkdir(parents=True, exist_ok=True)

        # Process the playlists one after another
        for i, entry in enumerate(m3u8s):
            m3u8_url = entry.strip()
            web_url = "" if substr == '' else m3u8_url.partition(substr)[0]
            raw_name = names[i] if i < len(names) else "%05d" % i
            file_name = re.sub(r'\s+', '', raw_name).strip()
            key = keys[i].encode('utf8') if i < len(keys) else ''

            self.m3u8_url = m3u8_url
            self.url = web_url
            self.merge_ts_path = f'{file_name}.mp4'
            self.key = key
            self.sum = 0

            ts_list = self.get_available_IP()  # step 1: get the real ts list
            if not ts_list:
                self._log('no segments found for ' + m3u8_url)
                continue
            self.download_with_multi_process(ts_list)  # multi-threaded download
            self.decodeFile()

    # reset
    def reset(self):
        """Clear the m3u8, separator, file-name and key entry fields."""
        self.request_m3u8_Text.delete(0, END)
        self.request_separator_Text.delete(0, END)
        self.request_fileName_Text.delete(0, END)
        self.request_key_Text.delete(0, END)

    # Current time
    def get_current_time(self):
        """Return the local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        current_time = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        return current_time

    # Timestamped logging
    def write_log_to_Text(self, logmsg):
        """Append a timestamped line to the log widget, or stdout if none exists.

        Args:
            logmsg: the message to record; converted with ``str``.
        """
        current_time = self.get_current_time()
        logmsg_in = str(current_time) + " " + str(logmsg) + "\n"  # newline-terminated
        log_widget = getattr(self, "log_data_Text", None)
        if log_widget is None:
            # set_init_window creates no log widget, so fall back to stdout.
            print(logmsg_in, end="")
        else:
            log_widget.insert(END, logmsg_in)


def gui_start():
    """Create the root window, build the GUI and enter the Tk event loop."""
    init_window = Tk()  # create the root window
    ZMJ_PORTAL = MY_GUI(init_window)
    # Set up the root window's widgets
    ZMJ_PORTAL.set_init_window()
    init_window.mainloop()  # keeps the window alive until it is closed


if __name__ == "__main__":
    gui_start()
【版权声明】本文为华为云社区用户原创内容,转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息, 否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:
cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)