Список столбцов, удовлетворяющих определенному условию

#python #apache-spark #pyspark #apache-spark-sql

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

Работая с Python 3.7.9, Spark 2.4.5, я пытаюсь вручную «попробовать проанализировать» заданное подмножество столбцов из строки в целое число, а затем добавить два дополнительных столбца в фрейм данных:

  • _num_invalid_columns: с количеством столбцов, для которых не удалось выполнить синтаксический анализ (обозначается как -9999)
  • _invalid_colums_list: список столбцов, не прошедших синтаксический анализ, разделенных запятыми или разделенных каналами

Я смог вычислить «_num_invalid_columns», но у меня возникли проблемы с «_invalid_columns_list». Код для воспроизведения ниже я максимально сократил.

 '''
Uncomment these 2 lines if using Jupyter Notebook
import findspark
findspark.init()
'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F

def tryparse_integer(integer_str):    
    '''
    if integer_str is None returns None as IntegerType()
    if integer_str is not None and cannot be parsed to integer, returns -9999 as IntegerType()
    if integer_str is not None and can be parsed to integer, returns integer_str as IntegerType()
    (that way we can tell between null data and invalid data)
    '''
    return F.when(F.isnull(integer_str), F.lit(None).cast(IntegerType())) 
        .otherwise( 
            F.when(F.isnull(integer_str.cast(IntegerType())), F.lit(-9999).cast(IntegerType())) 
            .otherwise(integer_str.cast(IntegerType())) 
        ) 
        
def is_invalid_number(col):
    return F.when(col == -9999, 1).otherwise(0)

spark = SparkSession.builder.appName("RandallTest").getOrCreate()

data = [('1', '2','hello'), ('error','error','hello'), ('error','2','hello')]

schema = StructType([
  StructField('column1', StringType()),
  StructField('column2', StringType()),
  StructField('column3', StringType())
  ])

df = spark.createDataFrame(data, schema = schema)

df.printSchema()

integerColumns = ['column1','column2']

df_parsed = df.select(*[
    tryparse_integer(F.col(colName)).alias(colName) if (colName in integerColumns)
    else colName
    for colName in df.columns])  

df_parsed.printSchema()

df_parsed_with_errorcount = df_parsed 
    .withColumn('_num_invalid_columns', sum(
    is_invalid_number(F.col(colName)) if (colName in integerColumns) 
    else 0
    for colName in df_parsed.columns)) 
    .withColumn('_invalid_columns_list', F.lit('--'.join(filter(None, (
    ##Not what I need, but works:
    colName if (colName in integerColumns)  
    ##Not working if I uncomment the actual logic I want. Something like any of these lines (59, 60 or 61 all produce errors)
    ##colName if (colName in integerColumns and F.col(colName) == -9999)
    ##colName if (colName in integerColumns amp; F.col(colName) == -9999)
    ##colName if (colName in integerColumns amp; is_invalid_number(F.col(colName)) == 1)
    else None
    for colName in df_parsed.columns)))))   

df_parsed_with_errorcount.show()
df_parsed_with_errorcount.take(10)
 

Пример ввода:

 column1     column2     column3     
'1'         '2'         'hello'        
'error'     'error'     'hello'     
'error'     '2'         'hello'     
 

Столбцы для «попытки анализа»: column1, column2

Ожидаемый результат:

 column1     column2     column3     _num_invalid_columns    _invalid_columns_list
1           2           'hello'     0   
-9999       -9999       'hello'     2                       column1,column2
-9999       2           'hello'     1                       column1
 

Ответ №1:

Используется F.lit(colName) для помещения имен столбцов в фрейм данных:

 df_parsed_with_errorcount = df_parsed.withColumn(
    '_invalid_columns_list',
    F.concat_ws(
        ',',
        *[F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName)) for colName in df_parsed.columns]
    )
)

df_parsed_with_errorcount.show()
 ------- ------- ------- --------------------- 
|column1|column2|column3|_invalid_columns_list|
 ------- ------- ------- --------------------- 
|      1|      2|  hello|                     |
|  -9999|  -9999|  hello|      column1,column2|
|  -9999|      2|  hello|              column1|
 ------- ------- ------- --------------------- 
 

Комментарии:

1. спасибо, есть ли способ обойти использование F.хотя, что-то похожее на «cased», которое я использовал для «суммы»? В моем реальном варианте использования было бы больше, чем целочисленные «попытки синтаксического анализа», например, мои фактические _num_invalid_columns выглядят так: df_parsed_with_errorcount = df_parsed.withColumn(‘_num_invalid_columns’, sum( is_invalid_date(F.col(colName)) if (colName в dateColumns.keys()) else is_invalid_number(F. col(имя столбца)) if (colName в списке(set().union(decimal164Columns, decimal96Columns, decimal84Columns, integerColumns))) else 0 для colName в df_parsed.columns))

Ответ №2:

Расширяясь после ввода из mck, мой окончательный код выглядит следующим образом:

Сначала модификация вспомогательной функции, чтобы сделать ее, так сказать, «безопасной для типов»

 def is_invalid_number(col):
    ##return F.when(col == -9999, 1).otherwise(0) --> does not work, no type safety
    return F.when(col.cast(StringType()) == '-9999', 1).otherwise(0)
 

А затем фактическое вычисление столбца с использованием concat_ws() array_contains() моя вспомогательная функция

 df_parsed_with_errorcount = df_parsed 
    .withColumn('_num_invalid_columns', sum(
    is_invalid_number(F.col(colName)) if (colName in integerColumns) 
    else 0
    for colName in df_parsed.columns)) 
    .withColumn('_invalid_columns_list', F.concat_ws(',', *[ 
    (F.when(F.array_contains(F.array([F.lit(x) for x in integerColumns]), colName), F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName))) 
    .otherwise(F.lit(None))
    ) for colName in df_parsed.columns]))
 

Из того, что я могу сказать из сообщений об ошибках, показанных перед тем, как сделать безопасный тип вспомогательной функции is_invalid_number ; Spark не гарантирует порядок выполнения условий в when(), даже после вложения одной функции when() в другую, в отличие от использования одной функции when() с двумя условиямиразделенных amp; (и)