#scala #apache-spark
#scala #apache-spark
Вопрос:
У меня есть большой объем данных (12 ГБ, сжатых), разделенных в файлах parquet (около 2000 файлов, в каждом разделе по 10 файлов).
Я хочу получать по одной строке из каждого файла parquet и делать это параллельно, а не только в драйвере. Мне все равно, какую строку я получу, но важно получить запись из каждого файла.
Я придумал этот код, но он работает очень медленно.
val df = spark.read.parquet("/path/to/parquet/")
.groupBy(input_file_name().as("fileUri")).agg(
first("myColumn").as("myColumnValue")
)
.collect()
Комментарии:
1. На самом деле это не вариант использования Spark.
2. collect() переносит все данные на главный узел, следовательно, этот код выполняется медленно, попробуйте не использовать collect, и он должен быть распространен на каждую машину, поскольку это уже rdd.
3. Это определенно не из-за
collect()
. Это просто выводит около 2000 строк в драйвер, что не так много. Выполнение этого кода занимает около 5 минут на довольно мощном кластере
Ответ №1:
Если вы можете получить List[String]
из каждого файла parquet, вы должны быть в состоянии преобразовать каждый файл в Dataframe
, а затем использовать limit(1)
функцию, чтобы получить ровно одну строку из каждого из этих файлов. Затем вы можете работать с фреймами данных в исполнителях, зная, что каждый из них имеет длину всего 1.
import spark.sqlContext.implicits._
val fileNames: Seq[string] = ...
val dfs: Seq[Dataframe] = fileNames.map{ file =>
spark.read
.parquet(file)
.toDf()
.limit(1)
}
Кроме того, если вы хотите просто иметь возможность использовать богатую библиотеку, доступную для фреймов данных, и не перебирать каждый из них, выполняя dfs.foreach(...)
, вы можете объединить Seq[Dataframe]
только в один фрейм данных, используя функции reduce
with union
:
val df : Dataframe = dfs.reduce(_ union _)
* обратите внимание, что если вы сделаете это и планируете выполнять несколько операций на вашем компьютере, то df
после reduce
вы должны вызвать df.persist(...)
, иначе каждая выполняемая вами операция df
(например, map
или a sum
) будет повторяться reduce
каждый раз целиком.
.collect()
операции — это то, что здесь замедляет работу. Это создает ровно один вызов для каждой строки и отправляет данные по проводу обратно драйверу.
Комментарии:
1. Это выглядит нормально, но буквально создает одно задание для каждого файла, и если у вас 2000 файлов, это означает 2000 заданий. Не уверен, что spark может сделать лучше, чем это.
2. Честно говоря, вариант использования OP довольно странный. 2000 файлов — это не так уж много на самом деле, и им было бы лучше сделать все это в драйвере с помощью простого ввода-вывода, а не Spark.
3. Проблема в том, что файлы parquet состоят из сжатых блоков данных. Чтобы получить хотя бы одну строку данных, вам нужно прочитать блок из файла на диске и распаковать его. Если вы попытаетесь сделать это на одной машине, вы быстро столкнетесь с узкими местами в процессоре, сети и, возможно, даже на диске.
4. Это правда, опять же, почему я говорю, что вариант использования странный. Я привык, что parquet — это множество файлов небольшого размера (даже сжатых), в первую очередь из-за того, что они записываются из Spark распределенным способом. Я бы снова задался вопросом, почему OP нужна одна строка из каждого файла parquet отдельно.
Ответ №2:
Возможно, вы можете сохранить некоторый хэш-код, например, hash(fileUri)
вместо переноса всего пути, что уменьшит объем перетасовываемых данных, который прямо сейчас есть fileUri myColumn
для каждой записи в "/path/to/parquet/"
.
PS
: этот код выполняется на всех исполнителях параллельно, только .collect()
часть создает узкое место драйвера.
Комментарии:
1. Это не из-за сбора. Если вы просто выполните
df.limit(2000).collect()
, это займет 1 секунду.2. Да, я говорю, что все работает параллельно, просто
.collect()
это единственное, что работает в драйвере..collect()
здесь нет проблемы, я думаю, что количество перетасованных данных равно (что обычно и имеет место).3. Да, я думаю, что проблема заключается в случайном перемещении, но логически не должно быть случайного перемещения, потому что к каждому файлу можно получить доступ только от одного исполнителя. Но я думаю, что spark недостаточно умен
4. Да, 1 файл читается только с формы 1 исполнителя, но в нем 2000 файлов, а
spark.sql.shuffle.partitions
по умолчанию 200 (если вы не меняли), что означает, что данные 10 файлов должны находиться в одном разделе (на одной машине), следовательно, в случайном порядке.