Оптимизация производительности «с помощью столбца, когда в противном случае» в pyspark

#apache-spark #pyspark #apache-spark-sql #databricks

Вопрос:

Я работаю над проектом с pyspark на базе данных . У меня есть часть кода (ниже), которая переформатирует строку на основе даты (французский).

Существующий код, помимо того, что он многословен, вызывает некоторые проблемы с производительностью, такие как :

  • невозможность отображения кадра данных, наличие постоянной «запущенной команды»
  • вызывает «Драйвер включен, но не реагирует, вероятно, из-за GC».

В этом проекте используются только csv-файлы (для чтения и записи). База данных не используется.

Я пытаюсь лучше справиться с задачей форматирования, чтобы избежать проблем с производительностью и памятью. Есть какие-нибудь предложения?

Большое спасибо !

 courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2020","XXX0120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2020","XXX0220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2020","XXX0320").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2020","XXX0420").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2020","XXX0520").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2020","XXX0620").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2020","XXX0720").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2020","XXX0820").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2020","XXX0920").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2020","XXX1020").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2020","XXX1120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2020","XXX1220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2021","XXX0121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2021","XXX0221").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2021","XXX0321").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2021","XXX0421").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2021","XXX0521").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2021","XXX0621").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2021","XXX0721").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2021","XXX0821").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2021","XXX0921").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2021","XXX1021").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2021","XXX1121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2021","XXX1221").otherwise(courriers["Vague"]))
 

Ответ №1:

Гораздо проще программно сгенерировать полное условие, вместо того, чтобы применять его по одному. Он withColumn хорошо известен своей плохой производительностью при большом количестве его использования.

Самым простым способом будет определить отображение и сгенерировать из него условие, например:

 dates = {"XXX Janvier 2020":"XXX0120",
         "XXX Fevrier 2020":"XXX0220",
         "XXX Mars 2020":"XXX0320",
         "XXX Avril 2020":"XXX0420",
         "XXX Mai 2020":"XXX0520",
         "XXX Juin 2020":"XXX0620",
         "XXX Juillet 2020":"XXX0720",
         "XXX Aout 2020":"XXX0820",
         "XXX Septembre 2020":"XXX0920",
         "XXX Octobre 2020":"XXX1020",
         "XXX Novembre 2020":"XXX1120",
         "XXX Decembre 2020":"XXX1220",
         "XXX Janvier 2021":"XXX0121",
         "XXX Fevrier 2021":"XXX0221",
         "XXX Mars 2021":"XXX0321",
         "XXX Avril 2021":"XXX0421",
         "XXX Mai 2021":"XXX0521",
         "XXX Juin 2021":"XXX0621",
         "XXX Juillet 2021":"XXX0721",
         "XXX Aout 2021":"XXX0821",
         "XXX Septembre 2021":"XXX0921",
         "XXX Octobre 2021":"XXX1021",
         "XXX Novembre 2021":"XXX1121",
         "XXX Decembre 2021":"XXX1221"
         }
 

и из этого мы можем сгенерировать наше условие для всех возможных значений:

 import pyspark.sql.functions as F
cl = None
for k,v in dates.items():
    if cl is None:
        cl = F.when(F.col("Vague") == k, F.lit(v))
    else:
        cl = cl.when(F.col("Vague") == k, F.lit(v))

cl = cl.otherwise(F.col("Vague")).alias("Vague")
 

и это может быть использовано следующим образом:

 df = spark.createDataFrame([["XXX Fevrier 2021"], ["22332"]], 
   schema="Vague string")
df.select(cl).show()
 

что дает нам ожидаемый результат:

  ------- 
|  Vague|
 ------- 
|XXX0221|
|  22332|
 ------- 
 

В идеале его можно было бы обобщить для работы с любым годом, используя регулярные выражения, например:

 dates = {"XXX Janvier 20(d{2})":"XXX01$1",
         "XXX Fevrier 20(d{2})":"XXX02$1",
         "XXX Mars 20(d{2})":"XXX03$1",
         "XXX Avril 20(d{2})":"XXX04$1",
         "XXX Mai 20(d{2})":"XXX05$1",
         "XXX Juin 20(d{2})":"XXX06$1",
         "XXX Juillet 20(d{2})":"XXX07$1",
         "XXX Aout 20(d{2})":"XXX08$1",
         "XXX Septembre 20(d{2})":"XXX09$1",
         "XXX Octobre 20(d{2})":"XXX10$1",
         "XXX Novembre 20(d{2})":"XXX11$1",
         "XXX Decembre 20(d{2})":"XXX12$1",
         }

cl = None
for k,v in dates.items():
    if cl is None:
        cl = F.regexp_replace(F.col("Vague"), k, v)
    else:
        cl = F.regexp_replace(cl, k, v)

cl = cl.alias("Vague")
 

и это даст тот же результат, но будет работать с любым годом в 21 веке

Ответ №2:

Другим решением может быть использование mapType

 from pyspark.sql.functions import col, create_map, lit,split,concat
from itertools import chain
df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Aout 2021"]], 
   schema="Vague string")

# Create a dict only for the given months
mapping = {
    "Janvier":"01",
    "Fevrier": "02",
    "Mars": "03",
    "Avril": "04",
    "Mai": "05",
    "Juin": "06",
    "Juillet": "07",
    "Aout": "08",
    "Septembre": "09",
    "Octobre": "10",
    "Novembre": "11",
    "Decembre": "12"}

# Create the mapping
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

res = (
  df.withColumn("value", concat(
      split(col("Vague"),' ')[0] 
    , mapping_expr.getItem(split(col("Vague"),' ')[1])
    , concat(split(col("Vague"),' ')[2][3:4])))
)

res.show()
 

что обеспечивает ожидаемый результат

  ---------------- ------- 
|           Vague|  value|
 ---------------- ------- 
|XXX Fevrier 2021|XXX0221|
|   XXX Aout 2021|XXX0821|
 ---------------- -------