#pyspark #spark-structured-streaming
Вопрос:
Я пишу непрерывное приложение с помощью Apache Spark. В случае структурированной потоковой передачи я пытаюсь прочитать из дельта-таблицы, выполнить агрегацию потоковой передачи по времени события через временное окно и записать результат в дельта-таблицу в режиме добавления. Мое ожидание в соответствии с документацией состоит в том, что в режиме добавления в приемник будет записан только завершенный агрегат для временного окна. Такого со мной еще не случалось. Вместо этого я вижу записи, подобные приведенным ниже, в моей целевой дельта-таблице, независимо от множества конфигураций, которые я пробовал с потоком (windowDuration=5 минут, slideDuration=20 секунд).
Как вы можете видеть из приведенного выше рисунка, одно и то же окно времени вносит много записей в приемник. Я подтвердил, что не более одной записи для временного окна выводится из каждой микропакета, но временное окно может вносить выходные записи из многих (явно не согласованных по количеству) микропакетов. Вот ядро кода агрегации потоковой передачи.
output_schema = create_trades_data_features_schema()
features_sdf = (trades_sdf.withWatermark("event_datetime", f"{trades_stream_watermark_secs} seconds")
.withColumn('time_window', f.window(timeColumn=f.col('event_datetime'),
windowDuration=f"{analysis_window_length_secs} seconds",
slideDuration=f"{analysis_window_hop_size_secs} seconds"))
.groupBy('time_window')
.applyInPandas(lambda pdf: generate_trades_data_features(pdf, output_schema, data_type_cast), output_schema))
UDF Pandas создает некоторые переменные, содержащие скалярные значения, создает фрейм данных Pandas в форме [1,N] и выводит его в качестве результата. То есть он возвращает одну строку. Единственное, на чем я группируюсь, — это временное окно. Как я могу получать несколько записей за одно и то же время? Я создавал и закрывал поток множеством способов и каждый раз получал один и тот же результат (например, в документах Delta Lake, в структурированном руководстве по потоковой передаче, и через параметры API чтения/загрузки/таблицы/суммирования, пробуя каждую конфигурацию параметров, которую я мог найти… да, много часов грубой силы). Я также пробовал различные диапазоны значений с длительностью водяного знака и периодом срабатывания; ни один из них не оказал влияния.
Является ли это ожидаемым поведением в режиме добавления (т. Е. Нескольких записей для одного и того же временного окна)?
Изменить: Я использую среду выполнения Databricks версии 8.3 ML. У него есть версия Spark «3.1.1».
Правка 2: Я предварительно рассматриваю, связана ли эта проблема с: https://issues.apache.org/jira/browse/SPARK-25756
Ответ №1:
Чтобы это не привело к присоединению к легиону вопросов, на которые нет ответов/которые будут продолжены, я запишу свой предварительный вывод ниже и обновлю его, если узнаю больше. Это может быть неправильно. Пожалуйста, не позволяйте этому сдерживать другие ответы/комментарии.
В целом, это не является преднамеренным поведением. Каждая микропатча индивидуально отправляется в UDF Pandas (т. Е. При каждом запуске в UDF отправляется текущая микропатча и только эта микропатча), и в результате запись в таблице результатов отправляется в приемник, несмотря на то, что она находится в режиме добавления. Проблема была отмечена разработчиками, и по крайней мере одна проблема JIRA была создана для ее решения. Этот поток работы, по-видимому, неактивен.
Другие данные и рекомендации:
- Несколько проблем на разных форумах (например, ссылки на базы данных) и вышеупомянутая проблема, связанная с JIRA, напрямую ссылаются или предоставляют явные примеры этой ошибки в Spark.
- Проблема существует с 2018 года, исправление, по-видимому, предназначено для версии 3.1.2, но проблема JIRA была полностью закрыта, и я не вижу продолжения обсуждения/работы.
- В настоящее время Spark Structured Streaming для разработчиков Python поддерживает только тривиальные преобразования данных в потоковых агрегациях (т. Е. Функции, которые вы можете запускать на объекте GroupedData, за исключением apply или applyInPandas).
- Если вы ищете механизм потоковых вычислений для нетривиального приложения, не ожидайте поддержки от API Python Spark, пока эта проблема не будет решена.
Очень интересно услышать о потенциальных обходных путях или о том, пришел ли я к неверному выводу выше. Ваше здоровье.
Комментарии:
1. Я пришел к выводу, что это не предполагаемое поведение. Мой предварительный вывод был верен. Обходной путь состоит в том, чтобы для каждого столбца, необходимого в UDF Pandas, собрать все значения в столбце в один массив и передать в UDF одну запись с правильной меткой времени.