requests.get(image_url).content reads the content behind a single image_url, but fetching many URLs one after another is slow. A multiprocessing queue lets several workers download in parallel. The template is as follows:
import requests
import json
import time
import traceback
import multiprocessing as mp

queue_before_downloader = mp.Queue()  # queue holding the data before processing (URLs to download)
queue_after_downloader = mp.Queue()   # queue holding the data after processing (downloaded contents)
num_workers = 10
def chunk(chunk_size=64, num_workers=10):  # group the downloaded results into batches
    global queue_after_downloader
    count_none = 0
    ret = []
    while True:
        item = queue_after_downloader.get()
        if item is None:  # one worker has finished
            count_none += 1
            if count_none == num_workers:  # every worker is done
                if len(ret) != 0:
                    print('last chunk')
                    yield ret
                return
            continue
        ret.append(item)
        if len(ret) == chunk_size:
            yield ret
            ret = []
def process_sample():  # worker loop: download one URL at a time (the slow part)
    global queue_before_downloader
    global queue_after_downloader
    while True:
        info = queue_before_downloader.get()
        if info is None:  # sentinel: no more URLs for this worker
            print('put None')
            queue_after_downloader.put(None)
            break
        try:
            result = requests.get(info).content
        except Exception:
            continue
        queue_after_downloader.put(result)  # put the downloaded content into the output queue
def read_json():
    global queue_before_downloader
    with open('xxx.json', 'r') as f:
        lines = f.readlines()
    lines = [json.loads(x) for x in lines]
    print('{} urls to download'.format(len(lines)))
    for _line in lines:
        queue_before_downloader.put(_line['url'])  # push every url into the "before" queue
def main():
    start = time.time()
    global num_workers
    # read the image URLs from the json file and put them into the multiprocessing queue
    read_json()
    global queue_before_downloader
    for _ in range(num_workers):  # one None sentinel per worker, so every worker can exit
        queue_before_downloader.put(None)
    processes = []
    for _ in range(num_workers):
        process = mp.Process(target=process_sample)  # one downloader process per worker
        processes.append(process)
    for process in processes:  # start the workers
        process.start()
    num_completed = 0
    for _idx, items in enumerate(chunk(64, num_workers)):  # consume the downloaded data one batch at a time
        try:
            contents = items  # a batch of downloaded bytes, at most 64 items
            num_completed += len(contents)
            print('--- {} : {} completed ---'.format(_idx + 1, num_completed))
        except Exception:
            # traceback.print_exc()
            continue
    for process in processes:  # reclaim the worker processes
        process.join()
    print('elapsed: {:.1f}s'.format(time.time() - start))

if __name__ == "__main__":
    main()
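
Since downloading is pure network I/O, a thread pool is often a simpler alternative to the process/queue template above: the GIL is released while a thread waits on the socket, and nothing has to be pickled through mp.Queue. Below is a minimal sketch under the same assumptions (an xxx.json file with one {"url": ...} record per line); fetch_content is a hypothetical helper, not part of the template above.

import json
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_content(url):  # hypothetical helper: download one URL, return bytes or None
    try:
        return requests.get(url, timeout=10).content
    except requests.RequestException:
        return None

def main():
    with open('xxx.json', 'r') as f:  # same input format as the template above
        urls = [json.loads(line)['url'] for line in f]
    num_completed = 0
    with ThreadPoolExecutor(max_workers=10) as pool:
        futures = [pool.submit(fetch_content, url) for url in urls]
        for future in as_completed(futures):
            if future.result() is not None:
                num_completed += 1
    print('{} / {} completed'.format(num_completed, len(urls)))

if __name__ == '__main__':
    main()

Processes remain the better choice when each sample also needs CPU-heavy work after the download (decoding, resizing, hashing), which is where the multiprocessing template pays off.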