Получить список загруженных файлов из автозагрузчика Databricks

#databricks #databricks-autoloader

#databricks #Автозагрузчик databricks-

Вопрос:

Мы можем использовать автозагрузчик для отслеживания файлов, которые были загружены из корзины S3 или нет. Мой вопрос о автозагрузчике: есть ли способ прочитать базу данных автозагрузчика, чтобы получить список загруженных файлов?

Я могу легко сделать это в закладке AWS Glue job, но я не знаю, как это сделать в автозагрузчике Databricks.

Комментарии:

1. Могу ли я получить ссылку на возможность создания закладок AWS Glue job, которую вы ищете. В задание клея добавляется много кода для создания закладки. Вы можете увидеть ниже, что код автозагрузчика прост, всего два оператора

Ответ №1:

 .load("path")
.withColumn("filePath",input_file_name())
 

затем вы можете, например, вставить путь к файлу в свой потоковый приемник и получить оттуда отдельное значение или использовать forEatch / forEatchBatch и, например, вставить его в таблицу spark sql

Ответ №2:

Вы можете получать уведомления о файлах, загруженных в S3, используя структурную потоковую передачу. Для уже загруженных файлов можно проверить путь назначения s3_output_path.

     df = (spark.readStream.format('cloudFiles') 
    .option("cloudFiles.format",    "json") 
    .option("cloudFiles.region", "<aws region>) 
    .option("cloudFiles.awsAccessKey",<ACCESS_KEY>) 
    .option("cloudFiles.awsSecretKey", <SECRET_KEY>) 
   .option ("cloudFiles.useNotifications", "true") 
   .load(<s3_path>))

    df.writeStream.format('delta').outputMode("append") 
      .option("checkpointLocation", <checkpoint_path>) 
      .start(<s3_output_path>)
 

Комментарии:

1. это просто обработка файлов, а не получение списка обработанных файлов, который задает автор вопроса

2. опция («CloudFiles.useNotifications», «true») в приведенном выше коде будет подписываться на уведомления о загруженных файлах.

3. Я знаю, что это значит. Вопрос заключался в том, как получить файлы, которые уже были загружены автозагрузчиком

4. Я опубликовал комментарий выше, чтобы узнать больше о вопросе выше, я вижу, что функция автозагрузчика имеет все необходимые функции для загрузки только новых файлов. уже загруженные файлы могут быть известны, если мы записываем в целевое хранилище после чтения из исходного хранилища.

Ответ №3:

Если вы используете опцию checkpointLocation, вы можете прочитать все файлы, которые были обработаны, прочитав журналы RocksDB. Несколько примеров кода для достижения этой цели обратите внимание, что вам нужно указать путь к местоположению контрольной точки, по которому вы хотите получить список загруженных файлов.

 from glob import glob
import codecs

directory = "<YOUR_PATH_GOES_HERE>/sources/*/rocksdb/logs/"
for file in glob(f"{directory}/*.log"):
    with codecs.open(file, encoding='utf-8', errors='ignore') as f:
        f = f.readlines()
        print(f)
 

PS.: Журналы должны быть проанализированы должным образом, чтобы получить только имена файлов.