大家好,又见面了,我是你们的朋友全栈君。
import urllib.request
import requests, os, threading
from Crypto.Cipher import AES
from src.Pacho.moviePa.tsdownload import aes_decode
class m3u8down(object):
def __init__(self, url, listheaders, dicheaders):
self.url = url # 这里的url是index.m3u8地址
self.headers = listheaders
self.header = dicheaders
self.ts_parts = []
self.down_path = 'D:/workspace/download/Mp4'
self.tsthreads = []
self.key = None
def aes_decode(self, data, key):
"""AES解密
:param key: 密钥(16.32)一般16的倍数
:param data: 要解密的数据
:return: 处理好的数据
"""
cryptor = AES.new(key, AES.MODE_CBC, key)
plain_text = cryptor.decrypt(data)
return plain_text.rstrip(b'\0') # .decode("utf-8")
def to_ts(self):
requests.packages.urllib3.disable_warnings()
content_all = requests.get(self.url, verify=False, timeout=200).text
if "#EXTM3U" not in content_all:
raise BaseException("非M3U8的链接")
if "EXT-X-VERSION" in content_all:
file_line = content_all.split("\n")
# print(file_line)
self.get_tsurls(self.url, file_line)
def get_tsurls(self, m3u8url, lines):
for index, line in enumerate(lines): # m3u8文件中有ts,获取ts地址并添加索引
if "EXTINF" in line: # 找ts地址
if "/" not in lines[index + 1]: # 判断.ts是否是路径 'DjbgADY7468014.ts' or '/20181221/.../VRYKBY4319009.ts'
ts_url = m3u8url.rsplit("/", 1)[0] + "/" + lines[index + 1] # 拼出ts片段的URL
else:
ts_url = m3u8url.rsplit("/", 1)[0] + "/" + lines[index + 1].rsplit("/", 1)[-1] # 拼出ts片段的URL
self.ts_parts.append(ts_url)
if "#EXT-X-KEY" in line:
# #EXT-X-KEY:METHOD=AES-128,URI="encryption.key"
key_url = m3u8url.rsplit("/", 1)[0] + "/" + line.split('"')[1]
self.key = requests.get(url=key_url, timeout=120, headers=self.header).content # 获取秘钥
def load_ts(self, ts_url, files, count):
if self.key:
self.auto_keydown(ts_url, files, self.header, self.key)
else:
self.auto_down(ts_url, files, self.headers)
print('第 %d/%d 个文件下载完成, 下载地址是%s' % (count, len(self.ts_parts), ts_url))
count += 1
def auto_down(self, url, filename, headers): # 下载失败后,自调用从新下载
try:
opener = urllib.request.build_opener() # 创建opener对象
opener.addheaders = self.headers
urllib.request.install_opener(opener) # 将opener对象装入urllib.request
urllib.request.urlretrieve(url, filename)
except Exception as ex:
# print(ex.args, url)
return self.auto_down(url, filename, headers)
def auto_keydown(self, url, filename, headers, key): # 下载失败后,自调用从新下载
try:
response = requests.get(url=url, timeout=120, headers=headers)
with open(filename, 'ab+') as f:
data = aes_decode(response.content, key)
f.write(data)
f.close()
except Exception as ex:
# print(ex.args, url)
return self.auto_down(url, filename, headers, key)
def threads(self):
for i in range(len(self.ts_parts)):
files = self.down_path + '/' + 'tsm{:0>5}.ts'.format(i)
if os.path.exists(files): # 判断文件是已下载,且文件大小变为空。是则结束本次循环,继续循环
sz = os.path.getsize(files)
if not sz:
os.remove(files) # 删除空文件
print("删除空字节视频文件", files.rsplit("/", 1)[-1])
else:
continue
t = threading.Thread(target=self.load_ts, args=(self.ts_parts[i], files, i))
self.tsthreads.append(t)
def main(): # This is m3u8 url
url = 'https://www.XXXXX.com/20200612/jDCLCWyb/1500kb/hls/index.m3u8'
hd = [('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1941.0 Safari/537.36')]
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 6.3; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
"Accept-Encoding": "gzip, deflate, br", "Connection": "keep-alive",
"Accept-Language": "zh-CN,zh;q=0.8,zh-TW;q=0.7,zh-HK;q=0.5,en-US;q=0.3,en;q=0.2"}
m3u8 = m3u8down(url, hd, headers)
m3u8.to_ts()
m3u8.threads()
for th in m3u8.tsthreads:
th.start()
for th in m3u8.tsthreads:
th.join()
print("{:-^20}".format("下载结束"))
if __name__ == '__main__':
main()
发布者:全栈程序员栈长,转载请注明出处:https://javaforall.cn/161206.html原文链接:https://javaforall.cn