Преобразование в RDD завершается неудачно

#scala #entity-framework #apache-spark

Вопрос:

Мой код такой, как показано ниже. Я прочитал CSV-файл с двумя столбцами. Пройдите по элементам фрейма данных, преобразовав их в RDD. Теперь я хотел создать DF каждого элемента. Ниже приведен код с ошибкой. Может кто-нибудь, пожалуйста, помочь.

     val df1 = spark.read.format("csv").load("c:\file.csv") //CSV has 3 columns
     
    for (row <- df1.rdd.collect)
     {
       var tab1 =  row.mkString(",").split(",")(0) //Has Tablename
       var tab2 =  row.mkString(",").split(",")(1) //One Select Statment
       var tab3 =  row.mkString(",").split(",")(1) //Another Select Statment      
       
       val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working
               
     }
    
 

Я хочу объединить фрейм данных tab2 с tab3 и добавить имя таблицы. Например

Исключение запроса в таб2 и таб3 дает следующий результат.

 Col1     col2
---      ---
A         B
C         D
E         F
G         H
 

Я хочу, как показано ниже:

 Col0  Col1  Col2
----  ----   ---
Tab1   A      B
Tab1   C      D
Tab2   E      F
Tab3   G      h 
 

Теперь таб1 таб2 таб2.. и т.д. эта информация находится в файле CSV, который я читаю. Я хочу преобразовать этот col0 в фрейм данных,чтобы я мог читать в Spark Sql

Комментарии:

1. Не могли бы вы рассказать нам, почему вы хотите создать один кадр данных на строку вашего CSV? И в чем заключается ошибка, которую вы получаете?

2. Я не создаю фрейм данных для каждой строки, но хочу, чтобы он был по столбцу. На самом деле у каждого столбца csv есть запрос Oracle, который мне нужно выполнить.

3. Извините, я не понимаю, что вы пытаетесь сделать. Здесь вы перебираете все строки вашего rdd и создаете фрейм данных для каждой из них. Ваш код, вероятно, не работает, потому spark.createDataFrame что ожидает последовательность кортежей. Не могли бы вы, например, предоставить пример ввода и ожидаемый результат?

4. Мне очень жаль. То, что вы пытаетесь сделать, для меня все еще неясно. Каково содержимое csv — файла?

5. Я создаю временное представление из таб2 и таб3. Делаем перекрестное соединение. Теперь я хочу снова выполнить перекрестное соединение с tab1. Как я могу это сделать? Надеюсь, это ясно

Ответ №1:

Я смог решить свою проблему ниже:

 val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working
 

Автор:

 val newDf = spark.sparkContext.parallelize(Seq(newdf)).toDF("Col")