一直对python的多线程、多进程、分布式多进程比较好奇。今天浅浅地学习了一下,里面涉及的内容其实比较多,包括进程锁、进程间的通信、进程池、共享内存等等。这里给一个简单的、大家可能会常用到的例子——从多个wrfout文件中提取变量T2并单独保存输出为nc文件,一起感受下多进程的魅力。如果不妥之处,还望大家不吝赐教!
这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。
import xarray as xr
import numpy as np
import glob
import sys
import os
import argparse
import time
def nc2pkl(args):
st =time.time()
if not os.path.exists(args.outdir):
os.mkdir(args.outdir)
files = glob.glob(args.files)
for file in files:
filename = file.split('/')[-1]
print('Reading ',file)
sys.stdout.flush()
ds = xr.open_dataset(file)
T2 = ds['T2']
sys.stdout.flush()
# Write out everything
ofile = args.outdir+'/'+filename+'.nc'
T2.to_netcdf(ofile)
print('Written to '+ofile)
et = time.time()
print(et -st)
def parse_args():
'''
Parser for nc2pkl
'''
parser = argparse.ArgumentParser()
parser.add_argument('-t','--template',type=str,default="../data/wrfout_d01_2018-08-01_00:00:00")
parser.add_argument('-f','--files',type=str,default="../data/wrfout_d01_2018-08*")
parser.add_argument('-o','--outdir',type=str,default="../output/T2")
return parser.parse_args()
if __name__ == '__main__':
nc2pkl(parse_args())
这份代码里面使用了多进程并行,从num_processes = 4可以知道开了4个进程同时处理,可以简单理解为同一时间同时处理4个wrfout文件。其实能开多少进程取决于我们的计算机有多少核数,在linux上可以通过nproc命令查看核数。
如果大家想使用下面的并行代码满足自己的需求,只需要更改被我用-----框起来的函数定义中的操作即可,比如更改变量,或者增加计算等。
import xarray as xr
import numpy as np
import multiprocessing as mp
from functools import partial
import glob
import sys
import os
import time
#--------------------------------------
def nc2pkl(file_path, output_dir):
if not os.path.exists(output_dir):
os.mkdir(output_dir)
filename = file_path.split('/')[-1]
print('Reading ',file_path)
sys.stdout.flush()
ds = xr.open_dataset(file_path)
T2 = ds['T2']
sys.stdout.flush()
# Write out everything
ofile = output_dir+'/'+filename+'.nc'
T2.to_netcdf(ofile)
print('Written to '+ofile)
#--------------------------------------
def parallel_nc2pkl(input_dir, output_dir, num_processes):
st = time.time()
# 获取所有需要处理的文件路径
file_paths = glob.glob(os.path.join(input_dir, "wrfout_d01_2018-08*"))
# 创建进程池
with mp.Pool(processes=num_processes) as pool:
# 使用partial函数创建一个只有一个参数的nc2pkl函数
worker_func = partial(nc2pkl, output_dir=output_dir)
# 将需要处理的文件路径传递给进程池
pool.map(worker_func, file_paths)
et = time.time()
print(et -st)
if __name__ == "__main__":
# 设置输入和输出目录
input_dir = "../data/"
output_dir = "../output/T2_multi"
# 设置进程数量
num_processes = 4
# 并行处理文件
parallel_nc2pkl(input_dir, output_dir, num_processes)
从中可以看到,并行代码极大地提升了速度。
无论是多进程还是多线程,一旦任务数量多到一个限度,就会消耗掉系统所有的资源,结果就是效率急剧下降,所有任务都做不好。
参考:
【1】https://mofanpy.com/tutorials/python-basic/multiprocessing/why
【2】https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376