Как отправить каждую группу одновременно исполнителям spark?

#scala #apache-spark #apache-spark-sql #user-defined-functions

#scala #apache-spark #apache-spark-sql #определяемые пользователем функции

Вопрос:

Я не могу отправить каждую группу фреймов данных одновременно исполнителю.

У меня есть данные, приведенные ниже в company_model_vals_df dataframe .

  ----------------------------------------------------------------------------------------
 | model_id  |  fiscal_year  | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
 ----------------------------------------------------------------------------------------
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   2             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   2             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    1      | 2018          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   3             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 |    2      | 2017          |   1             | r1   | r2   | r3   |  r4  | r5   |  r6 |
 ----------------------------------------------------------------------------------------
  

Я хочу отправить все сгруппированные данные исполнителю, чтобы обработать их по одному за раз.

Для этого я делаю, как показано ниже:

 var dist_company_model_vals_df =  company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()

// Want to send each group at a time to write by executors.

dist_company_model_vals_df.foreach(rowDf => {
  writeAsParquet(rowDf , parquet_file)    // this simply writes the data as parquet file
})
  

Ошибка :

Это вызывает исключение NullPointerException, поскольку rowDf не найден на стороне исполнителя. Каков правильный способ справиться с этим в spark-sql с использованием Scala 2.11?

Часть 2: Вопрос

Когда я выполняю company_model_vals_df.groupBy(«model_id», «fiscal_quarter», «fiscal_year»), данные много разливаются по диску даже после того, как я увеличил объем памяти. Т.Е. company_model_vals_df — это огромный фрейм данных … при выполнении groupBy происходит много утечек.

То же самое в приведенном ниже примере, т.е. с partitionBy

company_model_vals_df.write.partitionBy(«model_id», «fiscal_quarter», «fiscal_year»)

КОД PSEDO: Поэтому, чтобы избежать этого, сначала я бы сделал кортежи из значений groups = company_model_vals_df.groupBy(«model_id», «fiscal_quarter», «fiscal_year»).collect

 groups.forEach{ group ->
   // I want to prepare child dataframes for each group from    company_model_vals_df

   val child_df = company_model_vals_df.where(model_id= group.model_id amp;amp; fiscal_quarter === group.fiscal_quarter amp;amp; etc)

 this child_df , i want wrote to a file i.e. saveAs(path)
}
  

Есть ли способ это сделать.
Какие-либо функции spark или API полезны для меня здесь?
пожалуйста, предложите способ решить эту проблему.

Ответ №1:

Здесь есть несколько вариантов —

  • вам нужно разбить dataset на несколько наборов данных и обрабатывать их по отдельности, например ,
 var dist_company_model_vals_list =  company_model_vals_df
  .select("model_id","fiscal_quarter","fiscal_year").distinct().collectAsList
  

Затем отфильтруйте company_model_vals_df с выводом dist_company_model_vals_list списка, который предоставляет несколько наборов данных, с которыми вы можете работать независимо, например

 def rowList = {
import org.apache.spark.sql._
var dfList:Seq[DataFrame] = Seq()
for (data <- dist_company_model_vals_list.zipWithIndex) {
val i = data._2
val row = data.-1
val filterCol = col($"model_id").equalTo(row.get(i).getInt(0).and($"fiscal_quarter").equalTo(row.get(i).getInt(1).and($"fiscal_year").equalTo(row.get(i).getInt(2))

   val resultDf = company_model_vals_df.filter(filterCol)    
dfList  : = resultDf
      }
dfList
}
  
  • Если ваша цель — записать данные, вы можете использовать
    partitionBy("model_id","fiscal_quarter","fiscal_year")
    метод в DataFrameWriter для записи их отдельно.

Комментарии:

1. вы собрали в виде списка, но где итерация элементов списка? ниже приведен только первый набор записей выбора («model_id», «fiscal_quarter», «fiscal_year»), что насчет остальных наборов? т. Е. company_model_vals_df.filter(col($»model_id»).equalTo(dist_company_model_vals_list.get(0).getInt(0).and($»fiscal_quarter»).equalTo(dist_company_model_vals_list.get(0).getInt(1)))

2. спасибо, откуда берется этот «список» в «list.zipWithIndex»? Позвольте мне протестировать это

3. список — это ничто, но dist_company_model_vals_list, вычисленный в предыдущем операторе, обновил ответ. возможно, вам потребуется проверить синтаксис в IDE и исправить незначительные изменения. Если это решит проблему, пожалуйста, примите ответ.

4. выдает ошибку типа «значение foreach не является членом java.util. Список[org.apache.spark.sql.Row] » нет метода «zipWithIndex «. Что это даст значение i = data._2 и значение row = data._1 ???

5. не могли бы вы, пожалуйста, проверить вопрос из части 2

Ответ №2:

Если я правильно понимаю ваш вопрос, вы хотите манипулировать данными отдельно для каждого "model_id","fiscal_quarter","fiscal_year" .

Если это правильно, вы бы сделали это с groupBy() , например:

 company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
  

Если вы хотите записать каждую логическую группу в отдельную папку, вы можете сделать это, написав:

 company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
  

Комментарии:

1. спасибо за ваш быстрый ответ. да, я хочу манипулировать данными отдельно для каждого набора «model_id», «fiscal_quarter», «fiscal_year» . Я должен повторить каждый набор и сохранить как файл parquet. Если я использую groupBy выше, из-за огромного объема данных данные передаются на рабочий стол и выполняются вечно.

2. @dytyniak не могли бы вы, пожалуйста, взглянуть на вопрос, который я задал… Мне нужно повторить каждую отдельную группу, которая состоит («model_id», «fiscal_quarter», «fiscal_year»), нужно сформировать отдельный фрейм данных и отправить его в writeAsParquet как этот dist_company_model_vals_df.foreach(rowDf => { writeAsParquet(rowDf, parquet_file) // это просто записывает данные в виде файла parquet })

3. @Shyam смотри выше

4. Спасибо. не могли бы вы, пожалуйста, проверить раздел части 2 вопроса? Спасибо