PySpark: ошибка типа: условие должно быть строкой или столбцом

#python #apache-spark #dataframe #pyspark #apache-spark-sql

#python #apache-spark #фрейм данных #pyspark #apache-spark-sql

Вопрос:

Я пытаюсь отфильтровать RDD на основе, как показано ниже:

 spark_df = sc.createDataFrame(pandas_df)
spark_df.filter(lambda r: str(r['target']).startswith('good'))
spark_df.take(5)
  

Но получил следующие ошибки:

 TypeErrorTraceback (most recent call last)
<ipython-input-8-86cfb363dd8b> in <module>()
      1 spark_df = sc.createDataFrame(pandas_df)
----> 2 spark_df.filter(lambda r: str(r['target']).startswith('good'))
      3 spark_df.take(5)

/usr/local/spark-latest/python/pyspark/sql/dataframe.py in filter(self, condition)
    904             jdf = self._jdf.filter(condition._jc)
    905         else:
--> 906             raise TypeError("condition should be string or Column")
    907         return DataFrame(jdf, self.sql_ctx)
    908 

TypeError: condition should be string or Column
  

Есть идеи, что я пропустил? Спасибо!

Комментарии:

1. Прямо здесь есть идеальный ответ 😉

Ответ №1:

DataFrame.filter , которое является псевдонимом для DataFrame.where , ожидает выражения SQL, выраженного либо как Column :

 spark_df.filter(col("target").like("good%"))
  

или эквивалентная строка SQL:

 spark_df.filter("target LIKE 'good%'")
  

Я полагаю, что вы пытаетесь здесь использовать RDD.filter совершенно другой метод:

 spark_df.rdd.filter(lambda r: r['target'].startswith('good'))
  

и не выигрывает от оптимизации SQL.

Ответ №2:

Я прошел через это и решил использовать UDF:

 from pyspark.sql.functions import udf
from pyspark.sql.types import BooleanType

filtered_df = spark_df.filter(udf(lambda target: target.startswith('good'), 
                                  BooleanType())(spark_df.target))
  

Более читаемым было бы использовать обычное определение функции вместо лямбда

Ответ №3:

преобразуйте фрейм данных в rdd.

 spark_df = sc.createDataFrame(pandas_df)
spark_df.rdd.filter(lambda r: str(r['target']).startswith('good'))
spark_df.take(5)
  

Я думаю, это может сработать!

Комментарии:

1. Это может сработать, конечно. Но оптимизация, которую обеспечивают фреймы данных, будет потеряна