Scala — как мне преобразовать из фрейма данных в массив [caseclass]

#arrays #scala #dataframe #apache-spark #databricks

#массивы #scala #фрейм данных #apache-spark #блоки данных

Вопрос:

Я использую DataBricks для обработки данных.

Один из столбцов в моем фрейме данных является массивом. Я хочу извлечь его, чтобы я мог обработать его, но у меня проблемы с синтаксисом. Кажется, я все еще извлекаю его как фрейм данных, а не конвертирую в массив

Вот пример. Мои данные представляют собой набор животных. У каждого животного есть имя и набор движений.

Я определяю два класса Case для хранения данных

 case class Movement ( 
  location: String,
  direction: String
)

case class Animal(
  var name: String,
  var movements: Array[Movement]
)
  

Я определяю данные для 2 животных, Гордона и Дэвида

 val m1 = Movement( "farm1", "arrive");
val m2 = Movement( "farm1", "leave");
val m3 = Movement( "farm2", "arrive");
val m4 = Movement( "farm3", "arrive");

val am = Array( m1, m2, m3);
val am2 = Array( m1, m2, m4);
val df : Animal = Animal("Gordon", am )
val df2 : Animal = Animal( "David", am2 )
val df3 = Seq( df, df2 ).toDF;
  

У меня есть подпрограмма, которая обрабатывает перемещения. Для упрощения в этом примере он просто отображает их

 def showMoves( amIn: Array[Movement]) {
  for( mv <- amIn ) (
    println( mv.location   " "   mv.direction )
  )
}
  

Это отлично работает для движений Гордона и Дэвида

 showMoves( am )
showMoves( am2 )
  

И результаты таковы:

 farm1 arrive
farm1 leave
farm2 arrive

farm1 arrive
farm1 leave
farm3 arrive
  

Это все настроенные данные.

Теперь я решаю, что меня интересует только Гордон, и хочу извлечь его движения

 val df4 = df3.filter( "name == 'Gordon'")
var am3 = df4.select("movements").as[Array[Movement]]
  

Scala сообщает мне:

 am3:org.apache.spark.sql.Dataset[Array[Movement]] = [movements: array]
  

Теперь я хочу обработать его перемещения, и вот в чем проблема.

 showMoves( am3 )
  

Scala сообщает мне следующее

 command-721439694904705:44: error: type mismatch;
 found   : org.apache.spark.sql.Dataset[Array[Movement]]
 required: Array[Movement]
showMoves( am3 )
  

Насколько я понимаю, из этого я создал am3 как Dataset[Массив [Движение]] вместо Array [Движение], но чего я не могу понять, так это как создать am3 в правильном типе.

Ответ №1:

Вы должны использовать функцию collectAsList для преобразования вашего фрейма данных в массив, а затем сопоставить его с массивом перемещения. Когда вы делаете это, вы собираете все свои данные на главном узле. Таким образом, у вас может возникнуть ошибка оом, если данные тяжелые.
Поэтому вместо

 var am3 = df4.select("movements").as[Array[Movement]]
  

Вы должны использовать

 df4.select("movements").collectAsList()
  

Комментарии:

1.Я не смог заставить это работать. Когда я попытался: var am3 = df4.select("movements").collectAsList() Scala сказала, что тип был df4:org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, movements: array] , и это выдало следующую ошибку при вызове showMoves( am3 ) command-721439694904705:47: error: type mismatch; found : java.util.List[org.apache.spark.sql.Row] required: Array[Movement] showMoves( am3 )

2. Прошу прощения, я не смог найти, как начать новую строку в mini markdown

3. вам нужно добавить .as[Перемещение] Я думаю, df4.select(«movements»).as[Movement].collectAsList() и Movement должны быть классом case