Динамический словарь в pyspark

#apache-spark #pyspark

Вопрос:

Я пытаюсь динамически создать словарь с помощью pyspark, прочитав структуру таблиц в базе данных oracle. Вот упрощенная версия моего кода

предопределенный словарь (convert_dict.py)

 conversions = {
    "COL1": lambda c: f.col(c).cast("string"),
    "COL2": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
    "COL3": lambda c: f.from_unixtime(f.unix_timestamp(c, dateFormat)).cast("date"),
    "COL4": lambda c: f.col(c).cast("float")
}
 

Основная программа

 from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StructType, StructField, StringType
from convert_dict import conversions


spark = SparkSession.builder.appName("file_testing").getOrCreate()

table_name = "TEST_TABLE"
input_file_path = "file:\c:Desktopfoo.txt"

sql_query = "(select listagg(column_name,',') within group(order by column_id) col from user_tab_columns where " 
      "table_name = '"   table_name   "' and column_name not in ('COL10', 'COL11','COL12') order by column_id) table_columns"
      
struct_schema = StructType([
                StructField("COL1", StringType(), True),
                StructField("COL2", StringType(), True),
                StructField("COL3", StringType(), True),
                StructField("COL4", StringType(), True),
                ])
    

data_df = spark.read.schema(struct_schema).option("sep", ",").option("header", "true").csv(input_file_path)

validdateData = lines.withColumn(
                "dataTypeValidations",
                f.concat_ws(",",
                            *[
                               f.when(
                                    v(k).isNull() amp; f.col(k).isNotNull(),
                                    f.lit(k    " not valid") 
                                ).otherwise(f.lit("None"))
                                
                                for k,v in conversions.items()
                            ]
                     )
                 )
                 
data_temp = validdateData
for k,v in conversions.items():
    data_temp = data_temp.withColumn(k,v(k))

validateData.show()

spark.stop()

 

Если я должен изменить приведенный выше код для динамического создания словаря из базы данных

 DATEFORMAT = "yyyyMMdd"
dict_sql = """
(select column_name,case when data_type = 'VARCHAR2' then 'string' when data_type in ( 'DATE','TIMESTAMP(6)') then 'date' when data_type = 'NUMBER' and NVL(DATA_SCALE,0) <> 0 then 'float' when data_type = 'NUMBER' and NVL(DATA_SCALE,0) = 0 then 'int'
end d_type from user_tab_columns where table_name = 'TEST_TABLE' and column_name not in ('COL10', 'COL11','COL12')) dict
"""
column_df = spark.read.format("jdbc").option("url",url).option("dbtable", dict_sql)
    .option("user",user).option("password",password).option("driver",driver).load()


conversions = {}
for row in column_df.rdd.collect():
    column_name = row.COLUMN_NAME
    column_type = row.D_TYPE
    if column_type == "date":
        conversions.update({column_name: lambda c:f.col(c)})
    elif column_type == "float":
        conversions.update({column_name: lambda c: f.col(c).cast("float")})
    elif column_type == "date":
        conversions.update({column_name: lambda c: f.from_unixtime(f.unix_timestamp(c, DATEFORMAT)).cast("date")})
    elif column_type == "int":
        conversions.update({column_name: lambda c: f.col(c).cast("int")})
    else:
        conversions.update({column_name: lambda c: f.col(c)})
 

Преобразование типов данных не работает, когда используется приведенный выше динамически сгенерированный словарь. Например: если «COL2» содержит «20210731», результирующие данные из приведенного выше кода остаются прежними, т. е. не преобразуются в правильный формат даты. Где в качестве предопределенного словаря работает правильно.

Я что-то упускаю здесь или есть лучший способ реализовать динамически создаваемые словари в pyspark?

Ответ №1:

В моем коде была ошибка новичка, в блоке «если-то-еще» у меня было два отдельных оператора для column_type == «дата».

Комментарии:

1. используйте раздел комментариев для этого вместо выбора раздела ans