Гарантирует ли drop_duplicate сохранение первой строки и удаление остальных строк после сортировки фрейма данных в spark?

#python #scala #apache-spark #apache-spark-sql #drop-duplicates

#python #scala #apache-искра #apache-spark-sql #удаление дубликатов

Вопрос:

У меня есть фрейм данных, считанный из файла Avro в Hadoop, с тремя столбцами (a, b, c), где один является ключевым столбцом, а среди двух других столбцов один имеет целочисленный тип, а другой — тип даты.

Я упорядочиваю фрейм по столбцу integer и столбцу date, а затем вызываю drop_duplicates по ключевому столбцу (a) в результирующем фрейме.

 frame = frame.orderBy(["b","c"],ascending=False)
frame = frame.drop_duplicate('a')
 

Основываясь на коде Spark Scala, я вижу, что orderBy внутри вызывает метод сортировки, который выполняет глобальную сортировку.

 /**
   * Returns a new Dataset sorted by the given expressions. For example:
   * {{{
   *   ds.sort($"col1", $"col2".desc)
   * }}}
   *
   * @group typedrel
   * @since 2.0.0
   */
  @scala.annotation.varargs
  def sort(sortExprs: Column*): Dataset[T] = {
    sortInternal(global = true, sortExprs)
  }
 

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

А также метод drop_duplicates (cols) преобразуется в Aggregate(first(cols)) в соответствии с приведенным ниже кодом spark.

 object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
    case d @ Deduplicate(keys, child) if !child.isStreaming =>
      val keyExprIds = keys.map(_.exprId)
      val aggCols = child.output.map { attr =>
        if (keyExprIds.contains(attr.exprId)) {
          attr
        } else {
          Alias(new First(attr).toAggregateExpression(), attr.name)()
        }
      }
      // SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
      // aggregations by checking the number of grouping keys. The key difference here is that a
      // global aggregation always returns at least one row even if there are no input rows. Here
      // we append a literal when the grouping key list is empty so that the result aggregate
      // operator is properly treated as a grouping aggregation.
      val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
      val newAgg = Aggregate(nonemptyKeys, aggCols, child)
      val attrMapping = d.output.zip(newAgg.output)
      newAgg -> attrMapping
  }
}

 

https://github.com/apache/spark/blob/branch-2.4/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

Поэтому я ожидаю, что drop duplicate переподготовит первые строки после сортировки и удалит другие. но я наблюдаю в своих заданиях spark, что это не так.

Есть мысли, почему?

Ответ №1:

Нет.

Сортировка по b amp; c, а затем удаление по a, будет работать так, как вам хотелось бы, тогда и только тогда, когда для обработки требуется только 1 раздел. С большими данными это обычно не так.

Итак, как вы можете google в другом месте: dropDuplicates сохраняет first occurrence операцию сортировки — только если есть 1 раздел, а в противном случае это удача.

Т.е. недетерминированный, когда задействовано больше разделов.

Не имеет ничего общего с avro или pyspark. Кроме того, порядок по b, c также может быть недетерминированным.

Комментарии:

1. Не могли бы вы пояснить, что вы подразумеваете под заказом на b и c, который не является детерминированным?

2. B и C, если одинаковые значения. Тогда какие сортировки сначала?

3. По нескольким строкам b1 c1, b1 c1

4. хорошо, если оба b1 и c1 имеют одинаковое значение, тогда оно будет недетерминированным, что и ожидается