#python #apache-spark #pyspark #databricks #delta-lake
Вопрос:
У меня есть фрейм данных со строками, которые будут сохранены в разных целевых таблицах. Прямо сейчас я нахожу уникальную комбинацию параметров для определения целевой таблицы, перебираю фрейм данных и фильтрую, а затем пишу.
Что-то похожее на это:
df = spark.load.json(directory).repartition('client', 'region')
unique_clients_regions = [(group.client, group.region) for group in df.select('client', 'region').distinct().collect()]
for client, region in unique_clients_regions:
(df
.filter(f"client = '{client}' and region = '{region}'")
.select(
...
)
.write.mode("append")
.saveAsTable(f"{client}_{region}_data")
)
Есть ли способ сопоставить операцию записи с разными groupBy
группами вместо того, чтобы выполнять итерацию по отдельному набору? Я позаботился о повторном разделении client
и region
попытался ускорить работу фильтра.
Ответ №1:
Я не могу с чистой совестью посоветовать вам что-либо, используя это решение. На самом деле это действительно плохая архитектура данных.
У вас должна быть только одна таблица и раздел по клиентам и регионам. Это создаст разные папки для каждой пары клиентов/регионов. И вам нужна только одна запись в конце, и никакого цикла или сбора.
spark.read.json(directory).write.saveAsTable(
"data",
mode="append",
partitionBy=['client', 'region']
)
Комментарии:
1. Спасибо @Steven, в процессе изучения Spark и такого рода архитектуры. Проблема в том, что есть разные столбцы/схемы выше по течению для каждого клиента/региона, которые могут изменяться с течением времени, поэтому разные таблицы для каждой имеют смысл.
2. @TomNash если входным файлом является json-файл unic, это означает, что схема одинакова для каждой пары клиент/регион. Так что не нужно иметь несколько таблиц.
3. Это не. В этом-то и проблема. Несколько записей JSON с различными схемами в одном файле. Пытаюсь справиться с каждым в отдельности.
4. @TomNash Это не то, как ты с ними обращаешься. С того момента , как вы это сделаете
df = spark.load.json(directory)
, df будет иметь глобальную схему, объединяющую все имеющиеся у вас json. Это означает, что итоговая таблица также будет иметь эту единую схему. Вы можете фильтровать столько, сколько хотите, это не изменит схему.5. Да, с этой проблемой мы столкнулись, я могу
spark.load.text
, а затем обработать ихget_json_object
в отфильтрованных наборах, чтобы получить локализованную схему для этой группы, хотя и без определения глобальной. По-прежнему остается вопрос о том, как записывать строки одного кадра данных в разные места назначения без зацикливания. Альтернатива состоит в том, чтобы загружать по одному файлу за раз в свой собственный фрейм данных и записывать в таблицу по одной строке за раз.