#python #optimization #hadoop #mapreduce #hadoop-streaming
#python #оптимизация #hadoop #mapreduce #потоковая передача hadoop
Вопрос:
Я написал задание MapReduce, которое учитывало количество ngram в наборе данных. Результаты представлены в ста файлах размером 300 МБ в формате <ngram>t<count>
. Я хочу объединить их в один результат, но мои несколько попыток объединить потерпели крах («отслеживание задач исчезло»). У меня был тайм-аут в 8 часов, и этот сбой произошел около 8,5 часов, так что это может быть связано. У меня было # reducers = 5 (то же, что и # узлов). Может быть, мне просто нужно оставить больше времени, хотя ошибка, похоже, не указывает на это. Я подозреваю, что мои узлы перегружаются и перестают отвечать на запросы. Моя теория заключается в том, что мой редуктор мог бы использовать некоторую оптимизацию.
Я использую cat
для своего картографа и следующий скрипт Python для моего редуктора:
#!/usr/bin/env python
import sys
counts = {}
for line in sys.stdin:
line = line.strip()
key, count = line.split('t', 1)
try:
count = int(count)
except ValueError:
continue
if key not in counts:
counts[key] = 0
counts[key] = count
for key in sorted(counts.keys()):
print '%st%s'% (key, counts[key])
Обновить:
Как я намекал в одном из своих комментариев, я не понимаю, какая сортировка выполняется Hadoop автоматически. В веб-интерфейсе статус редуктора показывает несколько разных фаз, которые включают «сортировку» и «сокращение». Исходя из этого, я предполагаю, что Hadoop сортирует выходные данные mapper перед отправкой в reduce, но что неясно, выполняется ли сортировка по всем данным, отправленным в reducer, или по каждому файлу перед его уменьшением. Другими словами, мой картограф берет 100 полей, разбивает их на 400 выходных данных, каждый из которых просто cat
передает их в редуктор, затем редукторы (всего 5) получают эти 80 потоков. Сортирует ли сортировка все 80 или сортирует 1, уменьшает его; и т.д.? На основе графиков, которые явно могут не указывать на фактическое поведение, процесс сортировки выполняется перед любым уменьшением. Если сортировка сортирует все входные файлы, то я могу упростить свой редуктор, чтобы не хранить словарь всех значений, а просто распечатать пару ключ-общее количество после изменения ключа.
Что касается использования объединителя, я не думаю, что это было бы полезно в моем случае, поскольку данные, которые я сокращаю, уже были уменьшены в 100 файлах, которые я пытаюсь объединить. Поскольку мои # nodes = # reducers (5 amp; 5), нечего объединять, чего редуктор еще не делает.
Комментарии:
1. Какие ошибки вы получаете при попытке запустить reduce для файлов размером 10 300 МБ и для файлов размером 10 1 МБ? Другими словами, пробовали ли вы отлаживать программу на меньшем наборе тестовых данных и как время выполнения масштабируется в зависимости от размера данных?
2. Покопавшись в журналах, я обнаружил «Ошибку памяти» с установленным количеством [key]. Должен ли я попробовать увеличить количество редукторов? Теперь, когда я думаю об этом, мне интересно, имеет ли сортировка, которая выполняется перед запуском reducer, все 20 файлов, отсортированных вместе. Если это так, я могу просто распечатать количество после изменения ключа
3. @Dolan — вы могли бы использовать объединитель , чтобы уменьшить нагрузку на редуктор. Combiner поддерживает как Java, так и язык сценариев.
4. @Praveen — Можете ли вы подробнее рассказать о роли, которую будет играть объединитель? Я изучил это, но я не вижу разницы между этим редуктором. Другими словами, как только Mapper выведет свои 400 блоков, как объединитель и редуктор получат их?
5. @Dolan — Проверьте это руководство ( goo.gl/ImvLP ). Вывод карты отправляется в объединитель, а затем в редуктор. Картограф и объединитель выполняются на одном узле. Как правило, объединитель и редуктор являются одним и тем же классом. Использование объединителя уменьшит количество ключей, отправленных в редуктор из картографа.
Ответ №1:
Проблема заключалась в моем непонимании того, как работает MapReduce. Все данные, поступающие в Reducer, сортируются. Мой приведенный выше код был полностью неоптимизирован. Вместо этого я просто отслеживаю текущий ключ, а затем распечатываю предыдущий текущий, когда появляется новый ключ.
#!/usr/bin/env python
import sys
cur_key = None
cur_key_count = 0
for line in sys.stdin:
line = line.strip()
key, count = line.split('t', 1)
try:
count = int(count)
except ValueError:
continue
# if new key, reset count, note current key, and output lastk key's result
if key != cur_key:
if cur_key is not None:
print '%st%s'% (cur_key, cur_key_count)
cur_key = key
cur_key_count = 0
cur_key_count = count
# printing out final key if set
if cur_key:
print '%st%s'% (cur_key, cur_key_count)
Ответ №2:
Используйте top
, чтобы проверить, что ваш редуктор привязан к процессору, а не к IO (возможно, вызывая замену) во время его запуска.
8 часов / 20 заданий на хост — это 24 минуты на задание 300 МБ
Возможно, вы могли бы использовать heapq
такой, чтобы структура данных, встроенная в память, сохранялась отсортированной: см. Раздел 8.4.1 из:http://docs.python.org/library/heapq.html