Spark: управление разделением для уменьшения перетасовки

#apache-spark #optimization #partitioning

#apache-spark #оптимизация #разделение

Вопрос:

Я пытаюсь разобраться в различных способах partitioning создания фрейма данных в Spark, чтобы уменьшить количество перетасовок в определенном конвейере.

Вот фрейм данных, над которым я работаю, он содержит более 4 миллиардов строк и 80 столбцов:

  ----- ------------------- ----------- 
|  msn|          timestamp| Flight_Id |
 ----- ------------------- ----------- 
|50020|2020-08-22 19:16:00|       72.0|
|50020|2020-08-22 19:15:00|       84.0|
|50020|2020-08-22 19:14:00|       96.0|
|50020|2020-08-22 19:13:00|       84.0|
|50020|2020-08-22 19:12:00|       84.0|
|50020|2020-08-22 19:11:00|       84.0|
|50020|2020-08-22 19:10:00|       84.0|
|50020|2020-08-22 19:09:00|       84.0|
|50020|2020-08-22 19:08:00|       84.0|
|50020|2020-08-22 19:07:00|       84.0|
|50020|2020-08-22 19:06:00|       84.0|
|50020|2020-08-22 19:05:00|       84.0|
|50020|2020-08-22 19:04:00|       84.0|
|50020|2020-08-22 19:03:00|       84.0|
|50020|2020-08-22 19:02:00|       84.0|
|50020|2020-08-22 19:01:00|       84.0|
|50020|2020-08-22 19:00:00|       84.0|
|50020|2020-08-22 18:59:00|       84.0|
|50020|2020-08-22 18:58:00|       84.0|
|50020|2020-08-22 18:57:00|       84.0|
 ----- ------------------- ----------- 
  

Это представляет собой набор временных рядов для разных самолетов (всего 41 самолет).
Я делаю только две вещи с этими данными :

  1. Фильтруйте, чтобы сохранить последние 30 минут каждого рейса, используя окно, разделенное на MSN и Flight_ID , и используя order By by timestamp .
  2. Для оставшихся столбцов вычислите mean stdev и нормализуйте данные.

У меня 32 исполнителя с объемом памяти 12 гб каждый, и задание завершилось сбоем после запуска в течение 30 часов со следующим сообщением :

 The driver running the job crashed, ran out of memory, or otherwise became unresponsive while it was running.
  

Глядя на план запроса, я заметил, что у меня более 300 шагов, более 60 из которых связаны с перетасовкой (физический план всех шагов выглядит точно так же):

 AdaptiveSparkPlan(isFinalPlan=false)
 - CollectLimit 1
    - HashAggregate(keys=[], functions=[avg(3546001_421#213), stddev_samp(3546001_421#213)], output=[avg(3546001_421)#10408, stddev_samp(3546001_421)#10417])
       - Exchange SinglePartition, true
          - HashAggregate(keys=[], functions=[partial_avg(3546001_421#213), partial_stddev_samp(3546001_421#213)], output=[sum#10479, count#10480L, n#10423, avg#10424, m2#10425])
             - Project [3546001_421#213]
                - Filter (isnotnull(rank#10238) amp;amp; (rank#10238 <= 1800))
                   - Window [rank(timestamp#10081) windowspecdefinition(Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
                      - Sort [Flight_Id_Int#209 ASC NULLS FIRST, timestamp#10081 DESC NULLS LAST], false, 0
                         - ShuffleQueryStage 0
                            - Exchange hashpartitioning(Flight_Id_Int#209, 200), true
                               - Union
                                 :- *(1) Project [Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3546001_421#213]
  

У меня есть сильное ощущение, что разделение сначала на msn помогло бы уменьшить количество перетасовки, поскольку большая часть работы выполняется на msn уровне.

Каков мой вопрос и где в моем коде я должен перераспределить? Должен ли я использовать repartition repartition с ключом, hash partitioning я читал документацию по этому другому разделителю, и я не понимаю их использования, и если это действительно решение моей проблемы.

Спасибо.

ПРАВКА 1:

Вот логический план :

 GlobalLimit 1
 - LocalLimit 1
    - Aggregate [avg(3566000_421#214) AS avg(3566000_421)#10594, stddev_samp(3566000_421#214) AS stddev_samp(3566000_421)#10603]
       - Project [3566000_421#214]
          - Filter (isnotnull(rank#10238) amp;amp; (rank#10238 <= 1800))
             - Window [rank(timestamp#10081) windowspecdefinition(msn#208, Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [msn#208, Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
                - Union
                  :- Project [msn#208, Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3566000_421#214]
  

Вот раздел кода, в котором я собираю данные из озера данных, в котором они хранятся.
К вашему сведению, это делается через пользовательский API с использованием библиотеки под названием FoundryTS . Важно то, что ничего с точки зрения данных не собирается, пока я не вызову to_dataframe() метод. Я перебираю каждый msn из них, чтобы избежать слишком большого вызова, а затем объединяю весь фрейм данных вместе с unionByName

 # Loop over MSN to extract timeseries
        df = []
        for msn in msn_range:
            search_results = (SeriesMetadata.M_REPORT_NUMBER == report_number) amp; (SeriesMetadata.M_AIRCRAFT == msn)

            # Create the intervals to split TimeSeries extract by flight for each MSN
            Start_int = list(df1.where(F.col("msn") == msn).select("Start").toPandas()["Start"])
            End_int = list(df1.where(F.col("msn") == msn).select("End").toPandas()["End"])
            flight_id = list(df1.where(F.col("msn") == msn).select("id_cmsReport").toPandas()["id_cmsReport"])

            flights_interval = [Interval(
                start, end, name=flight_Id
                ) for start, end, flight_Id in zip(
                Start_int, End_int, flight_id
                )]

            """ Collect all the series in a node collections """
            output = fts.search.series(
                search_results,
                object_types=["export-control-us-ear99-a220-dal-airline-series"])
                .map_by(FF.interpolate(
                    before='nearest',
                    internal='nearest',
                    after='nearest',
                    frequency=frequency,
                    rename_columns_by=lambda x: x.metadata["parameter_id"]   "_"   x.metadata["report_number"]),
                    keys='msn') 
                .map_intervals(flights_interval, interval_name='Flight_Id_Int')
                .map(FF.time_range(period_start, period_end))
                .to_dataframe()  # !!!!  numPartitions=32  Foundry Doc : #partition = #executors see if it triggers OOM error

            df.append(output)

        output = df[0]
        for df in df[1:]:
            output = output.unionByName(df)  # Same as union but matches name instead of columns order.

        # Repartition by msn to improve latter calculation
        N = len(msn_range)
        output.repartition(N, 'msn')
  

Ответ №1:

«Драйвер, выполняющий задание, разбился, исчерпал память или иным образом перестал отвечать на запросы во время выполнения».

Первая проблема, которую вам нужно исправить, — увеличить объем памяти драйвера (не исполнителей. Память драйвера по умолчанию в spark часто настолько мала, что при многих запросах происходит сбой.

«Мой вопрос в том, как и где в моем коде я должен перераспределить»

Spark уже выполняет работу по добавлению перераспределений по мере необходимости. Скорее всего, вы только создадите дополнительную работу, вручную перераспределив данные на полпути выполнения. Одной из возможных оптимизаций является сохранение данных в таблице с привязкой, но это потенциально приведет только к удалению первого обмена и только в том случае, если ваш столбец с привязкой точно соответствует разделению хэша первого обмена.

«Глядя на план запроса, я заметил, что у меня более 300 шагов»

То, что вы описали выше, не требует 300 шагов. Здесь что-то не так. Как выглядит ваш оптимизированный логический план? среднее значение и std должны требовать только scan -> partitial agg -> exchange -> final agg. В предоставленном вами плане запроса похоже, что вы намеренно просматриваете только последние 1600 точек данных вместо последних 30 миллионов. Вы имели в виду функцию окна, а не простой агрегат (он же group by)?

Редактировать:

для msn в msn_range:

IMO это может быть частью вашей проблемы. Этот цикл for приводит к тому, что план выполнения становится очень большим, что может быть причиной возникновения проблем с ООМ в драйвере. возможно, вы сможете перевести это во что-то более удобное для spark и не выполняющее столько работы с драйвером, преобразующим этот forloop в spark.paralellize(…).map(/your code/)

Комментарии:

1. Спасибо, я уже увеличил объем памяти в драйвере с моим уровнем разрешения на облачной платформе. Когда я говорю 300 шагов, я имею в виду, что план запроса, который я вам показал, — это просто выдержка. Этот точный план повторяется более 60 раз в плане запроса. Я отредактирую этот пост, чтобы добавить логический план, связанный с этим шагом. Я смотрю на 1800 точек данных, так как у меня есть 1 точка в секунду в реальном наборе данных. Итак, 30m = 1800s.

2. о «перераспределении» я собираюсь добавить раздел кода, относящийся к этому разделу в сообщении. Я надеялся, что перетасовка для получения раздела по msn уменьшит рабочую нагрузку позже, когда моя функция Window и раздел по msn. Что вы подразумеваете под таблицей с привязкой? Как вы это делаете?

3. можете ли вы заменить оконную функцию на ГДЕ timestamp> ‘…’ И timestamp <‘…’ ?

4. кроме того, какая память вашего драйвера установлена на atm?

5. слишком большой параллелизм может вызвать проблемы с производительностью сам по себе из-за увеличения количества вещей, которые драйвер должен отслеживать, что, вероятно, является источником вашей проблемы с памятью в драйвере.

Ответ №2:

Для тех, кому это может помочь,

Вот что я ошибся в разделении :

  1. .to_dataframe() : по умолчанию в нашей облачной платформе Spark создается 200 разделов. Итак, зацикливаясь на 40 msn , я создавал раздел 40 x 200. В итоге мне пришлось решать множество мелких задач.
  2. .repartition() : Поскольку я использовал a Window и partitionBy на msn I, хотя повторное разделение с помощью msn ускорит этот шаг. Но это привело к полной перетасовке моих разделов.

результаты: 59 ГБ записи в случайном порядке в соответствии с Spark Job Tracker и> 55 тыс. задач. Задача, требующая некоторых накладных расходов, это объясняет сбой драйвера.

Что я сделал, чтобы заставить это работать :

  1. Я избавился от этой Window функции

Путем фильтрации ранее в процессе, прежде чем я извлеку данные из массива данных. Я непосредственно добыл для той части полета, которая мне была нужна. Как следствие, меньше Exchange Partition в Физическом плане для того же самого раздела.

Вот обновленный Физический план:

 AdaptiveSparkPlan(isFinalPlan=false)
 - CollectLimit 1
    - HashAggregate(keys=[], functions=[avg(3565000_421#213), stddev_samp(3565000_421#213)], output=[avg(3565000_421)#10246, stddev_samp(3565000_421)#10255])
       - ShuffleQueryStage 0
          - Exchange SinglePartition, true
             - *(43) HashAggregate(keys=[], functions=[partial_avg(3565000_421#213), partial_stddev_samp(3565000_421#213)], output=[sum#10317, count#10318L, n#10261, avg#10262, m2#10263])
                - Union
                  :- *(1) Project [3565000_421#213]
                  :   - *(1) Scan ExistingRDD[msn#208,Flight_Id_Int#209,Flight_Id_Int.start#210L
  
  1. Я уменьшил объем раздела:

Я произвольно устанавливаю его равным 5 в .to_dataframe() вызове для каждого из 40 msn .

Сборка завершилась успешно после 24 часов записи в случайном порядке. 1,1 МБ и> 27 задач.

Как отметил @Andrew Long, это, вероятно, неоптимально из-за for цикла. Я все еще создаю не менее 200 разделов для 32 исполнителей, в результате чего получается> 27 тыс. задач для управления.

Наконец-то:

В качестве последнего шага я избавился от цикла for, полагаясь на базовый API для извлечения данных в 1 большом фрейме данных и принудительно разделил до 32. Сборка все еще выполняется на момент написания этой статьи, и я отредактирую сообщение с результатом. Но на самом деле задач для управления намного меньше (в 4 раза).

РЕДАКТИРОВАНИЕ 1 — обновление

Рад сообщить, что, избавившись от цикла for и разделив фрейм данных на 64 раздела (32 исполнителя х 2 ядра) Я смог выполнить ту же работу всего за 11 часов (вместо 24 часов) с 1,9 МБ записи в случайном порядке и только 5 Тыс. задач.

PS: Я упомянул 32 (а не 64) раздела выше, но задание не удалось выполнить с 32 и имело неоптимальный параллелизм (<20), поэтому оно занимало больше времени, и у меня были незанятые исполнители. 64 в моем случае, кажется, самое приятное.