#scala #apache-spark #dataframe
#scala #apache-spark #фрейм данных
Вопрос:
Ниже приведены данные в файле
PREFIX|Description|Destination|Num_Type
1|C1|IDD|NA
7|C2|IDDD|NA
20|C3|IDDD|NA
27|C3|IDDD|NA
30|C5|IDDD|NA
Я пытаюсь прочитать это и преобразовать в Dataframe.
val file=sc.textFile("/user/cloudera-scm/file.csv")
val list=file.collect.toList
list.toDF.show
--------------------
| value|
--------------------
|PREFIX|Descriptio...|
| 1|C1|IDD|NA|
| 7|C2|IDDD|NA|
| 20|C3|IDDD|NA|
| 27|C3|IDDD|NA|
| 30|C5|IDDD|NA|
--------------------
Я не могу преобразовать это в dataframe с точной табличной формой
Ответ №1:
Давайте сначала рассмотрим ваш код.
// reading a potentially big file
val file=sc.textFile("/user/cloudera-scm/file.csv")
// collecting everything to the driver
val list=file.collect.toList
// converting a local list to a dataframe (this does not work)
list.toDF.show
Есть способы заставить ваш код работать, но сама логика неудобна. Вы считываете данные с помощью исполнителей, помещаете все это в драйвер, чтобы просто преобразовать его в dataframe (обратно к исполнителям). Это требует большого сетевого взаимодействия, и драйверу, скорее всего, не хватит памяти для любого достаточно большого набора данных.
Что вы можете сделать, это считывает данные напрямую как фрейм данных, подобный этому (драйвер ничего не делает, и нет ненужного ввода-вывода):
spark.read
.option("sep", "|") // specify the delimiter
.option("header", true) // to tell spark that there is a header
.option("inferSchema", true) // optional, infer the types of the columns
.csv(".../data.csv").show
------ ----------- ----------- --------
|PREFIX|Description|Destination|Num_Type|
------ ----------- ----------- --------
| 1| C1| IDD| NA|
| 7| C2| IDDD| NA|
| 20| C3| IDDD| NA|
| 27| C3| IDDD| NA|
| 30| C5| IDDD| NA|
------ ----------- ----------- --------
Комментарии:
1. Спасибо за ответ, Оли, но я читаю из kafka stream, который в конечном итоге поступает как DStream . Преобразование Dstream в RDD[String], которые необходимо преобразовать в dataStream
2. Если вы замените
read
наreadStream
, я почти уверен, что это также сработает. Взгляните на это для примеров spark.apache.org/docs/latest /…