#python #python-3.x #multithreading #thread-safety #python-multithreading
#python #python-3.x #многопоточность #безопасность потоков #python-многопоточность
Вопрос:
Я новичок в python, и у меня есть параллельная проблема при использовании внутренних функций импорта библиотек. Проблема в том, что мой код вычисляет различные типы переменных, и в последнем процессе они сохраняются в разные файлы. Но у меня такая же проблема при чтении и записи.
Это пример кода, который работает, потому что является линейным:
import xarray as xr
def read_concurrent_files(self):
files_var_type1 = get_files('type1','20200101','20200127')
files_var_type2 = get_files('type2','20200101','20200127')
files_var_type3 = get_files('type3','20200101','20200127')
def get_files(self, varType, dateini, datefin):
# This methods return an array of file paths
files = self.get_file_list(varType, dateini, datefin)
files_raw = xr.open_mfdataset(files , engine='cfgrib',
combine='nested', concat_dim ='time', decode_coords = False, parallel = True)
return files_raw
Но когда я вношу эти изменения в код, чтобы он был параллельным, он завершается неудачей:
import xarray as xr
from multiprocessing.pool import ThreadPool
def read_concurrent_files(self):
pool = ThreadPool(processes=3)
async_result1 = pool.apply_async(self.get_files, ('type1','20200101','20200127',))
async_result2 = pool.apply_async(self.get_files, ('type2','20200101','20200127',))
async_result3 = pool.apply_async(self.get_files, ('type3','20200101','20200127',))
files_var_type1 = async_result1.get()
files_var_type2 = async_result2.get()
files_var_type3 = async_result3.get()
def get_files(self, varType, dateini, datefin):
# This methods return an array of file paths
files = self.get_file_list(varType, dateini, datefin)
files_raw = xr.open_mfdataset(files , engine='cfgrib',
combine='nested', concat_dim ='time', decode_coords = False, parallel = True)
return files_raw
Проблема заключается в вызове xr.open_mfdataset, который не является потокобезопасным (или я так думаю).
Есть ли способ инкапсулировать библиотеку импорта только в область метода?
Я пришел с других языков, и было легко создать экземпляр в методе или использовать потокобезопасные объекты.
Заранее большое спасибо!!
Ответ №1:
Поскольку я новичок в python, я не знал о различных типах потоков, которые мы можем создавать, поэтому в моем примере выше я использовал ThreadPool, который может быть заблокирован GIL (глобальная блокировка интерпретатора), поэтому, чтобы избежать этого, мы можем использовать другой тип потоков, здесьпример:
import os
import concurrent.futures
def get_xarray(self):
tasks = []
cpu_count = os.cpu_count()
with concurrent.futures.ProcessPoolExecutor(max_workers = cpu_count) as executor:
for i in range(0, len(self.files)):
tasks.append(executor.submit(self.get_xarray_by_file, self.files[i]))
results = []
for result in tasks:
results.append(result.result())
era_raw = xr.merge(results, compat='override')
return era_raw.persist().load()
def get_xarray_by_file(self, files):
era_raw = xr.open_mfdataset(files , engine='cfgrib',
combine='nested', concat_dim ='time', decode_coords = False, parallel = True)
return era_raw.persist().load()
В этом случае мы используем ProcessPoolExecutor:
Класс ProcessPoolExecutor — это подкласс Executor, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor использует модуль многопроцессорной обработки, что позволяет ему обойти глобальную блокировку интерпретатора, но также означает, что могут выполняться и возвращаться только выбираемые объекты.
Теперь мы можем параллельно читать файлы grib2 или создавать файлы nc или csv из фрейма данных в реальном параллельном режиме.