#databricks #databricks-autoloader
#databricks #Автозагрузчик databricks-
Вопрос:
Мы можем использовать автозагрузчик для отслеживания файлов, которые были загружены из корзины S3 или нет. Мой вопрос о автозагрузчике: есть ли способ прочитать базу данных автозагрузчика, чтобы получить список загруженных файлов?
Я могу легко сделать это в закладке AWS Glue job, но я не знаю, как это сделать в автозагрузчике Databricks.
Комментарии:
1. Могу ли я получить ссылку на возможность создания закладок AWS Glue job, которую вы ищете. В задание клея добавляется много кода для создания закладки. Вы можете увидеть ниже, что код автозагрузчика прост, всего два оператора
Ответ №1:
.load("path")
.withColumn("filePath",input_file_name())
затем вы можете, например, вставить путь к файлу в свой потоковый приемник и получить оттуда отдельное значение или использовать forEatch / forEatchBatch и, например, вставить его в таблицу spark sql
Ответ №2:
Вы можете получать уведомления о файлах, загруженных в S3, используя структурную потоковую передачу. Для уже загруженных файлов можно проверить путь назначения s3_output_path.
df = (spark.readStream.format('cloudFiles')
.option("cloudFiles.format", "json")
.option("cloudFiles.region", "<aws region>)
.option("cloudFiles.awsAccessKey",<ACCESS_KEY>)
.option("cloudFiles.awsSecretKey", <SECRET_KEY>)
.option ("cloudFiles.useNotifications", "true")
.load(<s3_path>))
df.writeStream.format('delta').outputMode("append")
.option("checkpointLocation", <checkpoint_path>)
.start(<s3_output_path>)
Комментарии:
1. это просто обработка файлов, а не получение списка обработанных файлов, который задает автор вопроса
2. опция («CloudFiles.useNotifications», «true») в приведенном выше коде будет подписываться на уведомления о загруженных файлах.
3. Я знаю, что это значит. Вопрос заключался в том, как получить файлы, которые уже были загружены автозагрузчиком
4. Я опубликовал комментарий выше, чтобы узнать больше о вопросе выше, я вижу, что функция автозагрузчика имеет все необходимые функции для загрузки только новых файлов. уже загруженные файлы могут быть известны, если мы записываем в целевое хранилище после чтения из исходного хранилища.
Ответ №3:
Если вы используете опцию checkpointLocation, вы можете прочитать все файлы, которые были обработаны, прочитав журналы RocksDB. Несколько примеров кода для достижения этой цели обратите внимание, что вам нужно указать путь к местоположению контрольной точки, по которому вы хотите получить список загруженных файлов.
from glob import glob
import codecs
directory = "<YOUR_PATH_GOES_HERE>/sources/*/rocksdb/logs/"
for file in glob(f"{directory}/*.log"):
with codecs.open(file, encoding='utf-8', errors='ignore') as f:
f = f.readlines()
print(f)
PS.: Журналы должны быть проанализированы должным образом, чтобы получить только имена файлов.