#dask #dask-distributed
#dask #dask-распределенный
Вопрос:
Я запускаю конвейер для нескольких изображений. Конвейер состоит из чтения изображений из файловой системы, выполнения такой обработки для каждого из них, а затем сохранения изображений в файловой системе. Однако рабочий dask терпит неудачу из-за ошибки памяти. Есть ли способ гарантировать, что рабочие dask не загружают слишком много изображений в память? т. Е. Подождите, пока на рабочем не останется достаточно места, прежде чем запускать конвейер обработки нового изображения.
У меня есть один планировщик и 40 рабочих с 4 ядрами, 15 ГБ оперативной памяти и работает Centos7. Я пытаюсь обработать 125 изображений в пакете; каждое изображение довольно большое, но достаточно маленькое, чтобы поместиться на рабочем устройстве; для всего процесса требуется около 3 ГБ.
Я попытался обработать меньшее количество изображений, и это отлично работает.
ОТРЕДАКТИРОВАНО
from dask.distributed import Client, LocalCluster
# LocalCluster is used to show the config of the workers on the actual cluster
client = Client(LocalCluster(n_workers=2, resources={'process': 1}))
paths = ['list', 'of', 'paths']
# Read the file data from each path
data = client.map(read, path, resources={'process': 1)
# Apply foo to the data n times
for _ in range(n):
data = client.map(foo, x, resources={'process': 1)
# Save the processed data
data.map(save, x, resources={'process': 1)
# Retrieve results
client.gather(data)
Я ожидал, что изображения будут обрабатываться, поскольку на рабочих было доступно место, но похоже, что все изображения загружаются одновременно на разных рабочих.
РЕДАКТИРОВАТЬ: мои проблемы в том, что все задачи назначаются работникам, и им не хватает памяти. Я нашел, как ограничить количество задач, которые обрабатываются рабочим в один момент [https://distributed.readthedocs.io/en/latest/resources.html#resources-are-applied-separately-to-each-worker-process]( смотрите здесь). Однако с этим ограничением, когда я выполняю свою задачу, все они завершают шаг чтения, затем шаг обработки и, наконец, шаг сохранения. Это проблема, поскольку изображение передается на диск.
Есть ли способ завершить каждую задачу перед запуском новой? например, на Worker-1: чтение (img1)-> обработка (img1) -> сохранение (img1) -> чтение (img2) ->…
Ответ №1:
Dask обычно не знает, сколько памяти потребуется задаче, он может знать только размер выходных данных, и это только после их завершения. Это потому, что Dask просто выполняет функцию pthon, а затем ожидает ее завершения; но все osrt вещей может произойти в функции python. Обычно вы должны ожидать, что начнется столько задач, сколько у вас есть доступных рабочих ядер — сколько вы найдете.
Если вы хотите уменьшить общую нагрузку на память, то ваше решение должно быть простым: иметь достаточно небольшое количество рабочих, чтобы, если все они использовали максимальную память, которую вы можете ожидать, у вас все еще оставался некоторый запас в системе, чтобы справиться.
Для РЕДАКТИРОВАНИЯ: вы можете попробовать запустить optimize на графике перед отправкой (хотя, я думаю, это должно произойти в любом случае), поскольку похоже, что ваши линейные цепочки задач должны быть «слиты». http://docs.dask.org/en/latest/optimize.html
Комментарии:
1. Мне удалось установить ограничение ресурсов для каждого работника. Однако у меня есть некоторые проблемы с планированием задач (см. Редактирование в вопросе).
2. Я попробовал предложенный вами метод. Однако я не знал, как получить dask_graph с помощью клиентского API, если это возможно, поэтому я вернулся к использованию bag API. Это вернуло проблему ограничения ресурсов для рабочих. У вас есть идея о том, как ограничить рабочий ресурс при использовании dask.bag?
3. обязательно используйте клиент для вычисления и ограничьте количество рабочих.
4. Мне удалось сделать это с помощью fuse и установив ограничение ресурсов для каждого работника. Спасибо