#scala #apache-spark #apache-spark-sql #spark-structured-streaming #delta-lake
Вопрос:
У меня есть потоковый процессор Spark. Фрейм данных dfNewExceptions имеет дубликаты (дублируется «идентификатором исключения»). Поскольку это потоковый набор данных, приведенный ниже запрос завершается ошибкой:
val dfNewUniqueExceptions = dfNewExceptions.sort(desc("LastUpdateTime"))
.coalesce(1)
.dropDuplicates("ExceptionId")
val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")
dfNewExceptionCore.writeStream
.format("console")
// .outputMode("complete")
.option("truncate", "false")
.option("numRows",5000)
.start()
.awaitTermination(1000)
**
Исключение в потоке «основной» org.apache.spark.sql.AnalysisException: Сортировка не поддерживается в потоковых кадрах данных/наборах данных, если только она не выполняется в агрегированном кадре данных/наборе данных в режиме полного вывода;;
**
Это также задокументировано здесь: https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-programming-guide.html
Есть какие-либо предложения о том, как можно удалить дубликаты из dfNewExceptions?
Ответ №1:
Я рекомендую следовать подходу, описанному в Руководстве по структурированной потоковой передаче по дедупликации потоковой передачи. Там написано:
Вы можете дедуплицировать записи в потоках данных, используя уникальный идентификатор в событиях. Это точно так же, как удаление дублирования при статическом использовании столбца с уникальным идентификатором. Запрос сохранит необходимый объем данных из предыдущих записей, чтобы можно было фильтровать повторяющиеся записи. Аналогично агрегациям, вы можете использовать удаление дублирования с водяными знаками или без них.
С водяным знаком — Если существует верхняя граница того, с каким опозданием может появиться повторяющаяся запись, вы можете определить водяной знак в столбце «Время события» и выполнить дедупликацию, используя как идентификатор guid, так и столбцы «Время события». Запрос будет использовать водяной знак для удаления старых данных о состоянии из прошлых записей, которые, как ожидается, больше не будут дублироваться. Это ограничивает объем состояния, которое должен поддерживать запрос.
Также приведен пример в Scala:
val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ...
dfExceptions
.withWatermark("LastUpdateTime", "10 seconds")
.dropDuplicates("ExceptionId", "LastUpdateTime")
Комментарии:
1. Спасибо. dropDuplicates работает для устранения дубликатов. Но я хочу устранить дубликаты — используя последнее «Последнее время», чтобы удалить старые события. Похоже, этого невозможно достичь в потоковой передаче (даже в пределах одного и того же фрагмента)?
2. Проблема в том, что последнее время приходит в случайном порядке. Поэтому без сортировки внутри куска я не вижу, как этого можно достичь. Не могли бы вы, пожалуйста, пояснить, что вы имели в виду под объединением.
3. Как пример: Из приведенных ниже 2 записей (которые входят в один и тот же фрагмент) я хочу удалить более старую: 0_7ED_25D59038A21F66638E570A8C53E4D8 3/2/2018 6:08:11 ВЕЧЕРА 0_7ED_25D59038A21F66638E570A8C53E4D8 3/8/2018 7:28:21 ВЕЧЕРА Я попробовал это : val dfNewUniqueExceptions = dfNewExceptions. С пометкой withWatermark(«Последнее время», «60 секунд»).Выпадающие копии(«Идентификатор исключения», «Последнее время») Ожидаются, событие с «3/2/2018 6:08:11 вечера» должно быть удалено. Но я вижу, что обе строки все еще присутствуют в результате.
4. Кажется, теперь я понимаю. Вы не хотите удалять дубликаты, а вместо этого выберите максимальное время по идентификатору исключения. Это не задача по устранению дублирования.
5. Я хочу извлечь все события, за исключением уникальных идентификаторов. В случае, если есть события с повторяющимися идентификаторами исключения, я хочу выбрать уникальное событие, у которого есть последнее последнее время.
Ответ №2:
Вы можете использовать водяные знаки для удаления дубликатов в определенный период времени.