Приемник вывода файла Parquet — Spark Structured Streaming

#apache-spark #spark-structured-streaming

#apache-spark #spark-structured-streaming

Вопрос:

Интересно, что (и как изменить) запускает запрос Spark Structured Streaming (с настроенным приемником вывода файла Parquet) для записи данных в файлы parquet. Я периодически загружаю входные данные потока (используя StreamReader для чтения в файлах), но он не записывает выходные данные в файл Parquet для каждого файла, предоставленного в качестве входных данных. Как только я передаю ему несколько файлов, он, как правило, записывает файл Parquet просто отлично.

Мне интересно, как это контролировать. Я хотел бы иметь возможность принудительно выполнять новую запись в файл Parquet для каждого нового файла, предоставленного в качестве входных данных. Любые советы приветствуются!

Примечание: У меня установлен maxFilesPerTrigger равным 1 при вызове потока чтения. Я также вижу, что потоковый запрос обрабатывает один входной файл, однако, похоже, что один файл на входе не приводит к тому, что потоковый запрос записывает выходные данные в файл Parquet

Ответ №1:

После дальнейшего анализа и работы с приемником вывода ForEach, использующим режим добавления по умолчанию, я полагаю, что проблема, с которой я столкнулся, заключалась в сочетании режима добавления с функцией водяных знаков.

После повторного чтения https://spark.apache.org/docs/2.2.1/structured-streaming-programming-guide.html#starting-streaming-queries Похоже, что при использовании режима добавления с набором водяных знаков Spark structured steaming не будет записывать результаты агрегирования в таблицу результатов, пока не истечет срок действия водяных знаков. Режим добавления не разрешает обновлять записи, поэтому он должен дождаться, пока пройдет водяной знак, чтобы гарантировать отсутствие изменений в строке…

Я полагаю, что приемник файла Parquet не поддерживает режим обновления, однако после переключения на приемник вывода ForEach и использования режима обновления я наблюдал, как данные выходят из приемника, как я и ожидал. По сути, для каждой входящей записи по крайней мере одна выходящая запись без задержки (как наблюдалось ранее).

Надеюсь, это полезно другим.