Запуск RDD в Dataframe

#scala #apache-spark #dataframe

#scala #apache-spark #фрейм данных

Вопрос:

Ниже приведены данные в файле

 PREFIX|Description|Destination|Num_Type
1|C1|IDD|NA
7|C2|IDDD|NA
20|C3|IDDD|NA
27|C3|IDDD|NA
30|C5|IDDD|NA
  

Я пытаюсь прочитать это и преобразовать в Dataframe.

 val file=sc.textFile("/user/cloudera-scm/file.csv")
val list=file.collect.toList
list.toDF.show

 -------------------- 
|               value|
 -------------------- 
|PREFIX|Descriptio...|
|         1|C1|IDD|NA|
|        7|C2|IDDD|NA|
|       20|C3|IDDD|NA|
|       27|C3|IDDD|NA|
|       30|C5|IDDD|NA|
 -------------------- 
  

Я не могу преобразовать это в dataframe с точной табличной формой

Ответ №1:

Давайте сначала рассмотрим ваш код.

 // reading a potentially big file
val file=sc.textFile("/user/cloudera-scm/file.csv")
// collecting everything to the driver
val list=file.collect.toList
// converting a local list to a dataframe (this does not work)
list.toDF.show
  

Есть способы заставить ваш код работать, но сама логика неудобна. Вы считываете данные с помощью исполнителей, помещаете все это в драйвер, чтобы просто преобразовать его в dataframe (обратно к исполнителям). Это требует большого сетевого взаимодействия, и драйверу, скорее всего, не хватит памяти для любого достаточно большого набора данных.

Что вы можете сделать, это считывает данные напрямую как фрейм данных, подобный этому (драйвер ничего не делает, и нет ненужного ввода-вывода):

 spark.read
    .option("sep", "|") // specify the delimiter
    .option("header", true) // to tell spark that there is a header
    .option("inferSchema", true) // optional, infer the types of the columns
    .csv(".../data.csv").show
 ------ ----------- ----------- -------- 
|PREFIX|Description|Destination|Num_Type|
 ------ ----------- ----------- -------- 
|     1|         C1|        IDD|      NA|
|     7|         C2|       IDDD|      NA|
|    20|         C3|       IDDD|      NA|
|    27|         C3|       IDDD|      NA|
|    30|         C5|       IDDD|      NA|
 ------ ----------- ----------- -------- 

  

Комментарии:

1. Спасибо за ответ, Оли, но я читаю из kafka stream, который в конечном итоге поступает как DStream . Преобразование Dstream в RDD[String], которые необходимо преобразовать в dataStream

2. Если вы замените read на readStream , я почти уверен, что это также сработает. Взгляните на это для примеров spark.apache.org/docs/latest /…