Прочитать данные текстового файла с помощью Spark и разделить данные с помощью запятой — python

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

У меня есть данные в формате ниже.

 abc, x1, x2, x3  
def, x1, x3, x4,x8,x9   
ghi, x7, x10, x11  
  

Результат, который я хочу, это

 0,abc, [x1, x2, x3]  
1,def, [x1, x3, x4,x8,x9]  
2,ghi, [x7, x10, x11]
  

Комментарии:

1. Вы что-нибудь пробовали?

2. Я попробовал 2 предоставленных решения и решил проблему с небольшими изменениями. что я сделал, так это путем преобразования в rdd, а затем с помощью функции map. rdd = spark.read.text(имя файла).rdd rdd = rdd.map(лямбда x: строка (номер=str(x[‘value’].split(‘,’)[0]), count=str(x[‘value’].split(‘,’)[1:])))

Ответ №1:

Ваши данные не в формате CSV. CSV означает текстовый файл, разделенный запятыми, с фиксированной схемой. CSV для ваших данных будет:

 abc,x1,x2,x3,,
def,x1,x3,x4,x8,x9
ghi,x7,x10,x11,,
  

Обратите внимание на конечные запятые в строках 1 и 3, которых нет в ваших данных.

Поскольку у вас есть текстовый файл, который не является CSV, способ получить нужную схему в Spark — это прочитать весь файл на Python, проанализировать то, что вы хотите, а затем использовать spark.crateDataFrame() . В качестве альтернативы, если у вас в каталоге более одного такого файла, используйте SparkContext.wholeTextFiles а затем flatMap вашу функцию синтаксического анализа.

Предполагая, что вы уже сделали что-то подобное open("Your File.txt").readlines , остальное просто:

 import re
from pyspark.sql import *

lines = [
  "abc, x1, x2, x3",
  "def, x1, x3, x4,x8,x9",
  "ghi, x7, x10, x11"
]

split = re.compile("s*,s*")
Line = Row("id", "first", "rest")

def parse_line(id, line):
  tokens = split.split(line.strip)
  return Line(id, tokens[0], tokens.pop(0))

def parse_lines(lines):
  return [parse_line(i, x) for i,x in enumerate(lines)]

spark.createDataFrame(parse_lines(lines))
  

Комментарии:

1. Извините, данные не в формате csv, но данные разделены запятыми, и я хочу, чтобы они были разделены именем и параметрами. Не могли бы вы помочь мне, как использовать flatmap для его разбора.

2. flatMap предназначено только для случаев, когда у вас много файлов. Я обновил ответ примером кода.

3. Спасибо, сэр! Эта идея создания фрейма данных с помощью функции обратного вызова гениальна!

Ответ №2:

Что вы можете сделать, так это сначала сгенерировать идентификатор с помощью zipWithIndex , а затем внутри функции map взять первую часть строки с r[0].split(",")[0] , а вторую с r[0].split(",")[1:] .

Вот код, описанный выше:

 from pyspark.sql.types import StringType

lines = ["abc, x1, x2, x3",
        "def, x1, x3, x4,x8,x9",
        "ghi, x7, x10, x11"]

df = spark.createDataFrame(lines, StringType())
df = df.rdd.zipWithIndex() 
           .map(lambda (r, indx): (indx, r[0].split(",")[0], r[0].split(",")[1:])) 
           .toDF(["id", "name", "x_col"])

df.show(10, False)
  

И вывод:

  --- ---- ----------------------- 
|id |name|x_col                  |
 --- ---- ----------------------- 
|0  |abc |[ x1,  x2,  x3]        |
|1  |def |[ x1,  x3,  x4, x8, x9]|
|2  |ghi |[ x7,  x10,  x11]      |
 --- ---- ----------------------- 
  

Комментарии:

1. Привет, lambda (r, indx) выдает ошибку как недопустимый синтаксис.

2. Привет, не могли бы вы вставить свой код, пожалуйста?

3. Конечно @abiratsis sc = SparkContext / df = sc.textFile(«/home/kubra/PycharmProjects/Recom_Data/october2020ItemList.txt «) / df = df.rdd.zipWithIndex() .map(лямбда (r, indx): (indx, r[0].разделить(«,»)[0], r[0].разделить(«,»)[1:])) . toDF([«id», «name», «x_col»]) Он не принимает lambda (r, indx) как опечатку.

4. как насчет lambda r, indx ? Ошибка все еще сохраняется?

5. В этом случае возникает следующая ошибка; TypeError: <lambda>() отсутствует 1 требуемый позиционный аргумент: ‘indx’ . Я не уверен, что делаю что-то неправильно.

Ответ №3:

Если данные поступают в файл, можно реализовать таким образом:

  1. Прочитайте файл как CSV;
  2. Добавьте столбец индекса с «monotonically_increasing_id»
  3. Выберите первый столбец и все остальные столбцы в виде массива.

На Scala может быть реализовано таким образом:

 val df = spark.read.option("header", "false").csv("non-csv.txt")
val remainingColumns = df.columns.tail
df.withColumn("id", monotonically_increasing_id).
  select(
    col("id"),
    col(df.columns(0)),
    array(remainingColumns.head, remainingColumns.tail: _*)
  ).show(false)
  

Вывод:

  --- --- -------------------- 
|id |_c0|array(_c1, _c2, _c3)|
 --- --- -------------------- 
|0  |abc|[ x1,  x2,  x3]     |
|1  |def|[ x1,  x3,  x4]     |
|2  |ghi|[ x7,  x10,  x11]   |
 --- --- --------------------