Сопоставьте операцию записи групп строк фрейма данных с различными дельта-таблицами

#python #apache-spark #pyspark #databricks #delta-lake

Вопрос:

У меня есть фрейм данных со строками, которые будут сохранены в разных целевых таблицах. Прямо сейчас я нахожу уникальную комбинацию параметров для определения целевой таблицы, перебираю фрейм данных и фильтрую, а затем пишу.

Что-то похожее на это:

 df = spark.load.json(directory).repartition('client', 'region')

unique_clients_regions = [(group.client, group.region) for group in df.select('client', 'region').distinct().collect()]

for client, region in unique_clients_regions:
  (df
   .filter(f"client = '{client}' and region = '{region}'")
   .select(
     ...
   )
   .write.mode("append")
   .saveAsTable(f"{client}_{region}_data") 
  )
 

Есть ли способ сопоставить операцию записи с разными groupBy группами вместо того, чтобы выполнять итерацию по отдельному набору? Я позаботился о повторном разделении client и region попытался ускорить работу фильтра.

Ответ №1:

Я не могу с чистой совестью посоветовать вам что-либо, используя это решение. На самом деле это действительно плохая архитектура данных.
У вас должна быть только одна таблица и раздел по клиентам и регионам. Это создаст разные папки для каждой пары клиентов/регионов. И вам нужна только одна запись в конце, и никакого цикла или сбора.

 spark.read.json(directory).write.saveAsTable(
    "data",
    mode="append",
    partitionBy=['client', 'region']
)
 

Комментарии:

1. Спасибо @Steven, в процессе изучения Spark и такого рода архитектуры. Проблема в том, что есть разные столбцы/схемы выше по течению для каждого клиента/региона, которые могут изменяться с течением времени, поэтому разные таблицы для каждой имеют смысл.

2. @TomNash если входным файлом является json-файл unic, это означает, что схема одинакова для каждой пары клиент/регион. Так что не нужно иметь несколько таблиц.

3. Это не. В этом-то и проблема. Несколько записей JSON с различными схемами в одном файле. Пытаюсь справиться с каждым в отдельности.

4. @TomNash Это не то, как ты с ними обращаешься. С того момента , как вы это сделаете df = spark.load.json(directory) , df будет иметь глобальную схему, объединяющую все имеющиеся у вас json. Это означает, что итоговая таблица также будет иметь эту единую схему. Вы можете фильтровать столько, сколько хотите, это не изменит схему.

5. Да, с этой проблемой мы столкнулись, я могу spark.load.text , а затем обработать их get_json_object в отфильтрованных наборах, чтобы получить локализованную схему для этой группы, хотя и без определения глобальной. По-прежнему остается вопрос о том, как записывать строки одного кадра данных в разные места назначения без зацикливания. Альтернатива состоит в том, чтобы загружать по одному файлу за раз в свой собственный фрейм данных и записывать в таблицу по одной строке за раз.