前往小程序,Get更优阅读体验!
立即前往
首页
学习
活动
专区
工具
TVP
发布
社区首页 >专栏 >又见dask! 如何使用dask-geopandas处理大型地理数据

又见dask! 如何使用dask-geopandas处理大型地理数据

作者头像
用户11172986
发布2024-06-20 18:10:25
710
发布2024-06-20 18:10:25
举报
文章被收录于专栏:气python风雨气python风雨

前言

读者来信

我之前是 1、先用arcgis 栅格转点 2、给点添加xy坐标 3、给添加xy坐标后的点通过空间连接的方式添加行政区属性 4、最后计算指定行政区的质心 之前的解决办法是用arcgis 完成第一步和第二步,虽然完成的很慢,但是看起来好像没太大问题 但是第三步用arcgis会卡死,后来用geopandas也会卡死,后来了解到dask-geopandas,但是处理了两百万个点左右好像也报错了,不知道是我写的代码有问题还是我对dask的理解有问题,想要请教一下大佬

读者的问题涉及到地理信息系统(GIS)操作的一系列步骤,具体包括将栅格数据转换为点数据、为这些点数据添加XY坐标、通过空间连接给这些点添加行政区属性、以及计算指定行政区的质心。读者在使用ArcGIS软件完成前两步时未遇到明显问题,但在执行第三步时遇到了性能瓶颈,即使用ArcGIS和GeoPandas进行空间连接操作时系统会卡死。为了解决这个问题,读者尝试使用了dask-geopandas来处理约两百万个点的数据,但似乎遇到了错误。

针对这个情况,我们可以从几个方面进行分析和建议:

性能瓶颈分析:

ArcGIS和GeoPandas在处理大量数据时可能会遇到性能问题,特别是在普通硬件上运行时。这是因为这些操作往往需要大量的内存和CPU资源。 空间连接特别是在点数据量很大时,是一个资源密集型的操作,因为它需要对每个点检查其与其他几何对象(如行政区边界)的空间关系。 dask-geopandas的使用:

dask-geopandas旨在解决类似的性能问题,通过并行计算和延迟执行来提高处理大规模地理空间数据的效率。如果在使用dask-geopandas时遇到错误,可能是由于多种原因导致的,包括但不限于代码问题、内存管理、任务调度等。 为了更好地诊断问题,需要检查错误消息的具体内容。这可能会指示是配置问题、资源不足还是代码逻辑错误。 优化建议:

资源分配:确保有足够的计算资源(CPU和内存)来处理数据。对于dask-geopandas,可以通过调整Dask的工作进程数和内存限制来优化性能。 代码审查:仔细检查实现代码,尤其是dask-geopandas的部分,确认是否正确使用了并行计算和数据分区功能。 批处理:如果可能,尝试将数据分成更小的批次进行处理,而不是一次性处理所有点。这可以帮助减少内存压力。 索引和优化:在进行空间连接之前,为行政区数据建立空间索引可以大大提高查询效率。

注意,运行前需要将input的rar文件解压后再运行程序

dask_geopandas环境部署

花了一番功夫解决环境问题,使用以下步骤即可使用dask_geopandas

In [1]:

代码语言:javascript
复制
代码语言:javascript
复制
!pip install dask_geopandas -i https://pypi.mirrors.ustc.edu.cn/simple/
!pip install PyGEOS -i https://pypi.mirrors.ustc.edu.cn/simple/
!pip install --upgrade shapely https://pypi.mirrors.ustc.edu.cn/simple/
!pip install pyogrio -i https://pypi.mirrors.ustc.edu.cn/simpl
代码语言:javascript
复制

dask_geopandas简单示例

将 GeoPandas DataFrame 转换为 Dask-GeoPandas DataFrame 首先,使用 GeoPandas 读取地理数据文件:

python import geopandas df = geopandas.read_file('...') # 使用你的文件路径替换 '...' 然后,将其转换为 Dask-GeoPandas DataFrame:

python import dask_geopandas

将 GeoPandas DataFrame 分区为 Dask-GeoPandas DataFrame,这里分为4个部分

ddf = dask_geopandas.from_geopandas(df, npartitions=4) 默认情况下,这会根据行来简单地重新分区数据。但是,你也可以提供空间分区,以利用 GeoDataFrame 的空间结构。

python

执行空间重分区

ddf = ddf.spatial_shuffle() GeoPandas 的熟悉的空间属性和方法也可用,并且将并行计算:

python

计算几何对象的面积

ddf.geometry.area.compute()

检查几何对象是否在某个多边形内

ddf.within(polygon) 此外,如果你有一个分布式的 dask.dataframe,你可以将 x-y 点的列传递给 set_geometry 方法来设置几何形状。

python import dask.dataframe as dd import dask_geopandas

从 CSV 文件读取数据

ddf = dd.read_csv('...') # 使用你的文件路径替换 '...'

使用经纬度设置几何形状

ddf = ddf.set_geometry( dask_geopandas.points_from_xy(ddf, 'longitude', 'latitude') ) 目前支持 Parquet 和 Feather 文件格式的写入(以及读回):

python

写入到 Parquet 文件

ddf.to_parquet("path/to/dir/")

从 Parquet 文件读取

ddf = dask_geopandas.read_parquet("path/to/dir/") 传统的 GIS 文件格式可以读入到分区的 GeoDataFrame 中(需要 pyogrio),但不支持写入:

python

读取文件,这里以 GeoPackage 文件为例,同时指定分区数为4

ddf = dask_geopandas.read_file("file.gpkg", npartitions=4) 以上就是如何使用 Dask-GeoPandas 对大型地理空间数据进行高效处理的简单示例。

原程序

In [2]:

代码语言:javascript
复制
代码语言:javascript
复制
import geopandas as gpd
import time # 添加时间模块

# 添加dask模块
import dask_geopandas

def process_row():
    outwen = '/home/mw/input/dask6250/201105.shp'
    bianjie = '/home/mw/input/dask6250/xian/2023xian.shp'
    jiabianjie = './'
    
    start_time3 = time.time()
    
    # 读取输入和裁剪边界的 shapefile
    target_gdf = gpd.read_file(outwen)
    join_gdf = gpd.read_file(bianjie)
    
    # 改成dask方式
    target_gdfnew = dask_geopandas.from_geopandas(target_gdf, npartitions=4)
       
    # 重新投影参与连接的边界以匹配目标几何图形的 CRS
    join_gdf = join_gdf.to_crs(target_gdf.crs)
    
    # 改成dask方式
    join_gdfnew = dask_geopandas.from_geopandas(join_gdf, npartitions=4)
    
    # 使用空间连接找到相交的部分
    joined = gpd.sjoin(target_gdfnew, join_gdfnew, how='inner', predicate='intersects')
    
    # 将 'bianjie' 中的属性添加到 'outwen' 中
    joined = joined.drop(columns='index_right')  # 移除多余的索引列
    result = target_gdfnew.merge(joined, how='left', on=target_gdfnew.columns.to_list())
    
    # 保存结果到输出边界
    result.to_file(jiabianjie,  encoding='utf-8-sig')  # 确保使用正确的编码
    
    end_time3 = time.time()
    execution_time3 = end_time3 - start_time3
    
    print(f"'{jiabianjie}'已添加边界,开始时间为:{start_time3:.2f},结束时间为:{end_time3:.2f},执行时间为:{execution_time3:.2f}秒")

process_row()

print('finish')
代码语言:javascript
复制

好,运行一段时间爆内存了,应该考虑以下优化策略:

  1. 直接在Dask中读取Shapefiles

你的代码先用geopandas读取Shapefile,然后转换为dask_geopandas对象。这个过程中,原始数据会完全加载到内存中,这可能是导致内存溢出的原因之一。相反,你应该直接使用dask_geopandas.read_file来避免将整个数据集一次性加载到内存:

代码语言:javascript
复制
python    
target_dgdf = dask_geopandas.read_file(outwen, npartitions=4)    
join_dgdf = dask_geopandas.read_file(bianjie, npartitions=4)
  1. 避免不必要的数据复制

在数据处理过程中,尽量减少不必要的数据复制。例如,在合并或连接操作之前,仔细考虑是否所有列都需要参与操作。

  1. 使用更高效的空间连接

在使用dask_geopandas进行空间连接时,确保操作是高效的。你的代码尝试使用geopandas.sjoin,但是应该使用dask_geopandas.sjoin。此外,确保在执行空间连接之前,两个数据集已经有了匹配的坐标参考系统(CRS)。这样可以避免在每个分区上重复昂贵的CRS转换操作。

  1. 调整npartitions

npartitions的选择对性能和内存使用有重大影响。太少的分区可能会导致单个分区过大,而太多的分区则会增加调度开销。你可能需要实验不同的npartitions值来找到最佳平衡。

  1. 检查最终保存步骤

在保存结果时,如果尝试将整个处理后的数据集写入单个文件,这可能也会导致内存问题。dask_geopandas目前可能不支持直接写入文件格式如Shapefile,因为这通常涉及将数据集合并到单个分区。你可能需要先将数据写入Parquet等格式,或者手动分批写入。

2.0

对自己的资源修改npartitions参数

In [1]:

代码语言:javascript
复制
代码语言:javascript
复制
import dask_geopandas as dgd
import time

input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './output/'

def process_row(target_gdf, join_gdf, jiabianjie_pat):


    start_time = time.time()
    
    # 直接使用dask-geopandas读取shapefiles
    target_dgdf = dgd.read_file(input_shapefile, npartitions=16)
    join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)
    
    # 确保边界shapefile与目标shapefile的CRS一致
    join_dgdf = join_dgdf.to_crs(target_dgdf.crs)
    
    # 使用空间连接找到相交的部分
    joined = dgd.sjoin(target_dgdf, join_dgdf, how='inner', predicate='intersects')
    
    # 移除多余的索引列
    joined = joined.drop(columns='index_right')
    

    joined.compute().to_file(output_directory + 'result.shp', driver='ESRI Shapefile', encoding='utf-8')

    # end_time = time.time()
    # print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")

if __name__ == "__main__":
    process_row(input_shapefile,boundary_shapefile,output_directory)
    print('finish')
代码语言:javascript
复制

In [ ]:

代码语言:javascript
复制
代码语言:javascript
复制
import dask_geopandas as dgd
import time
import gc  # 导入垃圾收集器

input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './'

def process_row(target_gdf, join_gdf, jiabianjie_pat):
    start_time = time.time()
    
    # 根据你的硬件配置调整npartitions,减少分区数以减少内存开销
    target_dgdf = dgd.read_file(input_shapefile, npartitions=16)  # 适当调整npartitions
    join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)  # 适当调整npartitions
    
    join_dgdf = join_dgdf.to_crs(target_dgdf.crs)
    
    joined = dgd.sjoin(target_dgdf, join_dgdf, how='inner', predicate='intersects')
    
    joined = joined.drop(columns='index_right')
    
    # 计算结果前启动垃圾收集
    gc.collect()
    
    joined.compute().to_file(output_directory + 'result.shp', driver='ESRI Shapefile', encoding='utf-8')
    
    # 手动启动垃圾收集释放内存
    gc.collect()

    end_time = time.time()
    print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")

if __name__ == "__main__":
    process_row(input_shapefile, boundary_shapefile, output_directory)
    print('finish')
代码语言:javascript
复制
代码语言:javascript
复制
/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:111: UserWarning: The Shapely GEOS version (3.10.0-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.4-CAPI-1.16.2). Conversions between both will be slow.
  warnings.warn(
/opt/conda/lib/python3.9/site-packages/geopandas/io/file.py:362: FutureWarning: pandas.Int64Index is deprecated and will be removed from pandas in a future version. Use pandas.Index with the appropriate dtype instead.
  pd.Int64Index,

3.0

分批运行与采用gpkg方式存储

In [3]:

代码语言:javascript
复制
代码语言:javascript
复制
import dask_geopandas as dgd
import time
import gc
from dask import delayed, compute  # 从dask中导入compute函数

input_shapefile = '/home/mw/input/dask6250/201105.shp'
boundary_shapefile = '/home/mw/input/dask6250/xian/2023xian.shp'
output_directory = './'

@delayed
def process_batch(batch, join_gdf, output_path):
    # 将边界数据转换为目标数据的坐标参考系统
    join_gdf = join_gdf.to_crs(batch.crs)
    
    # 执行空间连接
    joined = dgd.sjoin(batch, join_gdf, how='inner', predicate='intersects')
    
    # 删除不必要的列
    joined = joined.drop(columns='index_right')
    
    # 计算并保存结果
    joined.compute().to_file(output_path, driver='GPKG', layer='result', encoding='utf-8')  # 使用GeoPackage格式

def main():
    start_time = time.time()
    
    # 明确指定npartitions
    target_dgdf = dgd.read_file(input_shapefile, npartitions=16)  # 明确设置npartitions
    join_dgdf = dgd.read_file(boundary_shapefile, npartitions=16)  # 明确设置npartitions
    
    # 将目标数据集分批处理
    batches = target_dgdf.to_delayed()
    
    tasks = [process_batch(batch, join_dgdf, f"{output_directory}result_{i}.gpkg") for i, batch in enumerate(batches)]
    
    # 使用dask的compute函数来执行所有延迟任务
    compute(*tasks)
    
    gc.collect()  # 手动启动垃圾收集释放内存
    
    end_time = time.time()
    print(f"已添加边界,开始时间为:{start_time:.2f},结束时间为:{end_time:.2f},执行时间为:{end_time - start_time:.2f}秒")

if __name__ == "__main__":
    main()
    print('finish')
代码语言:javascript
复制
代码语言:javascript
复制
/opt/conda/lib/python3.9/site-packages/geopandas/_compat.py:111: UserWarning: The Shapely GEOS version (3.10.0-CAPI-1.16.0) is incompatible with the GEOS version PyGEOS was compiled with (3.10.4-CAPI-1.16.2). Conversions between both will be slow.
  warnings.warn(

注意,由于资源限制,以上最终的result并没有运行完全,可以看到project目录下还有一部分gpkg 因为输出文件大于1g的限制,还请有兴趣的在自己的电脑运行,根据相应资源修改参数

另外gpkg可以使用geopandas转为为需要的shp

In [ ]:

代码语言:javascript
复制
代码语言:javascript
复制
import geopandas as gpd
import pandas as pd

# GeoPackage文件列表
gpkg_files = ['path/to/your/first_file.gpkg', 'path/to/your/second_file.gpkg']

# 读取所有GeoPackage文件到GeoDataFrame列表中
gdf_list = [gpd.read_file(gpkg) for gpkg in gpkg_files]

# 合并GeoDataFrame对象
merged_gdf = pd.concat(gdf_list, ignore_index=True)

# 指定输出Shapefile的路径
output_shp_path = 'path/to/your/output_file.shp'

# 将合并后的GeoDataFrame保存为Shapefile
merged_gdf.to_file(output_shp_path, driver='ESRI Shapefile')

print(f"合并后的Shapefile已保存至:{output_shp_path}")
代码语言:javascript
复制

点击链接可查看完整代码与在线运行代码

本文参与 腾讯云自媒体同步曝光计划,分享自微信公众号。
原始发表:2024-02-07,如有侵权请联系 cloudcommunity@tencent.com 删除

本文分享自 气python风雨 微信公众号,前往查看

如有侵权,请联系 cloudcommunity@tencent.com 删除。

本文参与 腾讯云自媒体同步曝光计划  ,欢迎热爱写作的你一起参与!

评论
登录后参与评论
0 条评论
热度
最新
推荐阅读
目录
  • 前言
  • dask_geopandas环境部署
  • dask_geopandas简单示例
    • 将 GeoPandas DataFrame 分区为 Dask-GeoPandas DataFrame,这里分为4个部分
      • 执行空间重分区
        • 计算几何对象的面积
          • 检查几何对象是否在某个多边形内
            • 从 CSV 文件读取数据
              • 使用经纬度设置几何形状
                • 写入到 Parquet 文件
                  • 从 Parquet 文件读取
                    • 读取文件,这里以 GeoPackage 文件为例,同时指定分区数为4
                    • 原程序
                    • 2.0
                    • 3.0
                    相关产品与服务
                    GPU 云服务器
                    GPU 云服务器(Cloud GPU Service,GPU)是提供 GPU 算力的弹性计算服务,具有超强的并行计算能力,作为 IaaS 层的尖兵利器,服务于生成式AI,自动驾驶,深度学习训练、科学计算、图形图像处理、视频编解码等场景。腾讯云随时提供触手可得的算力,有效缓解您的计算压力,提升业务效率与竞争力。
                    领券
                    问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档