#scala #apache-spark
#scala #apache-spark
Вопрос:
У меня есть три класса case:
case class Section(key: Key, from: String, to: String, travellers: Int)
case class Course(groupedSections: Seq[GroupedSection])
case class GroupedSection(from: String, to: String, sections: Seq[Section])
Разделы не являются уникальными, например
(Section(key1, "a", "b", 1), Section(key1, "a", "b", 2), Section(key1, "b", "c", 3), Section(key2, "a", "b", 1))
и я хочу получить курсы, которые содержат сгруппированные разделы по ключу, которые выглядели бы так в моем примере:
(Course(
GroupedSection("a", "b", (section1, section2 (I shortened this))), GroupedSection("b", "c", (section3))),
Course(
GroupedSection("a", "b", (section4)))
)
Seq разделов важен, чтобы я мог получить различные атрибуты класса Section на следующем шаге. Мой вопрос в том, возможно ли добавить все разделы, которые были сгруппированы в Seq внутри класса GroupedSection через spark. Я пробовал что-то подобное, но я не знаю способа получить последовательность разделов:
sections
.groupBy("key")
.agg(sort_array(collect_list(struct("from", "to"))).as(
"groupedSections"))
.select($"groupedSections")
.as[Course]
Если вам нужна дополнительная информация, просто дайте мне знать 🙂
Ответ №1:
Что вы точно хотите сделать, так это выполнить groupBy для массива, который был получен при использовании collect_list
во время groupBy в dataframe.
Начиная с spark 3.0.1, нет встроенных функций, которые могут группировать массив по ключу. Учитывая это, у вас есть два варианта:
- Вы можете использовать только встроенные функции Spark, а затем вы должны выполнить два groupBy в вашем фрейме данных.
- Поскольку у вас уже есть классы case, вы можете преобразовать свой dataframe в dataset, сгруппировать по вашему ключу и выполнить groupBy для вашего массива, используя код scala
Решение с использованием только встроенных функций Spark
В этом случае вы выполняете два groupBy. Первый использует столбцы «key», «from» и «to», а второй использует только столбец «key». Первый создает список разделов, которые будут использоваться второй groupBy для создания курсов:
import org.apache.spark.sql.functions.{col, collect_list, struct}
import sparkSession.implicits._
sections
.groupBy("key", "from", "to").agg(
collect_list(
struct(col("key"), col("from"), col("to"), col("travellers"))
).as("sections")
)
.groupBy("key").agg(collect_list(
struct(col("from"), col("to"), col("sections"))
).as("groupedSections"))
.select("groupedSections")
.as[Course]
Решение с наборами данных
Здесь вы приводите строки вашего фрейма данных Section
, используете метод K)(implicitevidence$3:org.apache.spark.sql.Encoder[K]):org.apache.spark.sql.KeyValueGroupedDataset[K,T]» rel=»nofollow noreferrer»>groupByKey для группировки по вашему ключу, а затем выполняете преобразование с Course
использованием U)(implicitevidence$4:org.apache.spark.sql.Encoder[U]):org.apache.spark.sql.Dataset[U]» rel=»nofollow noreferrer»/>метода mapGroups:
import sparkSession.implicits._
sections
.as[Section]
.groupByKey(_.key)
.mapGroups((_, sections) => Course(
sections.toSeq
.groupBy(s => (s.from, s.to))
.map(keyValue => GroupedSection(keyValue._1._1, keyValue._1._2, keyValue._2))
.toSeq
)
)
Комментарии:
1. Большое вам спасибо. Я попробовал решение со встроенными функциями Spark, и оно сработало для меня.