Как инкапсулировать импортированный модуль в метод для многопоточности в python?

#python #python-3.x #multithreading #thread-safety #python-multithreading

#python #python-3.x #многопоточность #безопасность потоков #python-многопоточность

Вопрос:

Я новичок в python, и у меня есть параллельная проблема при использовании внутренних функций импорта библиотек. Проблема в том, что мой код вычисляет различные типы переменных, и в последнем процессе они сохраняются в разные файлы. Но у меня такая же проблема при чтении и записи.

Это пример кода, который работает, потому что является линейным:

 import xarray as xr

def read_concurrent_files(self):

    files_var_type1 = get_files('type1','20200101','20200127')
    files_var_type2 = get_files('type2','20200101','20200127')
    files_var_type3 = get_files('type3','20200101','20200127')

def get_files(self, varType, dateini, datefin):

    # This methods return an array of file paths
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files , engine='cfgrib', 
        combine='nested', concat_dim ='time', decode_coords = False, parallel = True)      
    return files_raw
 

Но когда я вношу эти изменения в код, чтобы он был параллельным, он завершается неудачей:

 import xarray as xr
from multiprocessing.pool import ThreadPool

def read_concurrent_files(self):

    pool = ThreadPool(processes=3)

    async_result1 = pool.apply_async(self.get_files, ('type1','20200101','20200127',))
    async_result2 = pool.apply_async(self.get_files, ('type2','20200101','20200127',))
    async_result3 = pool.apply_async(self.get_files, ('type3','20200101','20200127',))

    files_var_type1 = async_result1.get()
    files_var_type2 = async_result2.get()
    files_var_type3 = async_result3.get()

def get_files(self, varType, dateini, datefin):

    # This methods return an array of file paths
    files = self.get_file_list(varType, dateini, datefin)
    files_raw = xr.open_mfdataset(files , engine='cfgrib', 
        combine='nested', concat_dim ='time', decode_coords = False, parallel = True)      
    return files_raw
 

Проблема заключается в вызове xr.open_mfdataset, который не является потокобезопасным (или я так думаю).

Есть ли способ инкапсулировать библиотеку импорта только в область метода?

Я пришел с других языков, и было легко создать экземпляр в методе или использовать потокобезопасные объекты.

Заранее большое спасибо!!

Ответ №1:

Поскольку я новичок в python, я не знал о различных типах потоков, которые мы можем создавать, поэтому в моем примере выше я использовал ThreadPool, который может быть заблокирован GIL (глобальная блокировка интерпретатора), поэтому, чтобы избежать этого, мы можем использовать другой тип потоков, здесьпример:

 import os
import concurrent.futures

def get_xarray(self):
    tasks = []
    cpu_count = os.cpu_count()
    with concurrent.futures.ProcessPoolExecutor(max_workers = cpu_count) as executor:
        for i in range(0, len(self.files)):
            tasks.append(executor.submit(self.get_xarray_by_file, self.files[i]))

    results = []
    for result in tasks:
        results.append(result.result())
    era_raw = xr.merge(results, compat='override')

    return era_raw.persist().load()

def get_xarray_by_file(self, files):
    era_raw = xr.open_mfdataset(files , engine='cfgrib', 
        combine='nested', concat_dim ='time', decode_coords = False, parallel = True)
    return era_raw.persist().load()
 

В этом случае мы используем ProcessPoolExecutor:

Класс ProcessPoolExecutor — это подкласс Executor, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor использует модуль многопроцессорной обработки, что позволяет ему обойти глобальную блокировку интерпретатора, но также означает, что могут выполняться и возвращаться только выбираемые объекты.

Теперь мы можем параллельно читать файлы grib2 или создавать файлы nc или csv из фрейма данных в реальном параллельном режиме.