#python #apache-spark #pyspark #apache-spark-sql
#python #apache-spark #pyspark #apache-spark-sql
Вопрос:
Работая с Python 3.7.9, Spark 2.4.5, я пытаюсь вручную «попробовать проанализировать» заданное подмножество столбцов из строки в целое число, а затем добавить два дополнительных столбца в фрейм данных:
- _num_invalid_columns: с количеством столбцов, для которых не удалось выполнить синтаксический анализ (обозначается как -9999)
- _invalid_colums_list: список столбцов, не прошедших синтаксический анализ, разделенных запятыми или разделенных каналами
Я смог вычислить «_num_invalid_columns», но у меня возникли проблемы с «_invalid_columns_list». Код для воспроизведения ниже я максимально сократил.
'''
Uncomment these 2 lines if using Jupyter Notebook
import findspark
findspark.init()
'''
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql import functions as F
def tryparse_integer(integer_str):
'''
if integer_str is None returns None as IntegerType()
if integer_str is not None and cannot be parsed to integer, returns -9999 as IntegerType()
if integer_str is not None and can be parsed to integer, returns integer_str as IntegerType()
(that way we can tell between null data and invalid data)
'''
return F.when(F.isnull(integer_str), F.lit(None).cast(IntegerType()))
.otherwise(
F.when(F.isnull(integer_str.cast(IntegerType())), F.lit(-9999).cast(IntegerType()))
.otherwise(integer_str.cast(IntegerType()))
)
def is_invalid_number(col):
return F.when(col == -9999, 1).otherwise(0)
spark = SparkSession.builder.appName("RandallTest").getOrCreate()
data = [('1', '2','hello'), ('error','error','hello'), ('error','2','hello')]
schema = StructType([
StructField('column1', StringType()),
StructField('column2', StringType()),
StructField('column3', StringType())
])
df = spark.createDataFrame(data, schema = schema)
df.printSchema()
integerColumns = ['column1','column2']
df_parsed = df.select(*[
tryparse_integer(F.col(colName)).alias(colName) if (colName in integerColumns)
else colName
for colName in df.columns])
df_parsed.printSchema()
df_parsed_with_errorcount = df_parsed
.withColumn('_num_invalid_columns', sum(
is_invalid_number(F.col(colName)) if (colName in integerColumns)
else 0
for colName in df_parsed.columns))
.withColumn('_invalid_columns_list', F.lit('--'.join(filter(None, (
##Not what I need, but works:
colName if (colName in integerColumns)
##Not working if I uncomment the actual logic I want. Something like any of these lines (59, 60 or 61 all produce errors)
##colName if (colName in integerColumns and F.col(colName) == -9999)
##colName if (colName in integerColumns amp; F.col(colName) == -9999)
##colName if (colName in integerColumns amp; is_invalid_number(F.col(colName)) == 1)
else None
for colName in df_parsed.columns)))))
df_parsed_with_errorcount.show()
df_parsed_with_errorcount.take(10)
Пример ввода:
column1 column2 column3
'1' '2' 'hello'
'error' 'error' 'hello'
'error' '2' 'hello'
Столбцы для «попытки анализа»: column1, column2
Ожидаемый результат:
column1 column2 column3 _num_invalid_columns _invalid_columns_list
1 2 'hello' 0
-9999 -9999 'hello' 2 column1,column2
-9999 2 'hello' 1 column1
Ответ №1:
Используется F.lit(colName)
для помещения имен столбцов в фрейм данных:
df_parsed_with_errorcount = df_parsed.withColumn(
'_invalid_columns_list',
F.concat_ws(
',',
*[F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName)) for colName in df_parsed.columns]
)
)
df_parsed_with_errorcount.show()
------- ------- ------- ---------------------
|column1|column2|column3|_invalid_columns_list|
------- ------- ------- ---------------------
| 1| 2| hello| |
| -9999| -9999| hello| column1,column2|
| -9999| 2| hello| column1|
------- ------- ------- ---------------------
Комментарии:
1. спасибо, есть ли способ обойти использование F.хотя, что-то похожее на «cased», которое я использовал для «суммы»? В моем реальном варианте использования было бы больше, чем целочисленные «попытки синтаксического анализа», например, мои фактические _num_invalid_columns выглядят так: df_parsed_with_errorcount = df_parsed.withColumn(‘_num_invalid_columns’, sum( is_invalid_date(F.col(colName)) if (colName в dateColumns.keys()) else is_invalid_number(F. col(имя столбца)) if (colName в списке(set().union(decimal164Columns, decimal96Columns, decimal84Columns, integerColumns))) else 0 для colName в df_parsed.columns))
Ответ №2:
Расширяясь после ввода из mck, мой окончательный код выглядит следующим образом:
Сначала модификация вспомогательной функции, чтобы сделать ее, так сказать, «безопасной для типов»
def is_invalid_number(col):
##return F.when(col == -9999, 1).otherwise(0) --> does not work, no type safety
return F.when(col.cast(StringType()) == '-9999', 1).otherwise(0)
А затем фактическое вычисление столбца с использованием concat_ws() array_contains() моя вспомогательная функция
df_parsed_with_errorcount = df_parsed
.withColumn('_num_invalid_columns', sum(
is_invalid_number(F.col(colName)) if (colName in integerColumns)
else 0
for colName in df_parsed.columns))
.withColumn('_invalid_columns_list', F.concat_ws(',', *[
(F.when(F.array_contains(F.array([F.lit(x) for x in integerColumns]), colName), F.when(is_invalid_number(F.col(colName)) == 1, F.lit(colName)))
.otherwise(F.lit(None))
) for colName in df_parsed.columns]))
Из того, что я могу сказать из сообщений об ошибках, показанных перед тем, как сделать безопасный тип вспомогательной функции is_invalid_number ; Spark не гарантирует порядок выполнения условий в when(), даже после вложения одной функции when() в другую, в отличие от использования одной функции when() с двумя условиямиразделенных amp; (и)