前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >Python并行——速度++++++++

Python并行——速度++++++++

作者头像
自学气象人
发布2023-06-21 15:02:59
2180
发布2023-06-21 15:02:59
举报
文章被收录于专栏:自学气象人

一直对python的多线程、多进程、分布式多进程比较好奇。今天浅浅地学习了一下,里面涉及的内容其实比较多,包括进程锁、进程间的通信、进程池、共享内存等等。这里给一个简单的、大家可能会常用到的例子——从多个wrfout文件中提取变量T2并单独保存输出为nc文件,一起感受下多进程的魅力。如果不妥之处,还望大家不吝赐教!

常规代码

这份代码是大家实际中经常使用的,通过循环来实现从多个wrfout文件中提取变量T2并单独保存输出为nc文件。

代码语言:javascript
复制
import xarray as xr
import numpy as np
import glob
import sys
import os
import argparse
import time 

def nc2pkl(args):
    st =time.time()
    if not os.path.exists(args.outdir):
            os.mkdir(args.outdir)
    files = glob.glob(args.files)
    
    for file in files:
        filename = file.split('/')[-1]
        print('Reading ',file)
        sys.stdout.flush()
        ds = xr.open_dataset(file)
        T2 = ds['T2']

        sys.stdout.flush()
        # Write out everything 
        ofile = args.outdir+'/'+filename+'.nc'
        T2.to_netcdf(ofile)
        print('Written to '+ofile)

    et = time.time()
    print(et -st)
  
def parse_args():
    '''
    Parser for nc2pkl
    '''
    parser = argparse.ArgumentParser()
    parser.add_argument('-t','--template',type=str,default="../data/wrfout_d01_2018-08-01_00:00:00")
    parser.add_argument('-f','--files',type=str,default="../data/wrfout_d01_2018-08*")
    parser.add_argument('-o','--outdir',type=str,default="../output/T2")
    return parser.parse_args()

if __name__ == '__main__':
    nc2pkl(parse_args())

多进程并行代码

这份代码里面使用了多进程并行,从num_processes = 4可以知道开了4个进程同时处理,可以简单理解为同一时间同时处理4个wrfout文件。其实能开多少进程取决于我们的计算机有多少核数,在linux上可以通过nproc命令查看核数。

如果大家想使用下面的并行代码满足自己的需求,只需要更改被我用-----框起来的函数定义中的操作即可,比如更改变量,或者增加计算等。

代码语言:javascript
复制
import xarray as xr
import numpy as np
import multiprocessing as mp
from functools import partial
import glob
import sys
import os
import time

#--------------------------------------
def nc2pkl(file_path, output_dir):
    if not os.path.exists(output_dir):
            os.mkdir(output_dir)
    
    filename = file_path.split('/')[-1]
    print('Reading ',file_path)
    sys.stdout.flush()
    ds = xr.open_dataset(file_path)
    T2 = ds['T2']

    sys.stdout.flush()
    # Write out everything 
    ofile = output_dir+'/'+filename+'.nc'
    T2.to_netcdf(ofile)
    print('Written to '+ofile)
#--------------------------------------

def parallel_nc2pkl(input_dir, output_dir, num_processes):
    st = time.time()
    # 获取所有需要处理的文件路径
    file_paths = glob.glob(os.path.join(input_dir, "wrfout_d01_2018-08*"))

    # 创建进程池
    with mp.Pool(processes=num_processes) as pool:
        # 使用partial函数创建一个只有一个参数的nc2pkl函数
        worker_func = partial(nc2pkl, output_dir=output_dir)

        # 将需要处理的文件路径传递给进程池
        pool.map(worker_func, file_paths)
    et = time.time()
    print(et -st)

if __name__ == "__main__":
    # 设置输入和输出目录
    input_dir = "../data/"
    output_dir = "../output/T2_multi"

    # 设置进程数量
    num_processes = 4

    # 并行处理文件
    parallel_nc2pkl(input_dir, output_dir, num_processes)

计算效率

常规代码耗时及CPU使用情况
并行代码耗时及CPU使用情况

从中可以看到,并行代码极大地提升了速度。

注意

无论是多进程还是多线程,一旦任务数量多到一个限度,就会消耗掉系统所有的资源,结果就是效率急剧下降,所有任务都做不好。

参考:

【1】https://mofanpy.com/tutorials/python-basic/multiprocessing/why

【2】https://www.liaoxuefeng.com/wiki/1016959663602400/1017627212385376

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2023-04-01,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 自学气象人 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 常规代码
  • 多进程并行代码
  • 计算效率
  • 注意
领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档