#apache-spark #pyspark
Вопрос:
Я пытаюсь динамически создать словарь с помощью pyspark, прочитав структуру таблиц в базе данных oracle. Вот упрощенная версия моего кода
предопределенный словарь (convert_dict.py)
conversions = {
"COL1": lambda c: f.col(c).cast("string"),
"COL2": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
"COL3": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
"COL4": lambda c: f.col(c).cast("float")
}
Основная программа
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType
from convert_dict import conversions
spark = SparkSession.builder.appName("file_testing").getOrCreate()
table_name = "TEST_TABLE"
input_file_path = "file:\c:Desktopfoo.txt"
sql_query = "(select listagg(column_name,',') within group(order by column_id) col from user_tab_columns where "
"table_name = '" table_name "' and column_name not in ('COL10', 'COL11','COL12') order by column_id) table_columns"
struct_schema = StructType([
StructField("COL1", StringType(), True),
StructField("COL2", StringType(), True),
StructField("COL3", StringType(), True),
StructField("COL4", StringType(), True),
])
data_df = spark.read.schema(struct_schema).option("sep", ",").option("header", "true").csv(input_file_path)
validdateData = lines.withColumn(
"dataTypeValidations",
f.concat_ws(",",
*[
f.when(
v(k).isNull() amp; f.col(k).isNotNull(),
f.lit(k " not valid")
).otherwise(f.lit("None"))
for k,v in conversions.items()
]
)
)
data_temp = validdateData
for k,v in conversions.items():
data_temp = data_temp.withColumn(k,v(k))
validateData.show()
spark.stop()
Если я должен изменить приведенный выше код для динамического создания словаря из базы данных
DATEFORMAT = "yyyyMMdd"
dict_sql = """
(select column_name,case when data_type = 'VARCHAR2' then 'string' when data_type in ( 'DATE','TIMESTAMP(6)') then 'date' when data_type = 'NUMBER' and NVL(DATA_SCALE,0) <> 0 then 'float' when data_type = 'NUMBER' and NVL(DATA_SCALE,0) = 0 then 'int'
end d_type from user_tab_columns where table_name = 'TEST_TABLE' and column_name not in ('COL10', 'COL11','COL12')) dict
"""
column_df = spark.read.format("jdbc").option("url",url).option("dbtable", dict_sql)
.option("user",user).option("password",password).option("driver",driver).load()
conversions = {}
for row in column_df.rdd.collect():
column_name = row.COLUMN_NAME
column_type = row.D_TYPE
if column_type == "date":
conversions.update({column_name: lambda c:f.col(c)})
elif column_type == "float":
conversions.update({column_name: lambda c: f.col(c).cast("float")})
elif column_type == "date":
conversions.update({column_name: lambda c: f.from_unixtime(f.unix_timestamp(c, DATEFORMAT)).cast("date")})
elif column_type == "int":
conversions.update({column_name: lambda c: f.col(c).cast("int")})
else:
conversions.update({column_name: lambda c: f.col(c)})
Преобразование типов данных не работает, когда используется приведенный выше динамически сгенерированный словарь. Например: если «COL2» содержит «20210731», результирующие данные из приведенного выше кода остаются прежними, т. е. не преобразуются в правильный формат даты. Где в качестве предопределенного словаря работает правильно.
Я что-то упускаю здесь или есть лучший способ реализовать динамически создаваемые словари в pyspark?
Ответ №1:
В моем коде была ошибка новичка, в блоке «если-то-еще» у меня было два отдельных оператора для column_type == «дата».
Комментарии:
1. используйте раздел комментариев для этого вместо выбора раздела ans