#python #scala #apache-spark #apache-spark-sql #drop-duplicates
#python #scala #apache-искра #apache-spark-sql #удаление дубликатов
Вопрос:
У меня есть фрейм данных, считанный из файла Avro в Hadoop, с тремя столбцами (a, b, c), где один является ключевым столбцом, а среди двух других столбцов один имеет целочисленный тип, а другой — тип даты.
Я упорядочиваю фрейм по столбцу integer и столбцу date, а затем вызываю drop_duplicates по ключевому столбцу (a) в результирующем фрейме.
frame = frame.orderBy(["b","c"],ascending=False)
frame = frame.drop_duplicate('a')
Основываясь на коде Spark Scala, я вижу, что orderBy
внутри вызывает метод сортировки, который выполняет глобальную сортировку.
/**
* Returns a new Dataset sorted by the given expressions. For example:
* {{{
* ds.sort($"col1", $"col2".desc)
* }}}
*
* @group typedrel
* @since 2.0.0
*/
@scala.annotation.varargs
def sort(sortExprs: Column*): Dataset[T] = {
sortInternal(global = true, sortExprs)
}
А также метод drop_duplicates (cols) преобразуется в Aggregate(first(cols)) в соответствии с приведенным ниже кодом spark.
object ReplaceDeduplicateWithAggregate extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan transformUpWithNewOutput {
case d @ Deduplicate(keys, child) if !child.isStreaming =>
val keyExprIds = keys.map(_.exprId)
val aggCols = child.output.map { attr =>
if (keyExprIds.contains(attr.exprId)) {
attr
} else {
Alias(new First(attr).toAggregateExpression(), attr.name)()
}
}
// SPARK-22951: Physical aggregate operators distinguishes global aggregation and grouping
// aggregations by checking the number of grouping keys. The key difference here is that a
// global aggregation always returns at least one row even if there are no input rows. Here
// we append a literal when the grouping key list is empty so that the result aggregate
// operator is properly treated as a grouping aggregation.
val nonemptyKeys = if (keys.isEmpty) Literal(1) :: Nil else keys
val newAgg = Aggregate(nonemptyKeys, aggCols, child)
val attrMapping = d.output.zip(newAgg.output)
newAgg -> attrMapping
}
}
Поэтому я ожидаю, что drop duplicate переподготовит первые строки после сортировки и удалит другие. но я наблюдаю в своих заданиях spark, что это не так.
Есть мысли, почему?
Ответ №1:
Нет.
Сортировка по b amp; c, а затем удаление по a, будет работать так, как вам хотелось бы, тогда и только тогда, когда для обработки требуется только 1 раздел. С большими данными это обычно не так.
Итак, как вы можете google в другом месте:
dropDuplicates
сохраняетfirst occurrence
операцию сортировки — только если есть 1 раздел, а в противном случае это удача.Т.е. недетерминированный, когда задействовано больше разделов.
Не имеет ничего общего с avro или pyspark. Кроме того, порядок по b, c также может быть недетерминированным.
Комментарии:
1. Не могли бы вы пояснить, что вы подразумеваете под заказом на b и c, который не является детерминированным?
2. B и C, если одинаковые значения. Тогда какие сортировки сначала?
3. По нескольким строкам b1 c1, b1 c1
4. хорошо, если оба b1 и c1 имеют одинаковое значение, тогда оно будет недетерминированным, что и ожидается