#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
Предположим, у вас есть фрейм данных в spark (строковый тип), и вы хотите удалить любой столбец, содержащий «foo». В приведенном ниже примере фрейма данных вы должны удалить столбцы «c2» и «c3», но сохранить «c1». Однако я бы хотел, чтобы решение было обобщено на большое количество столбцов и строк.
-------------------
| c1| c2| c3|
-------------------
| this| foo| hello|
| that| bar| world|
|other| baz| foobar|
-------------------
Мое решение состоит в том, чтобы сканировать каждый столбец в фрейме данных, а затем агрегировать результаты, используя API фрейма данных и встроенные функции.
Итак, сканирование каждого столбца может быть выполнено следующим образом (я новичок в scala, пожалуйста, извините за синтаксические ошибки):
df = df.select(df.columns.map(c => col(c).like("foo"))
По логике вещей, у меня был бы промежуточный фрейм данных, подобный этому:
--------------------
| c1| c2| c3|
--------------------
| false| true| false|
| false| false| false|
| false| false| true|
--------------------
Которые затем будут объединены в одну строку, чтобы считывать, какие столбцы необходимо удалить.
exprs = df.columns.map( c => max(c).alias(c))
drop = df.agg(exprs.head, exprs.tail: _*)
--------------------
| c1| c2| c3|
--------------------
| false| true| true|
--------------------
Теперь любой столбец, содержащий true, может быть удален.
Мой вопрос: есть ли лучший способ сделать это с точки зрения производительности? В этом случае spark прекращает сканирование столбца, как только находит «foo»? Имеет ли значение, как хранятся данные (поможет ли parquet?).
Спасибо, я здесь новичок, поэтому, пожалуйста, расскажите мне, как можно улучшить вопрос.
Комментарии:
1. Помимо того, что на самом деле это не работает, я не уверен, как вы можете это замкнуть. Интересно посмотреть, что скажут другие.
Ответ №1:
В зависимости от ваших данных, например, если у вас много foo
значений, приведенный ниже код может работать более эффективно:
val colsToDrop = df.columns.filter{ c =>
!df.where(col(c).like("foo")).limit(1).isEmpty
}
df.drop(colsToDrop: _*)
ОБНОВЛЕНИЕ: Удалено избыточное .limit(1)
:
val colsToDrop = df.columns.filter{ c =>
!df.where(col(c).like("foo")).isEmpty
}
df.drop(colsToDrop: _*)
Комментарии:
1. Ограничить набор результатов или поиск?
2. Я думаю
.limit(1)
, это не нужно.3. @LeoC Почему это? Как, по вашему мнению, работает ограничение?
4.
myDF.limit(1)
является ли фрейм данных из 1 строки, еслиmyDF
он непустой, 0 строк в противном случае. СледовательноmyDF.limit(1).isEmpty
, допустимо для проверки, является лиmyDF
оно пустым или нет. Мне это кажется излишним.5. Это хороший момент @LeoC. Вот реализация isEmpty:
def isEmpty: Boolean = withAction("isEmpty", limit(1).groupBy().count().queryExecution) { plan => plan.executeCollect().head.getLong(0) == 0 }
Ответ №2:
Ответ, следующий вашей логике (разработан правильно), но я думаю, что другой ответ лучше, тем более для потомков и ваших улучшенных возможностей с Scala. Я не уверен, что другой ответ на самом деле эффективен, но и это не так. Не уверен, поможет ли parquet, сложно оценить.
Другой вариант — написать цикл в драйвере и получить доступ к каждому столбцу, а затем parquet будет полезен из-за columnar, stats и push down.
import org.apache.spark.sql.functions._
def myUDF = udf((cols: Seq[String], cmp: String) => cols.map(code => if (code == cmp) true else false ))
val df = sc.parallelize(Seq(
("foo", "abc", "sss"),
("bar", "fff", "sss"),
("foo", "foo", "ddd"),
("bar", "ddd", "ddd")
)).toDF("a", "b", "c")
val res = df.select($"*", array(df.columns.map(col): _*).as("colN"))
.withColumn( "colres", myUDF( col("colN") , lit("foo") ) )
res.show()
res.printSchema()
val n = 3
val res2 = res.select( (0 until n).map(i => col("colres")(i).alias(s"c${i 1}")): _*)
res2.show(false)
val exprs = res2.columns.map( c => max(c).alias(c))
val drop = res2.agg(exprs.head, exprs.tail: _*)
drop.show(false)