#scala #entity-framework #apache-spark
Вопрос:
Мой код такой, как показано ниже. Я прочитал CSV-файл с двумя столбцами. Пройдите по элементам фрейма данных, преобразовав их в RDD. Теперь я хотел создать DF каждого элемента. Ниже приведен код с ошибкой. Может кто-нибудь, пожалуйста, помочь.
val df1 = spark.read.format("csv").load("c:\file.csv") //CSV has 3 columns
for (row <- df1.rdd.collect)
{
var tab1 = row.mkString(",").split(",")(0) //Has Tablename
var tab2 = row.mkString(",").split(",")(1) //One Select Statment
var tab3 = row.mkString(",").split(",")(1) //Another Select Statment
val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working
}
Я хочу объединить фрейм данных tab2 с tab3 и добавить имя таблицы. Например
Исключение запроса в таб2 и таб3 дает следующий результат.
Col1 col2
--- ---
A B
C D
E F
G H
Я хочу, как показано ниже:
Col0 Col1 Col2
---- ---- ---
Tab1 A B
Tab1 C D
Tab2 E F
Tab3 G h
Теперь таб1 таб2 таб2.. и т.д. эта информация находится в файле CSV, который я читаю. Я хочу преобразовать этот col0 в фрейм данных,чтобы я мог читать в Spark Sql
Комментарии:
1. Не могли бы вы рассказать нам, почему вы хотите создать один кадр данных на строку вашего CSV? И в чем заключается ошибка, которую вы получаете?
2. Я не создаю фрейм данных для каждой строки, но хочу, чтобы он был по столбцу. На самом деле у каждого столбца csv есть запрос Oracle, который мне нужно выполнить.
3. Извините, я не понимаю, что вы пытаетесь сделать. Здесь вы перебираете все строки вашего rdd и создаете фрейм данных для каждой из них. Ваш код, вероятно, не работает, потому
spark.createDataFrame
что ожидает последовательность кортежей. Не могли бы вы, например, предоставить пример ввода и ожидаемый результат?4. Мне очень жаль. То, что вы пытаетесь сделать, для меня все еще неясно. Каково содержимое csv — файла?
5. Я создаю временное представление из таб2 и таб3. Делаем перекрестное соединение. Теперь я хочу снова выполнить перекрестное соединение с tab1. Как я могу это сделать? Надеюсь, это ясно
Ответ №1:
Я смог решить свою проблему ниже:
val newdf = spark.createDataFrame(tab1).toDF("Col") // This is not working
Автор:
val newDf = spark.sparkContext.parallelize(Seq(newdf)).toDF("Col")