#python #pyspark #databricks
Вопрос:
У меня есть код конвейера, который итеративно обрабатывает файлы parquet один за другим в псевдокоде, который выглядит следующим образом:
def f(files):
files_list = dbutils.fs.ls(files)
i = 0
for file in files_list:
print(f"n Trying to read file = {file}, loop index = {i} n")
do_stuff_to_file_and_save_resulting_dataframe_to_curated_folder(file, curated_folder)
archive_file_by_moving_to_a_different_folder(file, destination_folder)
i
f(my_folder)
Он почти делает то, что ему говорят — в конце папка «файлы» (состоящая, скажем, из 10 файлов) пуста, а две папки назначения заполнены. Однако в конце цикла for он совершает неожиданное поведение: он пытается еще раз просканировать чтение первого файла (скажем, data0), сбросив индекс до 0. Похоже, что цикл запускается снова без какого-либо триггера для него.
Trying to read file = FileInfo(path='dbfs:/mnt/my_folder/data0/', name='data0/', size=0), loop index = 0
Конечно, это вызывает исключение, потому что папка my_folder пуста:
FileReadException: Error while reading file dbfs:/mnt/my_folder/data0/part-00000-tid-4845080800878359368-2069f3d0-369c-4c6d-b2a0-50923666af8e-25447-1-c000.snappy.parquet.
It is possible the underlying files have been updated.
You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
If Delta cache is stale or the underlying files have been removed, you can invalidate Delta cache manually by restarting the cluster.
Я не уверен, почему цикл запускается дважды без всякой причины, так как функция вызывается только один раз. Что еще более удивительно, он ломается после выполнения работы (результаты сохраняются). Нет никакой таблицы SQL, которую нужно обновить. Все сделано в паркете, а не в дельте. Я попытался сбросить и снова подключить кластер. Что можно сделать, чтобы устранить эту ошибку?
Комментарии:
1. Я угадаю: может быть, код должен быть выполнен на
if __name__ == "__main__":
. Может быть, он запускает его с многопроцессорной обработкой, и каждый процесс снова запускает вашу функцию — иif __name__ == "__main__":
. следует остановить другие процессы, чтобы запустить его снова.