#python #apache-spark #azure-data-lake #databricks #google-api-python-client
#python #apache-spark #azure-data-lake #databricks #google-api-python-client
Вопрос:
Я возвращаю данные об использовании пользователей из Api отчетов об использовании пользователей Google Admin Report через Python SDK в Databricks. Размер данных составляет около 100 000 записей в день, которые я делаю за ночь с помощью пакетного процесса. API возвращает максимальный размер страницы 1000, поэтому я называю его 1000 примерно, чтобы получить данные, которые мне нужны на день. Это работает нормально.
Моя конечная цель — сохранить данные в необработанном формате в озере данных (Azure Gen2, но не имеет отношения к этому вопросу). Позже я преобразоваю данные с помощью Databricks в модель агрегированной отчетности и добавлю поверх нее PowerBI, чтобы отслеживать использование приложений Google с течением времени.
Как программист на C #, я новичок в Python и Spark: мой текущий подход заключается в том, чтобы запросить первую страницу из 1000 записей из API, а затем записать ее в datalake напрямую в виде файла JSON, затем получить следующий набор страниц и записать его тоже. Структура папок будет примерно такой: » raw googleuser ГГГГ ММ ДД data1.json».
Я хотел бы сохранить данные в максимально возможной форме в необработанной зоне и не применять слишком много преобразований. 2-й процесс может извлекать нужные мне поля, помечать их метаданными и записывать их обратно как готовые к использованию функции. Вот почему я подумываю о том, чтобы записать его в формате JSON.
Это означает, что 2-му процессу необходимо прочитать JSON в dataframe, где я могу преобразовать его и записать как parquet (эта часть также проста).
Поскольку я использую Api Google, я не работаю с Json — он возвращает объекты dict (со сложной вложенностью). Я могу извлечь его в виде строки Json с помощью json.dump(), но я не могу понять, как записать СТРОКУ непосредственно в мой datalake. Как только я получу его в dataframe, я могу легко записать его в любом формате, однако преобразование его из Json в dataframe, а затем, по сути, обратно в Json, чтобы просто записать его, требует затрат производительности.
Вот то, что я пробовал, и результаты:
- Создайте список pyspark.sql.Rows и в конце всей подкачки (100 тыс. строк) — используйте spark.createDataFrame(строки), чтобы превратить его в dataframe. Как только это фрейм данных, я могу сохранить его как файл Json. Это работает, но кажется неэффективным.
-
Используйте json.dump(запрос), чтобы получить строку из 1000 записей в формате Json. Я могу записать его в файловую систему Databricks, используя этот код:
with open("/dbfs/tmp/googleuserusagejsonoutput-{0}.json" .format(keyDateFilter), 'w') as f:
f.write(json.dumps(response))Однако затем мне нужно переместить его в мое озеро данных Azure с помощью:
dbutils.fs.cp("/tmp/test_dbfs1.txt", datalake_path dbfs_path "xyz.json")
Затем я получаю следующие 1000 записей и продолжаю это делать. Кажется, я не могу использовать каталог метода open () для хранилища озера данных (драйвер Azure abfss), иначе это было бы достойным решением. Кажется хрупким и странным сначала выгружать его локально, а затем перемещать.
-
То же, что и вариант 1, но сбрасывайте фрейм данных в datalake каждые 1000 записей и перезаписывайте его (чтобы объем памяти не увеличивался более чем на 1000 записей одновременно)
-
Игнорируйте правило сброса необработанного Json. Преобразуйте данные в самый простой формат, который я хочу, и избавьтесь от всех лишних данных, которые мне не нужны. Это привело бы к гораздо меньшему объему, а затем был бы выполнен вариант 1 или 3 выше. (Это второй вопрос — принцип сохранения всех данных из Api в его необработанном формате, чтобы по мере изменения требований со временем у меня всегда были исторические данные в озере данных, и я мог просто изменить процедуры преобразования, чтобы извлечь из них разные показатели. Поэтому я неохотно отбрасываю какие-либо данные на данном этапе.
Пожалуйста, оцените любые советы…
Ответ №1:
Подключите lake к вашей среде databricks, чтобы вы могли просто сохранить его в lake, как если бы это была обычная папка:
with open('/dbfs/mnt/mydatalake/googleuserusagejsonoutput-{0}.json', 'wb') as f:
json.dump(data, codecs.getwriter('utf-8')(f), sort_keys = True, indent = 4, ensure_ascii=False)
f.close()
Вам нужно только смонтировать озеро один раз:
При этом,
Хранение больших данных в формате json не является оптимальным; для каждого значения (ячейки) вы сохраняете ключ (имя столбца), поэтому ваши данные будут намного больше, чем нужно. Кроме того, у вас, вероятно, должна быть функция удаления дублирования, чтобы гарантировать, что (1) в данных нет пробелов, и (2) вы не храните одни и те же данные в нескольких файлах. Databricks delta позаботится об этом.
Комментарии:
1. Спасибо, я попробую смонтировать DL и писать напрямую. Есть ли у вас какие-либо рекомендации по общему подходу? Например, вы упомянули, что хранение больших данных в формате Json не является оптимальным. Что считается большим? Эти файлы размером по 12 мегабайт каждый (содержат 1000 записей — каждая запись имеет 188 параметров). Я мог бы выполнить некоторую обработку данных заранее, чтобы сократить этот размер и преобразовать данные в столбчатый формат (а затем я мог бы сохранить их как parquet), однако я думал, что принцип необработанной зоны заключается в том, чтобы выгружать данные точно так, как они были получены из исходной системы, и обрабатывать их в виде таблицы.метод отслеживания. Мысли?
2. Хороший вопрос. Это зависит от того, насколько чувствительны ваши данные к потере данных. Я не использовал API Google, но я полагаю, что вы можете запросить его снова, если данные неверны или вы перепутали преобразование. Вы можете абсолютно пойти по этому пути, лично я бы не стал рассматривать формат файла как преобразование, которое должно произойти после приземления, если я могу легко получить доступ к данным снова, если простое преобразование файла неверно. Если нет возможности получить данные снова, обязательно сохраните весь JSON в landing. Пока все данные есть, я не думаю, что формат имеет значение.
3. Потрясающе. Сейчас это работает — очень запутанно, но этот фрагмент текста подводит итог: «Когда вы используете Spark API, вы ссылаетесь на файлы с помощью «/mnt/ training/file.csv» или «dbfs:/mnt/ training/file.csv». Если вы используете локальные файловые API, вы должны указать путь в /dbfs , например: «/dbfs/mnt/training/file.csv». Вы не можете использовать путь в dbfs при использовании Spark API «.. docs.databricks.com/user-guide/dbfs-databricks-file-system.html . Поэтому иногда вам нужно выполнить /mtn/ для Spark Api, а затем для локальных API вам нужно использовать /dbfs/mnt/ .
4. WRT выравнивание необработанного json: API Google работает всего 1,5 года на постоянной основе, поэтому я думаю, что постараюсь сохранить его как есть (и извлечь все возможные поля, которые я могу, нужны они мне или нет в данный момент). Тогда в будущем у меня, по крайней мере, будут все данные в исходном формате, начиная с того момента, когда я начал этот процесс, но я принимаю вашу точку зрения о том, что формат файла не считается преобразованием. Спасибо за помощь.
5. Да, при открытии файла с помощью spark уже просматривается dbfs: spark.read.json(«/mnt/pathandfile.json»), но вы должны добавить /dbfs перед mnt при открытии файла с помощью python, как в примере.