Как эффективно выбирать столбцы фрейма данных, содержащие определенное значение в Spark?

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Предположим, у вас есть фрейм данных в spark (строковый тип), и вы хотите удалить любой столбец, содержащий «foo». В приведенном ниже примере фрейма данных вы должны удалить столбцы «c2» и «c3», но сохранить «c1». Однако я бы хотел, чтобы решение было обобщено на большое количество столбцов и строк.

      ------------------- 
    |   c1|   c2|     c3|
     ------------------- 
    | this|  foo|  hello|
    | that|  bar|  world|
    |other|  baz| foobar|
     ------------------- 
 

Мое решение состоит в том, чтобы сканировать каждый столбец в фрейме данных, а затем агрегировать результаты, используя API фрейма данных и встроенные функции.
Итак, сканирование каждого столбца может быть выполнено следующим образом (я новичок в scala, пожалуйста, извините за синтаксические ошибки):

df = df.select(df.columns.map(c => col(c).like("foo"))

По логике вещей, у меня был бы промежуточный фрейм данных, подобный этому:

      -------------------- 
    |    c1|    c2|    c3|
     -------------------- 
    | false|  true| false|
    | false| false| false|
    | false| false|  true|
     -------------------- 
 

Которые затем будут объединены в одну строку, чтобы считывать, какие столбцы необходимо удалить.

exprs = df.columns.map( c => max(c).alias(c))

drop = df.agg(exprs.head, exprs.tail: _*)

      -------------------- 
    |    c1|    c2|    c3|
     -------------------- 
    | false|  true|  true|
     -------------------- 
 

Теперь любой столбец, содержащий true, может быть удален.

Мой вопрос: есть ли лучший способ сделать это с точки зрения производительности? В этом случае spark прекращает сканирование столбца, как только находит «foo»? Имеет ли значение, как хранятся данные (поможет ли parquet?).

Спасибо, я здесь новичок, поэтому, пожалуйста, расскажите мне, как можно улучшить вопрос.

Комментарии:

1. Помимо того, что на самом деле это не работает, я не уверен, как вы можете это замкнуть. Интересно посмотреть, что скажут другие.

Ответ №1:

В зависимости от ваших данных, например, если у вас много foo значений, приведенный ниже код может работать более эффективно:

 val colsToDrop = df.columns.filter{ c =>
  !df.where(col(c).like("foo")).limit(1).isEmpty
}

df.drop(colsToDrop: _*)
 

ОБНОВЛЕНИЕ: Удалено избыточное .limit(1) :

 val colsToDrop = df.columns.filter{ c =>
  !df.where(col(c).like("foo")).isEmpty
}

df.drop(colsToDrop: _*)
 

Комментарии:

1. Ограничить набор результатов или поиск?

2. Я думаю .limit(1) , это не нужно.

3. @LeoC Почему это? Как, по вашему мнению, работает ограничение?

4. myDF.limit(1) является ли фрейм данных из 1 строки, если myDF он непустой, 0 строк в противном случае. Следовательно myDF.limit(1).isEmpty , допустимо для проверки, является ли myDF оно пустым или нет. Мне это кажется излишним.

5. Это хороший момент @LeoC. Вот реализация isEmpty: def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) == 0 }

Ответ №2:

Ответ, следующий вашей логике (разработан правильно), но я думаю, что другой ответ лучше, тем более для потомков и ваших улучшенных возможностей с Scala. Я не уверен, что другой ответ на самом деле эффективен, но и это не так. Не уверен, поможет ли parquet, сложно оценить.

Другой вариант — написать цикл в драйвере и получить доступ к каждому столбцу, а затем parquet будет полезен из-за columnar, stats и push down.

 import org.apache.spark.sql.functions._
def myUDF = udf((cols: Seq[String], cmp: String) => cols.map(code => if (code == cmp) true else false ))

val df = sc.parallelize(Seq(
   ("foo", "abc", "sss"),
   ("bar", "fff", "sss"),
   ("foo", "foo", "ddd"),
   ("bar", "ddd", "ddd")
   )).toDF("a", "b", "c")

val res = df.select($"*", array(df.columns.map(col): _*).as("colN"))
            .withColumn( "colres", myUDF( col("colN") , lit("foo") )  )

res.show()
res.printSchema()
val n = 3
val res2 = res.select( (0 until n).map(i => col("colres")(i).alias(s"c${i 1}")): _*)
res2.show(false)

val exprs = res2.columns.map( c => max(c).alias(c))
val drop = res2.agg(exprs.head, exprs.tail: _*)
drop.show(false)