Эффективный способ уменьшить результаты MapReduce?

#python #optimization #hadoop #mapreduce #hadoop-streaming

#python #оптимизация #hadoop #mapreduce #потоковая передача hadoop

Вопрос:

Я написал задание MapReduce, которое учитывало количество ngram в наборе данных. Результаты представлены в ста файлах размером 300 МБ в формате <ngram>t<count> . Я хочу объединить их в один результат, но мои несколько попыток объединить потерпели крах («отслеживание задач исчезло»). У меня был тайм-аут в 8 часов, и этот сбой произошел около 8,5 часов, так что это может быть связано. У меня было # reducers = 5 (то же, что и # узлов). Может быть, мне просто нужно оставить больше времени, хотя ошибка, похоже, не указывает на это. Я подозреваю, что мои узлы перегружаются и перестают отвечать на запросы. Моя теория заключается в том, что мой редуктор мог бы использовать некоторую оптимизацию.

Я использую cat для своего картографа и следующий скрипт Python для моего редуктора:

 #!/usr/bin/env python
import sys

counts = {}
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    if key not in counts:
        counts[key] = 0
    counts[key]  = count

for key in sorted(counts.keys()):
    print '%st%s'% (key, counts[key])
  

Обновить:
Как я намекал в одном из своих комментариев, я не понимаю, какая сортировка выполняется Hadoop автоматически. В веб-интерфейсе статус редуктора показывает несколько разных фаз, которые включают «сортировку» и «сокращение». Исходя из этого, я предполагаю, что Hadoop сортирует выходные данные mapper перед отправкой в reduce, но что неясно, выполняется ли сортировка по всем данным, отправленным в reducer, или по каждому файлу перед его уменьшением. Другими словами, мой картограф берет 100 полей, разбивает их на 400 выходных данных, каждый из которых просто cat передает их в редуктор, затем редукторы (всего 5) получают эти 80 потоков. Сортирует ли сортировка все 80 или сортирует 1, уменьшает его; и т.д.? На основе графиков, которые явно могут не указывать на фактическое поведение, процесс сортировки выполняется перед любым уменьшением. Если сортировка сортирует все входные файлы, то я могу упростить свой редуктор, чтобы не хранить словарь всех значений, а просто распечатать пару ключ-общее количество после изменения ключа.

Что касается использования объединителя, я не думаю, что это было бы полезно в моем случае, поскольку данные, которые я сокращаю, уже были уменьшены в 100 файлах, которые я пытаюсь объединить. Поскольку мои # nodes = # reducers (5 amp; 5), нечего объединять, чего редуктор еще не делает.

Комментарии:

1. Какие ошибки вы получаете при попытке запустить reduce для файлов размером 10 300 МБ и для файлов размером 10 1 МБ? Другими словами, пробовали ли вы отлаживать программу на меньшем наборе тестовых данных и как время выполнения масштабируется в зависимости от размера данных?

2. Покопавшись в журналах, я обнаружил «Ошибку памяти» с установленным количеством [key]. Должен ли я попробовать увеличить количество редукторов? Теперь, когда я думаю об этом, мне интересно, имеет ли сортировка, которая выполняется перед запуском reducer, все 20 файлов, отсортированных вместе. Если это так, я могу просто распечатать количество после изменения ключа

3. @Dolan — вы могли бы использовать объединитель , чтобы уменьшить нагрузку на редуктор. Combiner поддерживает как Java, так и язык сценариев.

4. @Praveen — Можете ли вы подробнее рассказать о роли, которую будет играть объединитель? Я изучил это, но я не вижу разницы между этим редуктором. Другими словами, как только Mapper выведет свои 400 блоков, как объединитель и редуктор получат их?

5. @Dolan — Проверьте это руководство ( goo.gl/ImvLP ). Вывод карты отправляется в объединитель, а затем в редуктор. Картограф и объединитель выполняются на одном узле. Как правило, объединитель и редуктор являются одним и тем же классом. Использование объединителя уменьшит количество ключей, отправленных в редуктор из картографа.

Ответ №1:

Проблема заключалась в моем непонимании того, как работает MapReduce. Все данные, поступающие в Reducer, сортируются. Мой приведенный выше код был полностью неоптимизирован. Вместо этого я просто отслеживаю текущий ключ, а затем распечатываю предыдущий текущий, когда появляется новый ключ.

 #!/usr/bin/env python
import sys

cur_key = None
cur_key_count = 0
for line in sys.stdin:
    line = line.strip()
    key, count = line.split('t', 1)

    try:
        count = int(count)
    except ValueError:
        continue

    # if new key, reset count, note current key, and output lastk key's result
    if key != cur_key:
        if cur_key is not None:
            print '%st%s'% (cur_key, cur_key_count)
        cur_key = key
        cur_key_count = 0
    cur_key_count  = count
# printing out final key if set
if cur_key:
    print '%st%s'% (cur_key, cur_key_count)
  

Ответ №2:

Используйте top , чтобы проверить, что ваш редуктор привязан к процессору, а не к IO (возможно, вызывая замену) во время его запуска.

8 часов / 20 заданий на хост — это 24 минуты на задание 300 МБ

Возможно, вы могли бы использовать heapq такой, чтобы структура данных, встроенная в память, сохранялась отсортированной: см. Раздел 8.4.1 из:http://docs.python.org/library/heapq.html