Как собирать данные из нескольких потоков в python?

#python #multithreading

#python #многопоточность

Вопрос:

Я хочу использовать несколько потоков в Python для вычисления значений пикселей для изображения, которое будет создано в конце, хотя у меня возникают проблемы с выяснением, как получить результат потока обратно и собрать его. Вот настройка:

Создается Queue.Queue() объект, а также threading.Thread() дочерний класс:

 q = Queue.Queue()
class myThread(threading.Thread):
  def __init__(self, queue):
    self.queue = queue
    threading.Thread.__init__(self)
  def run(self):
    while True: # loop forever
      task = self.queue.get()
      rs = self.do_work(task) # I've got the resu< now what to do with it?
      self.queue.task_done()
  

Идея в том, что я хочу собрать пиксельные данные для изображения размером 500×500, которое изначально представляет собой список из 250 000 (500×500) элементов, которые в конечном итоге будут преобразованы в изображение с помощью PIL:

 pixels = array.array('B', pixels).tostring()
im = Image.fromstring('L', size, pixels)
im.show()
  

Итак, я заполняю очередь задачами для каждого пикселя и создаю пул потоков:

 for i in range(5):
  t = myThread(q)
  t.setDaemon(True)
  t.start()
for y in range(500):
  for x in range(500):
    q.put({'x':x, 'y':y})
q.join()
  

Итак, как собрать все данные? Я думаю, было бы плохой идеей передавать список из 250 000 элементов в каждый поток, как из-за размера передаваемого массива данных, так и из-за того, что в каждом потоке тогда отсутствовали бы данные из других потоков.

РЕДАКТИРОВАТЬ: Для тех из вас, кто задается вопросом, стоит ли вообще делать это многопоточным способом, работа, которая выполняется для вычисления координат изображения, — это несколько функций шума perlin. Он генерирует массив точек perlin 2D noise (сетка 5×5) плюс несколько октав (сетки 10×10, 20×20 и 40×40) и вычисляет значения пикселей между этими точками. Таким образом, для каждого пикселя в конечном изображении необходимо выполнить три математические операции на октаву (усреднение точек X вокруг заданной точки, усреднение точек Y вокруг заданной точки и усреднение этих средних значений), а затем выполнить средневзвешенное значение между результатами октавы.

На моем 8-ядерном Mac я вижу, что процесс Python использует 1 поток и 100% процессора при запуске. Хотя я знаю, что у меня 8 ядер, и я видел, что процессы показывают использование процессора на 400-600%, чтобы показать, что они используют преимущества других ядер, и я просто надеялся, что этот скрипт на Python сможет сделать то же самое.

Комментарии:

1. В чистом Python нет смысла выполнять интенсивную работу с процессором в разных потоках из-за GIL.

2. Используете ли вы IronPython? Если нет, то очень мало смысла в разделении вычислительно-ресурсоемких задач по потокам; как упоминал Дж.Ф. Себастьян, стандартный интерпретатор Python фактически не позволит вам запускать эти потоки одновременно.

3. Я отредактировал свой вопрос, чтобы указать, что я на самом деле делаю для каждого пикселя; является ли это задачей с высокой нагрузкой на процессор? Имеет ли какое-либо значение, является ли машина, на которой он запущен, многоядерной, а не просто гиперпоточной?

Ответ №1:

В Python есть глобальная блокировка для изменения структур данных на уровне python, называемая GIL. Это затрудняет эффективное выполнение того, что вы хотите делать с потоками.

Но не отчаивайтесь! Разработчики kind предоставили нам модуль многопроцессорной обработки. Замените многопоточность на многопроцессорность (чтобы использовать многопроцессорность.Процесс и многопроцессорность.Вместо этого очередь) и вуаля, ваше приложение является многопроцессорным приложением.

Что касается вашего вопроса, вы хотите иметь другую очередь, идущую в другом направлении.

Комментарии:

1. Спасибо! Я взгляну на multiprocessing модуль; пропустил его в моем туре по документации.

Ответ №2:

Я думаю, вам следует использовать две очереди.

Один для заданий / tasks, один для выходных данных.

Как только задача выполнена, поместите результат в очередь вывода.

Ответ №3:

У меня был бы глобальный список, к которому мог бы получить доступ каждый поток. У меня действительно была подобная ситуация, и я сделал это таким образом без проблем.