Снова выполнены неудачные потоки

#python #multithreading #python-2.7

#python #многопоточность #python-2.7

Вопрос:

Итак, у меня есть скрипт, который использует около 50 тыс. потоков, но одновременно запускает только 10. Для этого я использую библиотеку потоков и BoundedSemaphore, чтобы ограничить количество потоков до 10 одновременно. В некоторых случаях недостаточно памяти для всех потоков, но важно, чтобы все потоки обрабатывались, поэтому я хотел бы повторить те потоки, которые были убиты из-за нехватки памяти.

 import some_other_script, threading


class myThread (threading.Thread):
    def __init__(self, item):
        threading.Thread.__init__(self)
        self.item = item
    def run(self):
        threadLimiter.acquire()
        some_other_script.method(self.item)
        somelist.remove(self.item)
        threadLimiter.release()


threadLimiter = threading.BoundedSemaphore(10)

somelist = ['50,000 Items','.....]
for item in somelist:
    myThread(item).start()
  

Как вы можете видеть, единственная идея, которую я мог придумать до сих пор, заключалась в том, чтобы удалить элемент, который был обработан, из списка в каждом потоке somelist.remove(self.item) . (Каждый элемент уникален и присутствует в списке только один раз).
Моя идея заключалась в том, что я мог бы запустить цикл while вокруг цикла for, чтобы проверить, содержит ли он все еще элементы, что не сработало, потому что после завершения цикла for потоки не завершены, поэтому список не пуст.
Что я хочу сделать, так это перехватить те, которые терпят неудачу, потому что в системах заканчивается память, и выполнить их снова (и снова, если потребуется).

Заранее большое вам спасибо!

Комментарии:

1. Так что не так с вашей идеей с while циклом? Вы спрашиваете, как это реализовать? Альтернативы? Проверка кода?

2. Почему бы не создавать потоки динамически (только тогда, когда они необходимы)?. Этот подход — безумие!

Ответ №1:

Это решает как проблему слишком большого количества активных потоков, так и проблему в вашем вопросе:

     def get_items():
          threads = threading.enumerate()
          items = set()
          for thr in threads:
              if isinstance(thr, myThread): items.add(thr.item)
          return items
    def manageThreads(howmany):
         while bigset:
             items = get_items()
             items_to_add = bigset.difference(items)
             while len(items) < howmany:
                 item = items_to_add.pop()
                 processor = myThread(item)
                 processor.start()
             with thread_done:    
                 thread_done.wait()
   thread_done = threading.Condition()
   bigset = set(["50,000 items", "..."])
   manageThreads(10)
  

Метод запуска класса mythread:

 def run(self):
    try:
        some_other_script.method(self.item)
        bigset.remove(self.item)
    finally:
        with thread_done:
            thread_done.notify()
  

Threading.enumerate() возвращает список активных в данный момент объектов потока. Итак, функция manageThreads сначала создает 10 потоков, затем ожидает завершения одного, затем снова проверяет количество потоков и так далее. Если потоку не хватает памяти или во время обработки возникает другая ошибка, он не удалит элемент из bigset , в результате чего менеджер перенаправит его в другой поток.

Комментарии:

1. Привет, спасибо за ваш ответ! На данный момент я не вижу, как я передаю потоку только один item из моих biglist потоков за раз?

2. Хороший момент. Я забыл добавить код для получения элемента. Все элементы biglist уникальны? Имеет ли значение порядок их обработки?

3. да, каждый элемент является uniq в форме aaa, aab, aac, aba, aca, bba, bca, … и т.д., и нет, порядок не имеет значения