Ts下载
【摘要】 from tkinter import *; import time; import urllib.request; import http.cookiejar; import urllib.error; import urllib.parse; import re; import socket; import os; from pathlib import Path; from Cryptodome.Cipher import ...
from tkinter import *
import http.cookiejar
import os
import re
import shutil
import socket
import threading
import time
import urllib.error
import urllib.parse
import urllib.request
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from Cryptodome.Cipher import AES
# Module-level counter for log lines; nothing in this file ever updates it.
LOG_LINE_NUM = 0
class MY_GUI():
    """Tk front-end that downloads the ``.ts`` segments of m3u8 playlists.

    For every playlist URL typed into the form the tool fetches the playlist,
    downloads the listed segments with a thread pool, optionally AES-CBC
    decrypts them, and concatenates them into ``<file_name>.mp4``.

    Per-job state (``m3u8_url``, ``url``, ``key``, ``merge_ts_path``,
    ``sourceFile``, ``saveFile``, ``chdir``) lives on the instance and is
    filled in by :meth:`runmain` before the worker methods are called.
    """

    # Hard-coded CDN rewrite used when no separator is supplied.  The query
    # string carries fixed redirect/timestamp parameters copied verbatim from
    # the original script.
    # NOTE(review): these values look time-limited -- confirm they still work.
    _WASU_PREFIX = "https://valipl-vip.cp12.wasu.tv"
    _CDN_HOST = "https://ip1914634141.mobgslb.tbcache.com"
    _CDN_QUERY = "&ali_redirect_domain=valipl-vip.cp12.wasu.tv&ali_redirect_ex_ftag=c9f6fb6d0507b94ee99946cf3b7597ebff25a66476d6eb4b&ali_redirect_ex_tmining_ts=1607678029&ali_redirect_ex_tmining_expire=3600&ali_redirect_ex_hot=100"

    # Constructor
    def __init__(self, init_window_name):
        """Store the root window and set up networking and the worker pool.

        Args:
            init_window_name: the Tk root (or container) the widgets are
                placed in by :meth:`set_init_window`.
        """
        self.init_window_name = init_window_name
        self.playlist_url = None
        self.total = 0
        self.max_num = 250
        # Browser-like request headers.  They are stored here but are not
        # attached to the opener anywhere in this file.
        self.header = {"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
                       "Accept-Language": "zh-CN,zh;q=0.8,en-US;q=0.5,en;q=0.3",
                       "User-Agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/38.0.2125.122 Safari/537.36 SE 2.X MetaSr 1.0",
                       "Connection": "keep-alive"}
        self.cjar = http.cookiejar.CookieJar()
        self.cookie = urllib.request.HTTPCookieProcessor(self.cjar)
        self.opener = urllib.request.build_opener(self.cookie)
        # Installed globally so that urlretrieve() shares the cookie jar.
        urllib.request.install_opener(self.opener)
        self.pool = ThreadPoolExecutor(max_workers=10)
        self.sum = 0
        # Guards self.sum, which is updated from the pool's worker threads.
        self._lock = threading.Lock()
        # Per-job state; runmain() overwrites these before each download.
        self.m3u8_url = ""
        self.url = ""
        self.key = ""
        self.merge_ts_path = ""
        self.sourceFile = ""
        self.saveFile = ""
        self.chdir = ""
        # A process-wide socket timeout keeps a stalled download from
        # blocking for long before it is retried.
        socket.setdefaulttimeout(20)

    def _log(self, message):
        """Print one progress or diagnostic message to stdout."""
        print(str(message))

    @staticmethod
    def _list_ts_files(folder):
        """Return the paths of the ``.ts`` files in *folder*, sorted by name."""
        if not os.path.isdir(folder):
            return []
        paths = (os.path.join(folder, name) for name in os.listdir(folder)
                 if name.lower().endswith('.ts'))
        return sorted(path for path in paths if os.path.isfile(path))

    def download_file(self, url, target, max_retries=5):
        """Download *url* to the file *target*, retrying on network errors.

        The first attempt is followed by at most *max_retries* further
        attempts, so a permanently failing URL can no longer recurse or loop
        without bound.

        Args:
            url: address of the resource to fetch.
            target: path of the file to write.
            max_retries: number of retries after the first failed attempt.

        Returns:
            True when the file was downloaded, False when every attempt
            failed (a partially written *target* is removed in that case).
        """
        from http.client import HTTPException  # local import: only needed here

        attempts = max(1, max_retries + 1)
        for attempt in range(1, attempts + 1):
            try:
                urllib.request.urlretrieve(url, target)
                return True
            except ValueError as err:
                # Malformed URL: retrying cannot help.
                self._log('%s is not a valid url: %s' % (url, err))
                return False
            except (OSError, HTTPException) as err:
                # OSError covers socket timeouts, URLError/HTTPError and
                # connections closed by the remote host.
                if attempt < attempts:
                    self._log('%s failed (%s); reloading, retry %d of %d'
                              % (url, err, attempt, attempts - 1))
        # Best-effort cleanup: a partial file would otherwise be mistaken for
        # a finished download on the next run.
        try:
            os.remove(target)
        except OSError:
            pass
        self._log('downloading failed! ' + str(url))
        return False

    # Open the m3u8 playlist
    def open_web(self, url, timeout=3):
        """Fetch *url* through the cookie-aware opener.

        Args:
            url: address to open.
            timeout: socket timeout in seconds for this request.

        Returns:
            The response body as bytes, or None when the request failed (the
            error is logged).
        """
        try:
            with self.opener.open(url, timeout=timeout) as response:
                return response.read()
        except urllib.error.URLError as e:
            self._log('open ' + url + ' error')
            if hasattr(e, 'code'):
                self._log(e.code)
            if hasattr(e, 'reason'):
                self._log(e.reason)
        except (OSError, ValueError) as e:
            # Timeouts while reading the body, or a malformed URL.
            self._log('open ' + url + ' error')
            self._log(e)
        return None

    # Step 1: parse the m3u8 playlist
    def get_available_IP(self):
        """Download ``self.m3u8_url`` and extract its segment entries.

        When ``self.url`` is empty the playlist is expected to list absolute
        ``https`` URLs, which are returned unchanged.  Otherwise the entries
        ending in ``.ts`` are returned with that suffix removed (it is added
        back when the download URL is built).  Lines are split with
        ``splitlines`` so CRLF playlists parse as well.

        Returns:
            The list of segment entries; empty when the playlist could not be
            fetched.  ``self.total`` is set to its length.
        """
        self._log('开始获取真实的url')
        raw = self.open_web(self.m3u8_url)
        if raw is None:
            self.total = 0
            return []
        stripped = (line.strip() for line in raw.decode('utf-8').splitlines())
        # Lines starting with '#' are playlist tags/comments, not segments.
        entries = [line for line in stripped if line and not line.startswith('#')]
        if self.url == "":
            tv_lists = [line for line in entries if line.startswith('https')]
        else:
            tv_lists = [line[:-len('.ts')] for line in entries if line.endswith('.ts')]
        self.total = len(tv_lists)
        return tv_lists

    # Download one .ts segment
    def download_for_multi_process(self, ts, index=None):
        """Download one segment into ``self.sourceFile`` as ``NNNNN.ts``.

        Args:
            ts: a segment entry as returned by :meth:`get_available_IP`.
            index: position of the segment in the playlist, used for the
                file name.  When omitted the current value of ``self.sum``
                is used, which is only reliable for sequential calls.
        """
        if self.url == "":
            # Swap the origin host for the CDN host.  Entries that do not
            # contain the expected prefix are downloaded from their own URL.
            _, found, remainder = ts.partition(self._WASU_PREFIX)
            downUrl = self._CDN_HOST + remainder + self._CDN_QUERY if found else ts
        else:
            downUrl = self.url + ts + ".ts"
        if index is None:
            with self._lock:
                index = self.sum
        downPath = os.path.join(self.sourceFile, "%05d" % index + '.ts')
        if os.path.isfile(downPath) and os.path.getsize(downPath) > 0:
            self._log("file already exist")
        else:
            self.download_file(downUrl, downPath)
        with self._lock:
            self.sum = self.sum + 1
            finished = self.sum
        self._log("进度:" + str(finished) + "/" + str(self.total))

    # Run the downloads on the thread pool
    def download_with_multi_process(self, ts_list):
        """Download every entry of *ts_list* concurrently and wait for all.

        Each segment is paired with its playlist position so the file names
        keep playlist order no matter which worker finishes first.
        """
        self._log('开始多线程下载')
        segments = list(ts_list)
        results = self.pool.map(self.download_for_multi_process,
                                segments, range(len(segments)))
        # map() returns immediately; draining the iterator blocks until all
        # workers are done and re-raises any exception from a worker.
        for _ in results:
            pass

    # Step 4: merge the .ts files
    def merge_ts_file_with_os(self):
        """Concatenate the ``.ts`` files in ``self.chdir`` into one file.

        Segments are appended in file-name order to
        ``self.chdir/self.merge_ts_path``; afterwards the ``.ts`` files in
        both ``self.chdir`` and ``self.sourceFile`` are deleted.  The work is
        done in Python, so it does not depend on a Windows shell and leaves
        the process working directory untouched.
        """
        self._log('开始合并')
        work_dir = self.chdir
        parts = self._list_ts_files(work_dir)
        if parts:
            with open(os.path.join(work_dir, self.merge_ts_path), 'wb') as merged:
                for part in parts:
                    with open(part, 'rb') as segment:
                        shutil.copyfileobj(segment, merged)
        else:
            self._log('no .ts files to merge in ' + str(work_dir))
        for folder in (work_dir, self.sourceFile):
            for leftover in self._list_ts_files(folder):
                os.remove(leftover)
        self._log('合并完成')

    # Decrypt
    def decodeFile(self):
        """Decrypt the downloaded segments if a key is set, then merge them.

        Without a key the segments are merged inside ``self.sourceFile``.
        With a key every ``.ts`` file in ``self.sourceFile`` is AES-CBC
        decrypted into ``self.saveFile`` and the merge happens there.
        """
        key = self.key
        self.chdir = self.sourceFile
        if len(key):
            for src_path in self._list_ts_files(self.sourceFile):
                # A fresh cipher per segment: a CBC cipher object carries its
                # chaining state from one decrypt() call to the next, so
                # sharing one object would feed the previous file's last
                # block into this file's first block.
                # The key doubles as the IV, as in the original script.
                cryptor = AES.new(key, AES.MODE_CBC, key)
                with open(src_path, 'rb') as encrypted:
                    payload = encrypted.read()
                dst_path = os.path.join(self.saveFile, os.path.basename(src_path))
                with open(dst_path, 'wb') as decrypted:
                    decrypted.write(cryptor.decrypt(payload))
            self._log("解密完成")
            self.chdir = self.saveFile
        self.merge_ts_file_with_os()  # merge the .ts files

    def _add_entry_row(self, row, caption, default=None):
        """Place a caption label and a text entry on grid row *row*.

        Returns:
            The ``(label, entry)`` pair that was created.
        """
        label = Label(self.init_window_name, text=caption)
        label.grid(row=row)
        entry = Entry(self.init_window_name, width=55)
        entry.grid(row=row, column=1, pady=5)
        if default is not None:
            entry.insert(0, default)
        return label, entry

    # Build the window
    def set_init_window(self):
        """Create the form: one row per input plus submit/reset buttons."""
        self.init_window_name.title("ts文件下载工具_v2.0 by: 龙林基")  # window title
        self.init_window_name.geometry('500x600+10+10')
        # Labels and entries
        self.init_data_label = Label(self.init_window_name, text="请求数据")
        self.init_data_label.grid(row=0)
        self.request_m3u8_label, self.request_m3u8_Text = \
            self._add_entry_row(2, "m3u8_url:")
        self.request_separator_label, self.request_separator_Text = \
            self._add_entry_row(3, "separator:")
        self.request_fileName_label, self.request_fileName_Text = \
            self._add_entry_row(4, "file_name:", "fileName")
        self.request_key_label, self.request_key_Text = \
            self._add_entry_row(5, "key:")
        self.request_sourceFile_label, self.request_sourceFile_Text = \
            self._add_entry_row(6, "sourceFile:",
                                "D:\\soft\\python\\project\\download\\dd\\")
        self.request_saveFile_label, self.request_saveFile_Text = \
            self._add_entry_row(8, "saveFile:",
                                "D:\\soft\\python\\project\\download\\decode\\")
        # Buttons: pass the bound method itself; adding () would call it now.
        self.submit_button = Button(self.init_window_name, text="submit", width=5,
                                    command=self.runmain)
        self.submit_button.grid(row=12, column=0, sticky=W, pady=5)
        self.reset_button = Button(self.init_window_name, text="reset", width=5,
                                   command=self.reset)
        self.reset_button.grid(row=12, column=1, sticky=W, pady=5)

    # Main routine
    def runmain(self):
        """Read the form and download, decrypt and merge every playlist.

        The m3u8, file-name and key fields are comma-separated lists matched
        by position.  The key field may be empty (no decryption); otherwise
        it must hold one key per playlist.  The job is refused, with a logged
        message, when there are fewer names or keys than playlists.
        """
        substr = str(self.request_separator_Text.get())
        url_text = str(self.request_m3u8_Text.get())
        name_text = str(self.request_fileName_Text.get())
        key_text = str(self.request_key_Text.get())
        m3u8s = [part.strip() for part in url_text.split(",") if part.strip()]
        names = name_text.split(",") if name_text else []
        keys = key_text.split(",") if key_text else []
        # Download / output folders
        self.sourceFile = str(self.request_sourceFile_Text.get()).split("\n")[0]
        self.saveFile = str(self.request_saveFile_Text.get()).split("\n")[0]
        # Create the folders when they do not exist yet
        for folder in (self.sourceFile, self.saveFile):
            Path(folder).mkdir(parents=True, exist_ok=True)
        if len(names) < len(m3u8s) or (keys and len(keys) < len(m3u8s)):
            self._log('each m3u8 url needs a file name (and a key, when keys are given)')
            return
        # Process the playlists one after another
        for position, m3u8_url in enumerate(m3u8s):
            # Everything before the separator is the base URL of the segments.
            web_url = "" if substr == '' else m3u8_url.partition(substr)[0]
            file_name = re.sub(r'\s+', '', names[position]).strip()
            key = keys[position].encode('utf8') if keys else ''
            self.m3u8_url = m3u8_url
            self.url = web_url
            self.merge_ts_path = f'{file_name}.mp4'
            self.key = key
            self.sum = 0
            ts_list = self.get_available_IP()  # step 1: the real segment list
            if not ts_list:
                self._log('no segments found for ' + m3u8_url)
                continue
            self.download_with_multi_process(ts_list)  # threaded download
            self.decodeFile()

    # reset
    def reset(self):
        """Clear the m3u8, separator, file-name and key entries."""
        for entry in (self.request_m3u8_Text, self.request_separator_Text,
                      self.request_fileName_Text, self.request_key_Text):
            entry.delete(0, END)

    # Current time
    def get_current_time(self):
        """Return the local time formatted as ``YYYY-MM-DD HH:MM:SS``."""
        return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

    # Timestamped log output
    def write_log_to_Text(self, logmsg):
        """Append a timestamped line to the log widget.

        ``set_init_window`` does not create ``self.log_data_Text``, so when
        that attribute is missing the line is printed to stdout instead.
        """
        logmsg_in = str(self.get_current_time()) + " " + str(logmsg) + "\n"
        log_widget = getattr(self, 'log_data_Text', None)
        if log_widget is None:
            print(logmsg_in, end='')
        else:
            log_widget.insert(END, logmsg_in)
def gui_start():
    """Create the root window, build the form on it and run the event loop."""
    root = Tk()  # the top-level parent window
    app = MY_GUI(root)
    # Populate the root window with its widgets and default properties.
    app.set_init_window()
    # Blocks in the Tk event loop; without it the window is never displayed.
    root.mainloop()
# Script entry point: build the window and enter the Tk event loop.
gui_start()
【声明】本内容来自华为云开发者社区博主,不代表华为云及华为云开发者社区的观点和立场。转载时必须标注文章的来源(华为云社区)、文章链接、文章作者等基本信息,否则作者和本社区有权追究责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱:cloudbbs@huaweicloud.com
- 点赞
- 收藏
- 关注作者
评论(0)