pyspark — динамическое приведение типов и псевдонимов с использованием метаданных, определенных в переменной json

#python #dataframe #apache-spark #pyspark #apache-spark-sql

#python #фрейм данных #apache-spark #pyspark #apache-spark-sql

Вопрос:

Использование Apache Spark 3.0.1

У меня есть входящий фрейм данных df_src , подобный следующему.

 df_src = spark.createDataFrame(
        [
            ('1', 'foo', '2020-02-03', '2020-03-24T09:21:20 00:00'),
            ('2', 'bar', '2019-01-29', '2020-01-15T17:00:20 00:00'),
        ],
        ['a_col', 'b_col', 'c_col', 'd_col']
    )
 

введите описание изображения здесь

У меня также есть метаданные (как объект json в массиве), которые необходимо применить, — я хотел бы применить эти метаданные к df_src
Мне нужно сделать 3 вещи

  1. выберите только необходимые столбцы (проекция)
  2. применить тип данных
  3. применить псевдоним

Я попробовал следующее и получил шаги 1 и 2.

 import json
metadata_json = """
[
 {"source_field":"a_col", "alias":"x_col", "datatype":"string"}
,{"source_field":"b_col", "alias":"y_col", "datatype":"timestamp"}
,{"source_field":"c_col", "alias":"z_col", "datatype":"string"}
]
"""

# Transform json input to python objects
metadata_dict = json.loads(metadata_json)

# Filter python objects with list comprehensions
source_fields = [x['source_field'] for x in metadata_dict]
print(source_fields)


# 1) projection - DONE i.e. my incoming data frame had d_col which I do not need - so doing select here based on metadata_json
df = df_src.select(source_fields)

# 2) apply data type - DONE i.e. from metata_json I am selecting fields that needs to be timestamp and casting.
for column in metadata_dict:
  if column['datatype'] == 'timestamp':
    df_dest = df.withColumn(column['source_field'], col(column['source_field']).cast("timestamp"))
    
# 3) apply alias ?
 

введите описание изображения здесь

После шага 2 мой целевой фрейм данных df_dest выглядит так, как указано выше.

Теперь, как мне динамически применить псевдоним на основе metadata_json выше, используя pyspark? (Также, пожалуйста, предложите, есть ли элегантный способ выполнить все 3 шага, я не могу изменить metadata_json)

Ответ №1:

Учитывая ваш входной объект (и простые строки), рассмотрите что-то вроде этого:

 import pyspark.sql.functions as F


# string backticks to protect the names against "." and other characters
input_df.select(
    *[
        F.col(f"`{x["source_field"]}`").cast(x["datatype"]).alias(x["alias"])
        for x in metadata_dict
    ]
)
 

Если ваши строки становятся немного более сложными, простой cast() может не взломать его. Если это так, рассмотрите возможность переноса всего F.col().cast().alias() оператора, реализующего простой шаблон стратегии (или if ... elif ... else переключатель), который может обрабатывать более сложную логику.