#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
У меня есть данные в формате ниже.
abc, x1, x2, x3
def, x1, x3, x4,x8,x9
ghi, x7, x10, x11
Результат, который я хочу, это
0,abc, [x1, x2, x3]
1,def, [x1, x3, x4,x8,x9]
2,ghi, [x7, x10, x11]
Комментарии:
1. Вы что-нибудь пробовали?
2. Я попробовал 2 предоставленных решения и решил проблему с небольшими изменениями. что я сделал, так это путем преобразования в rdd, а затем с помощью функции map. rdd = spark.read.text(имя файла).rdd rdd = rdd.map(лямбда x: строка (номер=str(x[‘value’].split(‘,’)[0]), count=str(x[‘value’].split(‘,’)[1:])))
Ответ №1:
Ваши данные не в формате CSV. CSV означает текстовый файл, разделенный запятыми, с фиксированной схемой. CSV для ваших данных будет:
abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,
Обратите внимание на конечные запятые в строках 1 и 3, которых нет в ваших данных.
Поскольку у вас есть текстовый файл, который не является CSV, способ получить нужную схему в Spark — это прочитать весь файл на Python, проанализировать то, что вы хотите, а затем использовать spark.crateDataFrame()
. В качестве альтернативы, если у вас в каталоге более одного такого файла, используйте SparkContext.wholeTextFiles
а затем flatMap
вашу функцию синтаксического анализа.
Предполагая, что вы уже сделали что-то подобное open("Your File.txt").readlines
, остальное просто:
import re
from pyspark.sql import *
lines = [
"abc, x1, x2, x3",
"def, x1, x3, x4,x8,x9",
"ghi, x7, x10, x11"
]
split = re.compile("s*,s*")
Line = Row("id", "first", "rest")
def parse_line(id, line):
tokens = split.split(line.strip)
return Line(id, tokens[0], tokens.pop(0))
def parse_lines(lines):
return [parse_line(i, x) for i,x in enumerate(lines)]
spark.createDataFrame(parse_lines(lines))
Комментарии:
1. Извините, данные не в формате csv, но данные разделены запятыми, и я хочу, чтобы они были разделены именем и параметрами. Не могли бы вы помочь мне, как использовать flatmap для его разбора.
2.
flatMap
предназначено только для случаев, когда у вас много файлов. Я обновил ответ примером кода.3. Спасибо, сэр! Эта идея создания фрейма данных с помощью функции обратного вызова гениальна!
Ответ №2:
Что вы можете сделать, так это сначала сгенерировать идентификатор с помощью zipWithIndex
, а затем внутри функции map взять первую часть строки с r[0].split(",")[0]
, а вторую с r[0].split(",")[1:]
.
Вот код, описанный выше:
from pyspark.sql.types import StringType
lines = ["abc, x1, x2, x3",
"def, x1, x3, x4,x8,x9",
"ghi, x7, x10, x11"]
df = spark.createDataFrame(lines, StringType())
df = df.rdd.zipWithIndex()
.map(lambda (r, indx): (indx, r[0].split(",")[0], r[0].split(",")[1:]))
.toDF(["id", "name", "x_col"])
df.show(10, False)
И вывод:
--- ---- -----------------------
|id |name|x_col |
--- ---- -----------------------
|0 |abc |[ x1, x2, x3] |
|1 |def |[ x1, x3, x4, x8, x9]|
|2 |ghi |[ x7, x10, x11] |
--- ---- -----------------------
Комментарии:
1. Привет, lambda (r, indx) выдает ошибку как недопустимый синтаксис.
2. Привет, не могли бы вы вставить свой код, пожалуйста?
3. Конечно @abiratsis sc = SparkContext / df = sc.textFile(«/home/kubra/PycharmProjects/Recom_Data/october2020ItemList.txt «) / df = df.rdd.zipWithIndex() .map(лямбда (r, indx): (indx, r[0].разделить(«,»)[0], r[0].разделить(«,»)[1:])) . toDF([«id», «name», «x_col»]) Он не принимает lambda (r, indx) как опечатку.
4. как насчет
lambda r, indx
? Ошибка все еще сохраняется?5. В этом случае возникает следующая ошибка; TypeError: <lambda>() отсутствует 1 требуемый позиционный аргумент: ‘indx’ . Я не уверен, что делаю что-то неправильно.
Ответ №3:
Если данные поступают в файл, можно реализовать таким образом:
- Прочитайте файл как CSV;
- Добавьте столбец индекса с «monotonically_increasing_id»
- Выберите первый столбец и все остальные столбцы в виде массива.
На Scala может быть реализовано таким образом:
val df = spark.read.option("header", "false").csv("non-csv.txt")
val remainingColumns = df.columns.tail
df.withColumn("id", monotonically_increasing_id).
select(
col("id"),
col(df.columns(0)),
array(remainingColumns.head, remainingColumns.tail: _*)
).show(false)
Вывод:
--- --- --------------------
|id |_c0|array(_c1, _c2, _c3)|
--- --- --------------------
|0 |abc|[ x1, x2, x3] |
|1 |def|[ x1, x3, x4] |
|2 |ghi|[ x7, x10, x11] |
--- --- --------------------