#scala #apache-spark #apache-spark-sql #user-defined-functions
#scala #apache-spark #apache-spark-sql #определяемые пользователем функции
Вопрос:
Я не могу отправить каждую группу фреймов данных одновременно исполнителю.
У меня есть данные, приведенные ниже в company_model_vals_df dataframe
.
----------------------------------------------------------------------------------------
| model_id | fiscal_year | fiscal_quarter | col1 | col2 | col3 | col4 | col5 | col6 |
----------------------------------------------------------------------------------------
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 2 | r1 | r2 | r3 | r4 | r5 | r6 |
| 1 | 2018 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 3 | r1 | r2 | r3 | r4 | r5 | r6 |
| 2 | 2017 | 1 | r1 | r2 | r3 | r4 | r5 | r6 |
----------------------------------------------------------------------------------------
Я хочу отправить все сгруппированные данные исполнителю, чтобы обработать их по одному за раз.
Для этого я делаю, как показано ниже:
var dist_company_model_vals_df = company_model_vals_df.select("model_id","fiscal_quarter","fiscal_year").distinct()
// Want to send each group at a time to write by executors.
dist_company_model_vals_df.foreach(rowDf => {
writeAsParquet(rowDf , parquet_file) // this simply writes the data as parquet file
})
Ошибка :
Это вызывает исключение NullPointerException, поскольку rowDf не найден на стороне исполнителя. Каков правильный способ справиться с этим в spark-sql с использованием Scala 2.11?
Часть 2: Вопрос
Когда я выполняю company_model_vals_df.groupBy(«model_id», «fiscal_quarter», «fiscal_year»), данные много разливаются по диску даже после того, как я увеличил объем памяти. Т.Е. company_model_vals_df — это огромный фрейм данных … при выполнении groupBy происходит много утечек.
То же самое в приведенном ниже примере, т.е. с partitionBy
company_model_vals_df.write.partitionBy(«model_id», «fiscal_quarter», «fiscal_year»)
КОД PSEDO: Поэтому, чтобы избежать этого, сначала я бы сделал кортежи из значений groups = company_model_vals_df.groupBy(«model_id», «fiscal_quarter», «fiscal_year»).collect
groups.forEach{ group ->
// I want to prepare child dataframes for each group from company_model_vals_df
val child_df = company_model_vals_df.where(model_id= group.model_id amp;amp; fiscal_quarter === group.fiscal_quarter amp;amp; etc)
this child_df , i want wrote to a file i.e. saveAs(path)
}
Есть ли способ это сделать.
Какие-либо функции spark или API полезны для меня здесь?
пожалуйста, предложите способ решить эту проблему.
Ответ №1:
Здесь есть несколько вариантов —
- вам нужно разбить dataset на несколько наборов данных и обрабатывать их по отдельности, например ,
var dist_company_model_vals_list = company_model_vals_df
.select("model_id","fiscal_quarter","fiscal_year").distinct().collectAsList
Затем отфильтруйте company_model_vals_df
с выводом dist_company_model_vals_list
списка, который предоставляет несколько наборов данных, с которыми вы можете работать независимо, например
def rowList = {
import org.apache.spark.sql._
var dfList:Seq[DataFrame] = Seq()
for (data <- dist_company_model_vals_list.zipWithIndex) {
val i = data._2
val row = data.-1
val filterCol = col($"model_id").equalTo(row.get(i).getInt(0).and($"fiscal_quarter").equalTo(row.get(i).getInt(1).and($"fiscal_year").equalTo(row.get(i).getInt(2))
val resultDf = company_model_vals_df.filter(filterCol)
dfList : = resultDf
}
dfList
}
- Если ваша цель — записать данные, вы можете использовать
partitionBy("model_id","fiscal_quarter","fiscal_year")
метод в DataFrameWriter для записи их отдельно.
Комментарии:
1. вы собрали в виде списка, но где итерация элементов списка? ниже приведен только первый набор записей выбора («model_id», «fiscal_quarter», «fiscal_year»), что насчет остальных наборов? т. Е. company_model_vals_df.filter(col($»model_id»).equalTo(dist_company_model_vals_list.get(0).getInt(0).and($»fiscal_quarter»).equalTo(dist_company_model_vals_list.get(0).getInt(1)))
2. спасибо, откуда берется этот «список» в «list.zipWithIndex»? Позвольте мне протестировать это
3. список — это ничто, но dist_company_model_vals_list, вычисленный в предыдущем операторе, обновил ответ. возможно, вам потребуется проверить синтаксис в IDE и исправить незначительные изменения. Если это решит проблему, пожалуйста, примите ответ.
4. выдает ошибку типа «значение foreach не является членом java.util. Список[org.apache.spark.sql.Row] » нет метода «zipWithIndex «. Что это даст значение i = data._2 и значение row = data._1 ???
5. не могли бы вы, пожалуйста, проверить вопрос из части 2
Ответ №2:
Если я правильно понимаю ваш вопрос, вы хотите манипулировать данными отдельно для каждого "model_id","fiscal_quarter","fiscal_year"
.
Если это правильно, вы бы сделали это с groupBy()
, например:
company_model_vals_df.groupBy("model_id","fiscal_quarter","fiscal_year").agg(avg($"col1") as "average")
Если вы хотите записать каждую логическую группу в отдельную папку, вы можете сделать это, написав:
company_model_vals_df.write.partitionBy("model_id","fiscal_quarter","fiscal_year").parquet("path/to/save")
Комментарии:
1. спасибо за ваш быстрый ответ. да, я хочу манипулировать данными отдельно для каждого набора «model_id», «fiscal_quarter», «fiscal_year» . Я должен повторить каждый набор и сохранить как файл parquet. Если я использую groupBy выше, из-за огромного объема данных данные передаются на рабочий стол и выполняются вечно.
2. @dytyniak не могли бы вы, пожалуйста, взглянуть на вопрос, который я задал… Мне нужно повторить каждую отдельную группу, которая состоит («model_id», «fiscal_quarter», «fiscal_year»), нужно сформировать отдельный фрейм данных и отправить его в writeAsParquet как этот dist_company_model_vals_df.foreach(rowDf => { writeAsParquet(rowDf, parquet_file) // это просто записывает данные в виде файла parquet })
3. @Shyam смотри выше
4. Спасибо. не могли бы вы, пожалуйста, проверить раздел части 2 вопроса? Спасибо