Устраните дубликаты (дедупликацию) в потоковом кадре данных

#scala #apache-spark #apache-spark-sql #spark-structured-streaming #delta-lake

Вопрос:

У меня есть потоковый процессор Spark. Фрейм данных dfNewExceptions имеет дубликаты (дублируется «идентификатором исключения»). Поскольку это потоковый набор данных, приведенный ниже запрос завершается ошибкой:

 val dfNewUniqueExceptions = dfNewExceptions.sort(desc("LastUpdateTime"))
                                    .coalesce(1)
                                    .dropDuplicates("ExceptionId")
                                    
val dfNewExceptionCore = dfNewUniqueExceptions.select("ExceptionId", "LastUpdateTime")
    dfNewExceptionCore.writeStream
      .format("console")
//      .outputMode("complete")
      .option("truncate", "false")
      .option("numRows",5000)
      .start()
      .awaitTermination(1000)
  
 

**
Исключение в потоке «основной» org.apache.spark.sql.AnalysisException: Сортировка не поддерживается в потоковых кадрах данных/наборах данных, если только она не выполняется в агрегированном кадре данных/наборе данных в режиме полного вывода;;
**

Это также задокументировано здесь: https://home.apache.org/~pwendell/spark-nightly/spark-branch-2.0-docs/latest/structured-streaming-programming-guide.html

Есть какие-либо предложения о том, как можно удалить дубликаты из dfNewExceptions?

Ответ №1:

Я рекомендую следовать подходу, описанному в Руководстве по структурированной потоковой передаче по дедупликации потоковой передачи. Там написано:

Вы можете дедуплицировать записи в потоках данных, используя уникальный идентификатор в событиях. Это точно так же, как удаление дублирования при статическом использовании столбца с уникальным идентификатором. Запрос сохранит необходимый объем данных из предыдущих записей, чтобы можно было фильтровать повторяющиеся записи. Аналогично агрегациям, вы можете использовать удаление дублирования с водяными знаками или без них.

С водяным знаком — Если существует верхняя граница того, с каким опозданием может появиться повторяющаяся запись, вы можете определить водяной знак в столбце «Время события» и выполнить дедупликацию, используя как идентификатор guid, так и столбцы «Время события». Запрос будет использовать водяной знак для удаления старых данных о состоянии из прошлых записей, которые, как ожидается, больше не будут дублироваться. Это ограничивает объем состояния, которое должен поддерживать запрос.

Также приведен пример в Scala:

 val dfExceptions = spark.readStream. ... // columns: ExceptionId, LastUpdateTime, ... 

dfExceptions 
  .withWatermark("LastUpdateTime", "10 seconds") 
  .dropDuplicates("ExceptionId", "LastUpdateTime")
 

Комментарии:

1. Спасибо. dropDuplicates работает для устранения дубликатов. Но я хочу устранить дубликаты — используя последнее «Последнее время», чтобы удалить старые события. Похоже, этого невозможно достичь в потоковой передаче (даже в пределах одного и того же фрагмента)?

2. Проблема в том, что последнее время приходит в случайном порядке. Поэтому без сортировки внутри куска я не вижу, как этого можно достичь. Не могли бы вы, пожалуйста, пояснить, что вы имели в виду под объединением.

3. Как пример: Из приведенных ниже 2 записей (которые входят в один и тот же фрагмент) я хочу удалить более старую: 0_7ED_25D59038A21F66638E570A8C53E4D8 3/2/2018 6:08:11 ВЕЧЕРА 0_7ED_25D59038A21F66638E570A8C53E4D8 3/8/2018 7:28:21 ВЕЧЕРА Я попробовал это : val dfNewUniqueExceptions = dfNewExceptions. С пометкой withWatermark(«Последнее время», «60 секунд»).Выпадающие копии(«Идентификатор исключения», «Последнее время») Ожидаются, событие с «3/2/2018 6:08:11 вечера» должно быть удалено. Но я вижу, что обе строки все еще присутствуют в результате.

4. Кажется, теперь я понимаю. Вы не хотите удалять дубликаты, а вместо этого выберите максимальное время по идентификатору исключения. Это не задача по устранению дублирования.

5. Я хочу извлечь все события, за исключением уникальных идентификаторов. В случае, если есть события с повторяющимися идентификаторами исключения, я хочу выбрать уникальное событие, у которого есть последнее последнее время.

Ответ №2:

Вы можете использовать водяные знаки для удаления дубликатов в определенный период времени.