Как получить Seq элементов, которые были сгруппированы в предложении GROUP BY?

#scala #apache-spark

#scala #apache-spark

Вопрос:

У меня есть три класса case:

 case class Section(key: Key, from: String, to: String, travellers: Int)

case class Course(groupedSections: Seq[GroupedSection])

case class GroupedSection(from: String, to: String, sections: Seq[Section])
  

Разделы не являются уникальными, например

 (Section(key1, "a", "b", 1), Section(key1, "a", "b", 2), Section(key1, "b", "c", 3), Section(key2, "a", "b", 1)) 
  

и я хочу получить курсы, которые содержат сгруппированные разделы по ключу, которые выглядели бы так в моем примере:

 (Course(
    GroupedSection("a", "b", (section1, section2 (I shortened this))), GroupedSection("b", "c", (section3))), 
 Course(
    GroupedSection("a", "b", (section4)))
) 
  

Seq разделов важен, чтобы я мог получить различные атрибуты класса Section на следующем шаге. Мой вопрос в том, возможно ли добавить все разделы, которые были сгруппированы в Seq внутри класса GroupedSection через spark. Я пробовал что-то подобное, но я не знаю способа получить последовательность разделов:

 sections
      .groupBy("key")
      .agg(sort_array(collect_list(struct("from", "to"))).as(
        "groupedSections"))
      .select($"groupedSections")
      .as[Course]
  

Если вам нужна дополнительная информация, просто дайте мне знать 🙂

Ответ №1:

Что вы точно хотите сделать, так это выполнить groupBy для массива, который был получен при использовании collect_list во время groupBy в dataframe.

Начиная с spark 3.0.1, нет встроенных функций, которые могут группировать массив по ключу. Учитывая это, у вас есть два варианта:

  • Вы можете использовать только встроенные функции Spark, а затем вы должны выполнить два groupBy в вашем фрейме данных.
  • Поскольку у вас уже есть классы case, вы можете преобразовать свой dataframe в dataset, сгруппировать по вашему ключу и выполнить groupBy для вашего массива, используя код scala

Решение с использованием только встроенных функций Spark

В этом случае вы выполняете два groupBy. Первый использует столбцы «key», «from» и «to», а второй использует только столбец «key». Первый создает список разделов, которые будут использоваться второй groupBy для создания курсов:

 import org.apache.spark.sql.functions.{col, collect_list, struct}

import sparkSession.implicits._

sections
  .groupBy("key", "from", "to").agg(
    collect_list(
      struct(col("key"), col("from"), col("to"), col("travellers"))
    ).as("sections")
  )
  .groupBy("key").agg(collect_list(
     struct(col("from"), col("to"), col("sections"))
  ).as("groupedSections"))
  .select("groupedSections")
  .as[Course]
  

Решение с наборами данных

Здесь вы приводите строки вашего фрейма данных Section , используете метод K)(implicitevidence$3:org.apache.spark.sql.Encoder[K]):org.apache.spark.sql.KeyValueGroupedDataset[K,T]» rel=»nofollow noreferrer»>groupByKey для группировки по вашему ключу, а затем выполняете преобразование с Course использованием U)(implicitevidence$4:org.apache.spark.sql.Encoder[U]):org.apache.spark.sql.Dataset[U]» rel=»nofollow noreferrer»/>метода mapGroups:

 import sparkSession.implicits._

sections
  .as[Section]
  .groupByKey(_.key)
  .mapGroups((_, sections) => Course(
    sections.toSeq
      .groupBy(s => (s.from, s.to))
      .map(keyValue => GroupedSection(keyValue._1._1, keyValue._1._2, keyValue._2))
      .toSeq
    )
  )
  

Комментарии:

1. Большое вам спасибо. Я попробовал решение со встроенными функциями Spark, и оно сработало для меня.