Агрегация на стороне карты Spark: только для каждого раздела?

#apache-spark

#apache-spark

Вопрос:

Я читал о сокращении / агрегации на стороне карты, и есть одна вещь, которую я, похоже, не могу четко понять. Это происходит только для каждого раздела или оно шире по объему? Я имею в виду, уменьшается ли это также для разных разделов, если один и тот же ключ появляется в нескольких разделах, обрабатываемых одним и тем же исполнителем?

Теперь у меня есть еще несколько вопросов в зависимости от того, является ли ответ «только для каждого раздела» или нет.

Предполагая, что это для каждого раздела:

  • Каковы хорошие способы справиться с ситуацией, когда я знаю, что мой набор данных хорошо поддается дальнейшему сокращению между локальными разделами перед перетасовкой. Например. Я обрабатываю 10 разделов на исполнителя, и я знаю, что все они содержат много перекрывающихся ключей, поэтому потенциально его можно сократить до 1/10. По сути, я ищу локальное сокращение () (как и многие другие). На ум приходит Coalesce (), какие-либо общие методы решения этой проблемы?

Предполагая, что это уменьшает количество разделов:

  • Происходит ли это для каждого исполнителя? Как насчет исполнителей, назначенных одному и тому же рабочему узлу, имеют ли они возможность уменьшать количество разделов друг друга, признавая, что они расположены совместно?
  • Происходит ли это для каждого ядра (потока) в исполнителе? Причина, по которой я спрашиваю об этом, заключается в том, что некоторые из рассмотренных мной диаграмм, похоже, показывают сопоставитель для каждого ядра / потока исполнителя, похоже, что результаты всех задач, выходящих из этого ядра, передаются в один экземпляр сопоставителя. (что записывает в случайном порядке, если я не ошибаюсь)
  • Является ли оно детерминированным? Например. если у меня есть запись, скажем, A = 1 из 10 разделов, обрабатываемых одним и тем же исполнителем, могу ли я ожидать увидеть A = 10 для задачи, считывающей выходные данные в случайном порядке? Или это наилучший вариант, например, он все еще уменьшается, Но есть некоторые ограничения (размер буфера и т. Д.), Поэтому при чтении в случайном порядке могут встречаться значения A = 4 и A = 6.

Ответ №1:

Агрегирование на стороне карты аналогично combiner подходу Hadoop. Локальное уменьшение также имеет смысл для Spark и означает меньшую перетасовку. Так что это работает для каждого раздела — как вы заявляете.

При применении функций сокращения, например, groupBy amp; sum, сначала происходит перетасовка, так что ключи находятся в одном разделе, так что может произойти вышеупомянутое (с фреймами данных автоматически). Но, скажем, простое количество также уменьшится локально, а затем общее количество будет вычислено путем передачи промежуточных результатов обратно в драйвер.

Таким образом, результаты объединяются в драйвере от исполнителей — в зависимости от того, что на самом деле запрашивается, например, сбор, печать подсчета. Но если запись выполняется после агрегирования какого-либо характера, то сокращение ограничивается исполнителем на рабочем.

Комментарии:

1. Спасибо, я немного подождал, чтобы узнать, будет ли больше ответов. К сожалению, если я ничего не упускаю, и если фраза «это работает для каждого раздела» не означает «Это работает ТОЛЬКО для каждого раздела, точка», это все еще несколько расплывчато для меня. Позвольте мне еще больше упростить, у меня есть rdd.mapPartitions().reduceByKey(), вывод mapPartitions() равен A= 1 и A = 2 для 2 отдельных разделов (на одном исполнителе) перед reduce — shuffle . Может ли сокращение на стороне карты применяться к этим 2 разным разделам, в результате чего потенциально A = 3 будет записано для перемешивания, или оно вообще не применяется к разделам?

2. Концепция заключается В КАЖДОМ РАЗДЕЛЕ. не имеет значения, находятся ли разделы на одном и том же работнике или обслуживаются одним и тем же исполнителем. Затем выполняется окончательное агрегирование по всем этим разделам. Ответ правильный.