Как можно распараллелить функцию geopandas «to_file»

#python-3.x #python-multiprocessing #geopandas

#python-3.x #python-многопроцессорность #geopandas

Вопрос:

Я пытаюсь реализовать параллельную функцию для Geopandas, которая принимает единичные векторные данные (т. Е. Шейп-файл, содержащий тип данных Multipolygon) и преобразует его в стандартную сетку ячеек с размерами ячеек x и y, определенными пользователем.

Поскольку эта функция может привести к серьезным проблемам с памятью (т. Е. вызванным слишком высоким пространственным разрешением), мне было интересно, можно ли будет итеративно сохранять данные в заданном целевом файле. Таким образом, поскольку каждый параллельный процесс выполняет функцию «GRID», тот же процесс может сохранять данные итеративно в режиме добавления. Таким образом, я считаю, что у вас не будет проблем с памятью.

Вот моя «SHP_to_GRID_Function». Обратите внимание, что приведенный ниже код по-прежнему требует, чтобы все данные, сгенерированные многопроцессорной обработкой, обрабатывались памятью напрямую (поэтому переполнение более чем вероятно для больших наборов данных).

 import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.geometry import Polygon
from multiprocessing import Pool
import os
from functools import partial


def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())


def parallelize_df(gdf, func, n_cores, dx=100, dy=100, verbose=False):

    Geometries= gdf.loc[:, 'geometry'].values

    pool = Pool(processes=n_cores)
    func_partial=partial(func, dx, dy, verbose) # prod_x has only one argument x (y is fixed to 10) 

    results = pool.map(func_partial, Geometries)

    pool.close()
    pool.join()

    print(np.shape(results))

    GRID = gpd.GeoSeries(np.array(results).ravel())

    print("GRID well created") 

    return GRID

def generate_grid_from_Poligon(dx=100, dy=100, verbose=False, polygon=None):
    if verbose == True:
        info('function parallelize_df')
    else:
        None

    xmin,ymin,xmax,ymax = polygon.bounds

    lenght = dx
    wide = dy

    cols = list(np.arange(int(np.floor(xmin)), int(np.ceil(xmax)), wide))
    rows = list(np.arange(int(np.floor(ymin)), int(np.ceil(ymax)), lenght))
    rows.reverse()

    subpolygons = []
    for x in cols:
        for y in rows:
            subpolygons.append( Polygon([(x,y), (x wide, y), (x wide, y-lenght), (x, y-lenght)]) )


    return subpolygons


def main(GDF, n_cores='standard', dx=100, dy=100, verbose= False):
    """
    GDF: geodataframe
    n_cores: use standard or a positive numerical (int) value. It will set the number of cores to use in the multiprocessing

    args: (dx: dimension in the x coordinate to make the grid
            dy: dimenion in the y coordinate to make the grid)

    """

    if isinstance(n_cores, str):
        import multiprocessing
        N_cores = multiprocessing.cpu_count() -1

    elif isinstance(n_cores, int):

        N_cores =n_cores


    GRID_GDF = parallelize_df(GDF, generate_grid_from_Poligon, n_cores=N_cores, dx=dx, dy=dy, verbose=verbose)

    return GRID_GDF
  

Я благодарю вас за ваше время,

Искренне ваш,

Филипп Лил

Ответ №1:

Я, наконец, нашел решение для своего вопроса. Это не идеально, поскольку для этого требуется несколько процессов записи и один окончательный процесс конкатенации для всех временных файлов, созданных во время выполнения.

Не стесняйтесь предлагать альтернативы.

Вот решение, которое я нашел.

 import numpy as np
import geopandas as gpd
import pandas as pd
from shapely.geometry import Polygon
from multiprocessing import Pool, Lock, freeze_support
import os
from functools import partial
import time

def info(time_value):

    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())
    print("Time spent: ", time.time() - time_value)

def init(l):

    global lock

    lock=l

def Data_Arranger(to_filename):

    """This function concatenates and deletes temporary files. It is an arranger 
        of the multicessing data results"
    """

    Base = os.path.join(os.path.dirname(to_filename), 'temp')


    Strings = [file for file in os.listdir(Base)]

    Strings = [os.path.join(Base, S) for S in Strings]

    if not os.path.exists(os.path.dirname(to_filename)):
        os.mkdir(os.path.dirname(to_filename))

    Sq = [S for S in Strings if S.endswith('.shp')]

    gpd.GeoDataFrame(pd.concat([gpd.read_file(sq1) for sq1 in Sq]), crs=GDF.crs).to_file(to_filename)

    for sq1 in Sq:
        os.remove(sq1) 

    import shutil

    shutil.rmtree(Base, ignore_errors=True) 




def parallelize_df(gdf, func, n_cores, dx=100, dy=100, verbose=False, to_filename=None):



    Geometries= gdf.loc[:, 'geometry'].values
    crs = gdf.crs

    pool = Pool(processes=n_cores, initializer=init, initargs=(Lock(), ) )

    func_partial=partial(func, dx, dy, verbose, to_filename, crs) # prod_x has only one argument x (y is fixed to 10) 


    pool.map(func_partial, Geometries)

    pool.close()
    pool.join()


def generate_grid_from_gdf(dx=100, dy=100, verbose=False, to_filename=None, crs=None, polygon=None):
    if verbose == True:
        info(time.time())
    else:
        None

    xmin,ymin,xmax,ymax = polygon.bounds

    lenght = dx
    wide = dy

    cols = list(np.arange(int(np.floor(xmin)), int(np.ceil(xmax)), wide))
    rows = list(np.arange(int(np.floor(ymin)), int(np.ceil(ymax)), lenght))
    rows.reverse()

    subpolygons = []
    for x in cols:
        for y in rows:
            subpolygons.append( Polygon([(x,y), (x wide, y), (x wide, y-lenght), (x, y-lenght)]) )



    lock.acquire()

    print('parent process: ', os.getppid(), ' has activated the Lock')
    GDF = gpd.GeoDataFrame(geometry=subpolygons, crs=crs)


    to_filename = os.path.join(os.path.dirname(to_filename), 'temp',  str(os.getpid())   '_'   str(time.time())   '.'   os.path.basename(to_filename).split('.')[-1])

    if not os.path.exists(os.path.dirname(to_filename)):
        os.mkdir(os.path.dirname(to_filename))

    try:
        print("to_filename: ", to_filename)
        GDF.to_file(to_filename)
    except:
        print("error in the file saving")
    lock.release()

    print('parent process: ', os.getppid(), ' has unlocked')




def main(GDF, n_cores='standard', dx=100, dy=100, verbose= False, to_filename=None):
    """
    GDF: geodataframe
    n_cores: use standard or a positive numerical (int) value. It will set the number of cores to use in the multiprocessing

    dx: dimension in the x coordinate to make the grid
    dy: dimenion in the y coordinate to make the grid)
    verbose: whether or not to show info from the processing. Appliable only if applying the function not
            in Windows (LINUX, UBUNTU, etc.), or when running in separte console in Windows.

    to_filename: the path which will be used to save the resultant file.
    """

    if isinstance(n_cores, str):
        import multiprocessing
        N_cores = multiprocessing.cpu_count() -1

    elif isinstance(n_cores, int):

        N_cores =n_cores



    parallelize_df(GDF, generate_grid_from_gdf, n_cores=N_cores, dx=dx, dy=dy, verbose=verbose, to_filename=to_filename)
    Data_Arranger(to_filename)

    ####################################################################################

if "__main__" == __name__:
    freeze_support()
    GDF = gpd.read_file("Someone's_file.shp")


    to_filename = "To_file_directory/To_file_name.shp"

    dx = 500 # resampling to 500 units. Ex: assuming the coordinate reference system is in meters, this function will return polygons of the given geometries in 500m for the longitudinal dimension.

    dy = 500 # same here. Assuming CRS is in meters units, the resultant file will be have polygons of 500m in latitudinal dimension

    main(GDF, dx=dx, dy=dy, verbose=True, to_filename=to_filename)
  

Я благодарю вас за ваше время.

Филипп Лил