#apache-spark #pyspark #sequencefile
#apache-spark #pyspark #sequencefile
Вопрос:
Мои данные в HDFS представлены в формате Sequence file. Я использую PySpark (Spark 1.6) и пытаюсь достичь 2 вещей:
-
Путь к данным содержит временную метку в формате гггг / мм / дд / чч, которую я хотел бы внести в сами данные. Я попробовал SparkContext.wholeTextFiles, но я думаю, что это может не поддерживать формат файла Sequence.
-
Как мне поступить с пунктом выше, если я хочу обработать данные за день и хочу внести дату в данные? В этом случае я бы загружал данные в формате гггг / мм / дд / *.
Ценю любые подсказки.
Ответ №1:
Если сохраненные типы совместимы с типами SQL и вы используете Spark 2.0, это довольно просто. Импорт input_file_name
:
from pyspark.sql.functions import input_file_name
Прочитать файл и преобразовать в DataFrame
:
df = sc.sequenceFile("/tmp/foo/").toDF()
Добавить имя файла:
df.withColumn("input", input_file_name())
Если это решение неприменимо в вашем случае, то универсальным решением является прямой список файлов (для HDFS вы можете использовать hdfs3
библиотеку):
files = ...
прочитайте один за другим, добавив имя файла:
def read(f):
"""Just to avoid problems with late binding"""
return sc.sequenceFile(f).map(lambda x: (f, x))
rdds = [read(f) for f in files]
и объединение:
sc.union(rdds)
Комментарии:
1. Хорошее решение, работает для меня. Но в RDD нет метода toDF(), вместо этого следует использовать createDataFrame.