Фабрика данных Azure — как читать только последний набор данных в паркете формата Delta, созданном из блоков данных?

#azure #azure-data-factory #azure-data-factory-2

#azure #azure-data-factory #azure-data-factory-2

Вопрос:

Чтобы было понятно, что такое формат, вот как фрейм данных сохраняется в блоках данных:

 folderpath = "abfss://container@storage.dfs.core.windows.net/folder/path"
df.write.format("delta").mode("overwrite").save(folderPath)
 

В результате создается набор файлов Parquet (часто по 2-4 фрагмента), которые находятся в основной папке, с папкой _delta_log, которая содержит файлы, описывающие загрузку данных.
Папка журнала delta определяет, какой набор файлов Parquet в папке должен быть прочитан.

В Databricks я бы прочитал последний набор данных для exmaple, выполнив следующее:

 df = spark.read.format("delta").load(folderpath)
 

Как я могу это сделать на фабрике данных Azure?
Я выбрал Azure Data Lake Gen 2, затем формат Parquet, однако это, похоже, не работает, поскольку я получаю весь набор прочитанных наборов (т. Е. Все Наборы данных), а не Только последние.

Как я могу правильно это настроить?

Комментарии:

1. Какой последний набор данных? Это последний измененный файл?

2. Это будет то, что создает последний прогон блоков данных и сохраняется в хранилище.

3. Привет @user3012708, еще один вопрос, вы хотите добиться этого с помощью pipeline actives? Поскольку Data factory поддерживает запуск скриптов databrick с помощью python или notebook. По моему опыту, этого очень сложно добиться.

4. Должно быть с конвейером, нельзя просто запустить ноутбук или что-то в этом роде, к сожалению. По сути, я бы считывал данные из Azure Data Lake Storage gen 2, но мне нужны только последние данные оттуда. Это каким-то образом определяется с помощью файлов _delta_log, но я не знаю, как ADF будет их читать, поскольку, похоже, он считывает все файлы parquet вместе.

5. Я делюсь с вами своими идеями в ответе, надеюсь, это полезно для вас.

Ответ №1:

С конвейером фабрики данных, похоже, этого добиться сложно. Но у меня есть несколько идей для вас:

  1. Используйте активный поиск, чтобы получить содержимое файла delta_log. Если файлов много, используйте get metadata, чтобы получить схему всех файлов (дата последнего изменения).
  2. Используйте условие if active или swich active для фильтрации последних данных.
  3. После фильтрации данных передайте выходные данные поиска, чтобы установить активный источник копирования (установить в качестве параметра).

Самое сложное, что вам нужно выяснить, как фильтровать последний набор данных с помощью delta_log. Вы могли бы попробовать этот способ, весь рабочий процесс должен понравиться, но я не могу сказать вам, действительно ли это работает. Я не мог протестировать это для вас без такой же среды.

HTP.