前言
当前镜像:气象分析3.9
资源:4核16g 注意分开运行,不然会爆内存
阅读本文你将学到:
Dask、multiprocessing、ThreadPoolExecutor、和joblib都是Python中用于实现并行计算和任务调度的库或模块,各有其特点和应用场景:
Dask Dask 是一个灵活的并行计算库,专为大规模数据处理设计。它提供了高级的数据结构,如分布式数组(Dask Array)和数据帧(Dask DataFrame),使得用户能够在分布式内存中处理数据,就像操作常规的NumPy数组或Pandas DataFrame一样。Dask能够自动将计算任务分解成小块并在多核CPU或分布式计算集群上执行,非常适合处理超出单机内存限制的数据集。Dask还提供了一个分布式任务调度器,可以管理计算资源,优化任务执行顺序。
特长与区别:
特长:处理大型数据集,易于扩展到多台机器,高级数据结构支持。 区别:相比其他库,Dask提供了更高级别的抽象,特别适合于数据科学和大数据分析领域。
multiprocessing multiprocessing 是Python标准库的一部分,用于创建多进程应用程序。它允许程序利用多核处理器的能力,通过创建独立的进程来执行任务,从而实现并行计算。multiprocessing模块提供了进程、进程池、队列、锁等多种同步原语,支持进程间的通信和数据共享,适合CPU密集型任务。
特长与区别:
特长:充分利用多核CPU,适用于CPU密集型任务,标准库组件,无需安装额外库。 区别:与线程相比,进程间通信复杂,创建和管理成本较高,但不受GIL限制。
ThreadPoolExecutor ThreadPoolExecutor 是 concurrent.futures 模块中的一个类,用于简化线程池的管理和使用。它基于线程,适合执行大量I/O密集型任务,如网络请求和文件读写,因为线程在等待I/O时可以被切换出去,让其他线程继续执行。线程池自动管理线程的创建和回收,减少了线程创建的开销。
特长与区别:
特长:简化线程池管理,适合I/O密集型任务,快速任务调度。 区别:受GIL限制,在CPU密集型任务中可能不会带来性能提升。
joblib joblib 是一个轻量级的并行处理和内存缓存库,广泛应用于机器学习和科学计算中。它特别擅长于重复任务的并行执行,如交叉验证、参数扫描等,并提供了对numpy数组友好的序列化机制,减少了数据传输的成本。joblib的一个重要特点是它的智能缓存机制,可以避免重复计算,加速训练过程。
特长与区别:
特长:针对数值计算优化,高效的内存缓存,易于在数据科学和机器学习中集成。 区别:相比Dask,joblib更专注于简单的并行任务和数据处理,不提供复杂的分布式计算能力。
选择哪个库取决于具体的应用场景:对于大规模数据处理和分布式计算,Dask是一个好选择;对于CPU密集型任务,multiprocessing更合适;处理大量I/O操作时,ThreadPoolExecutor是优选;而在机器学习和科学计算领域,joblib凭借其高效缓存和对numpy的支持脱颖而出。
由于可视化代码过长隐藏,可点击
更快更强!四种Python并行库批量处理nc数据
运行Fork查看 🔜🔜若没有成功加载可视化图,点击运行可以查看 ps:隐藏代码在【代码已被隐藏】所在行,点击所在行,可以看到该行的最右角,会出现个三角形,点击查看即可
In [10]:
%%timeit
import os
import glob
from netCDF4 import Dataset
from wrf import getvar, latlon_coords
import numpy as np
def read_and_extract_slp(file_path):
with Dataset(file_path) as wrf_file:
slp = getvar(wrf_file, 'slp')
lat, lon = latlon_coords(slp)
# 使用向量化操作找到最小slp的索引
min_loc = np.unravel_index(slp.argmin(), slp.shape)
return [lat[min_loc].data, lon[min_loc].data]
# 获取WRF文件列表
wrf_files = glob.glob('/home/mw/input/typhoon9537/*')
# 初始化存储结果的列表
slp_list = []
# 使用for循环遍历文件列表
for file_path in wrf_files:
slp_data = read_and_extract_slp(file_path)
slp_list.append(slp_data)
8.12 s ± 130 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [1]:
%%timeit
import xarray as xr
import dask.array as da
import dask
import glob
import os
from netCDF4 import Dataset
import numpy as np
from wrf import getvar,latlon_coords
# 定义一个函数来读取WRF文件并提取slp变量
def read_and_extract_slp(file_path):
wrf_file = Dataset(file_path)
slp = getvar(wrf_file, 'slp')
lat, lon = latlon_coords(slp)
# 使用向量化操作找到最小slp的索引
min_loc = np.unravel_index(slp.argmin(), slp.shape)
return [lat[min_loc].data, lon[min_loc].data]
# 获取WRF文件列表
wrf_files = glob.glob('/home/mw/input/typhoon9537/*')
# 使用Dask并行处理批量读取和提取slp变量
slp_data = [dask.delayed(read_and_extract_slp)(file) for file in wrf_files]
slp_data_computed = da.compute(*slp_data)
# 将结果存储到一个列表中
slp_list = list(slp_data_computed)
6.83 s ± 267 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [1]:
%%timeit
import os
import glob
from netCDF4 import Dataset
from wrf import getvar, latlon_coords
import numpy as np
from concurrent.futures import ThreadPoolExecutor
# 定义一个函数来读取WRF文件并提取slp变量
def read_and_extract_slp(file_path):
with Dataset(file_path) as wrf_file:
slp = getvar(wrf_file, 'slp')
lat, lon = latlon_coords(slp)
# 使用向量化操作找到最小slp的索引
min_loc = np.unravel_index(slp.argmin(), slp.shape)
return [lat[min_loc].data, lon[min_loc].data]
# 获取WRF文件列表
wrf_files = glob.glob('/home/mw/input/typhoon9537/*')
# 使用ThreadPoolExecutor来并行处理文件读取
def process_files(file_list):
slp_list = []
with ThreadPoolExecutor() as executor:
# 使用map方法并行执行read_and_extract_slp函数
slp_list = list(executor.map(read_and_extract_slp, file_list))
return slp_list
# 调用函数并获取结果
slp_list = process_files(wrf_files)
6.98 s ± 240 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
没想到ThreadPoolExecutor速度偶尔胜过dask,但是内存容易炸
那么还是dask的内存管理更胜一筹
In [1]:
%%timeit
from joblib import Parallel, delayed
import os
import glob
from netCDF4 import Dataset
from wrf import getvar, latlon_coords
import numpy as np
# 定义一个函数来读取WRF文件并提取slp变量
def read_and_extract_slp(file_path):
with Dataset(file_path) as wrf_file:
slp = getvar(wrf_file, 'slp')
lat, lon = latlon_coords(slp)
# 使用向量化操作找到最小slp的索引
min_loc = np.unravel_index(slp.argmin(), slp.shape)
return [lat[min_loc].data, lon[min_loc].data]
# 获取WRF文件列表
wrf_files = glob.glob('/home/mw/input/typhoon9537/*')
# 使用joblib并行处理
slp_list = Parallel(n_jobs=-1)(delayed(read_and_extract_slp)(file) for file in wrf_files)
3 s ± 59.1 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
In [6]:
import os
import glob
from netCDF4 import Dataset
from wrf import getvar, latlon_coords
import numpy as np
# 定义一个函数来读取WRF文件并提取slp变量
def read_and_extract_slp(file_path):
with Dataset(file_path) as wrf_file:
slp = getvar(wrf_file, 'slp')
lat, lon = latlon_coords(slp)
# 使用向量化操作找到最小slp的索引
min_loc = np.unravel_index(slp.argmin(), slp.shape)
return [lat[min_loc].data, lon[min_loc].data]
In [9]:
%%timeit
import multiprocessing
# 获取WRF文件列表
wrf_files = glob.glob('/home/mw/input/typhoon9537/*')
# 并行处理
with multiprocessing.Pool() as pool:
slp_list = pool.map(read_and_extract_slp, wrf_files)
3.91 s ± 33.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
为什么要将函数和并行分开呢
因为multiprocessing需要确保函数定义在顶级作用域
如果合并运行就会出现以下报错
AttributeError:Can't picklelocal object 'inner..read_and_extract_slp'
出现这个错误是因multiprocessing 在尝试将函数 read_and_extract_slp 传递给子进程时遇到了问题。默认情况下,multiprocessing 使用 pickle 模块来序列化要传递的对象,但 pickle 不能序列化定义在交互式会话或某些特定上下文中的函数。
以上测试均为七次循环求平均
获胜者为joblib
当然只是这里的任务比较特别,要是涉及到纯大型数组计算可能还是dask更胜一筹
简单说一下,当资源为2核8g或者数据量较小时,并行可能并无优势,可能调度完时循环已经跑完了
资源改为4核16g时,并行超越了单循环
当你核数和内存都没困扰时当然是上并行快 ,但是环境不一定能适应多线程
资源匮乏或者无法解决环境问题时还是老实循环或者在列表推导式上做点文章