Получить путь к файлу HDFS в PySpark для файлов в формате sequence file

#apache-spark #pyspark #sequencefile

#apache-spark #pyspark #sequencefile

Вопрос:

Мои данные в HDFS представлены в формате Sequence file. Я использую PySpark (Spark 1.6) и пытаюсь достичь 2 вещей:

  1. Путь к данным содержит временную метку в формате гггг / мм / дд / чч, которую я хотел бы внести в сами данные. Я попробовал SparkContext.wholeTextFiles, но я думаю, что это может не поддерживать формат файла Sequence.

  2. Как мне поступить с пунктом выше, если я хочу обработать данные за день и хочу внести дату в данные? В этом случае я бы загружал данные в формате гггг / мм / дд / *.

Ценю любые подсказки.

Ответ №1:

Если сохраненные типы совместимы с типами SQL и вы используете Spark 2.0, это довольно просто. Импорт input_file_name :

 from pyspark.sql.functions import input_file_name 
  

Прочитать файл и преобразовать в DataFrame :

 df = sc.sequenceFile("/tmp/foo/").toDF()
  

Добавить имя файла:

 df.withColumn("input", input_file_name())
  

Если это решение неприменимо в вашем случае, то универсальным решением является прямой список файлов (для HDFS вы можете использовать hdfs3 библиотеку):

 files = ...
  

прочитайте один за другим, добавив имя файла:

 def read(f):
    """Just to avoid problems with late binding"""
    return sc.sequenceFile(f).map(lambda x: (f, x))

rdds = [read(f) for f in files]
  

и объединение:

 sc.union(rdds)
  

Комментарии:

1. Хорошее решение, работает для меня. Но в RDD нет метода toDF(), вместо этого следует использовать createDataFrame.