Spark получает по одной строке из каждого файла parquet

#scala #apache-spark

#scala #apache-spark

Вопрос:

У меня есть большой объем данных (12 ГБ, сжатых), разделенных в файлах parquet (около 2000 файлов, в каждом разделе по 10 файлов).

Я хочу получать по одной строке из каждого файла parquet и делать это параллельно, а не только в драйвере. Мне все равно, какую строку я получу, но важно получить запись из каждого файла.

Я придумал этот код, но он работает очень медленно.

 val df = spark.read.parquet("/path/to/parquet/")
    .groupBy(input_file_name().as("fileUri")).agg(
      first("myColumn").as("myColumnValue")
    )
    .collect()
  

Комментарии:

1. На самом деле это не вариант использования Spark.

2. collect() переносит все данные на главный узел, следовательно, этот код выполняется медленно, попробуйте не использовать collect, и он должен быть распространен на каждую машину, поскольку это уже rdd.

3. Это определенно не из-за collect() . Это просто выводит около 2000 строк в драйвер, что не так много. Выполнение этого кода занимает около 5 минут на довольно мощном кластере

Ответ №1:

Если вы можете получить List[String] из каждого файла parquet, вы должны быть в состоянии преобразовать каждый файл в Dataframe , а затем использовать limit(1) функцию, чтобы получить ровно одну строку из каждого из этих файлов. Затем вы можете работать с фреймами данных в исполнителях, зная, что каждый из них имеет длину всего 1.

 import spark.sqlContext.implicits._

val fileNames: Seq[string] = ...

val dfs: Seq[Dataframe] = fileNames.map{ file => 
   spark.read
        .parquet(file)
        .toDf()
        .limit(1) 
}
  

Кроме того, если вы хотите просто иметь возможность использовать богатую библиотеку, доступную для фреймов данных, и не перебирать каждый из них, выполняя dfs.foreach(...) , вы можете объединить Seq[Dataframe] только в один фрейм данных, используя функции reduce with union :

 val df : Dataframe = dfs.reduce(_ union _)
  

* обратите внимание, что если вы сделаете это и планируете выполнять несколько операций на вашем компьютере, то df после reduce вы должны вызвать df.persist(...) , иначе каждая выполняемая вами операция df (например, map или a sum ) будет повторяться reduce каждый раз целиком.

.collect() операции — это то, что здесь замедляет работу. Это создает ровно один вызов для каждой строки и отправляет данные по проводу обратно драйверу.

Комментарии:

1. Это выглядит нормально, но буквально создает одно задание для каждого файла, и если у вас 2000 файлов, это означает 2000 заданий. Не уверен, что spark может сделать лучше, чем это.

2. Честно говоря, вариант использования OP довольно странный. 2000 файлов — это не так уж много на самом деле, и им было бы лучше сделать все это в драйвере с помощью простого ввода-вывода, а не Spark.

3. Проблема в том, что файлы parquet состоят из сжатых блоков данных. Чтобы получить хотя бы одну строку данных, вам нужно прочитать блок из файла на диске и распаковать его. Если вы попытаетесь сделать это на одной машине, вы быстро столкнетесь с узкими местами в процессоре, сети и, возможно, даже на диске.

4. Это правда, опять же, почему я говорю, что вариант использования странный. Я привык, что parquet — это множество файлов небольшого размера (даже сжатых), в первую очередь из-за того, что они записываются из Spark распределенным способом. Я бы снова задался вопросом, почему OP нужна одна строка из каждого файла parquet отдельно.

Ответ №2:

Возможно, вы можете сохранить некоторый хэш-код, например, hash(fileUri) вместо переноса всего пути, что уменьшит объем перетасовываемых данных, который прямо сейчас есть fileUri myColumn для каждой записи в "/path/to/parquet/" .

PS : этот код выполняется на всех исполнителях параллельно, только .collect() часть создает узкое место драйвера.

Комментарии:

1. Это не из-за сбора. Если вы просто выполните df.limit(2000).collect() , это займет 1 секунду.

2. Да, я говорю, что все работает параллельно, просто .collect() это единственное, что работает в драйвере. .collect() здесь нет проблемы, я думаю, что количество перетасованных данных равно (что обычно и имеет место).

3. Да, я думаю, что проблема заключается в случайном перемещении, но логически не должно быть случайного перемещения, потому что к каждому файлу можно получить доступ только от одного исполнителя. Но я думаю, что spark недостаточно умен

4. Да, 1 файл читается только с формы 1 исполнителя, но в нем 2000 файлов, а spark.sql.shuffle.partitions по умолчанию 200 (если вы не меняли), что означает, что данные 10 файлов должны находиться в одном разделе (на одной машине), следовательно, в случайном порядке.