#python #apache-spark #pyspark #databricks
Вопрос:
Входные данные: abc.tar.gz -gt; un tar — gt;gt; Папка: abc
Структура папок abc:
корневая папка: abc содержит csv-файлы, которые генерируются из 100 городов каждые 5 минут в день.
Количество csv-файлов: 100 городов * 12 файлов в час * 24 часа = 28800 csv-файлов
abc/ city1_0005.csv city1_0010.csv .. city1_2355.csv .. .. city2_0005.csv city2_0010.csv .. city2_2355.csv .. .. city100_0005.csv city100_0010.csv
Функциональные Требования:
- С помощью spark/ databricks Создайте таблицу для каждого города и загрузите соответствующие csv-файлы городов (288) в таблицу. всего в целевом местоположении будет 100 столов. 1 стол на 1 город.
- У каждого города своя схема .Все столбцы различны для каждого города. поэтому я не могу записать все данные о городах в одну таблицу с разделами.
техническое требование: Считывайте и обрабатывайте файлы параллельно для повышения производительности
Я разработал приведенный ниже код для последовательной обработки данных. Я ищу способы его оптимизации .
staging_path="abfss://xyz/abc" #using databricks utils to get the list of files in folder filesProp = dbutils.fs.ls(staging_adls_path) #extracting the city names from list of filenames filesSet =set() for file in filesProp: filesSet.add(file.name.split('-')[0]) #empty list to store dataframes dictionary_df = {} #reading 1 city data and inserting to table for fileName in filesSet: filePath = staging_path fileName "*" print(filePath) dictionary_df[fileName] = spark.read.options(header='True', delimiter=',').csv(filePath) dictionary_df[fileName].write.saveAsTable(fileName)
Комментарии:
1. ваш код уже оптимизирован. но вы можете изменить свою структуру данных для повышения производительности. Просто одна таблица с разделом на название города была бы намного лучше.
2. Привет, Стивен, я забыл упомянуть одну вещь. У каждого города своя схема . Поэтому я не могу записать все данные о городах в одну таблицу с разделами.
3. насколько по-другому ? это похоже на что-то совершенно другое ? или у вас может быть общая схема, которая подходит для всех таблиц, но при необходимости не содержит значений.
4. если вы не дадите нам больше информации о своей схеме, нам будет трудно помочь. Но в остальном у вас уже есть надлежащий процесс.
5. Я имею в виду, что csv-файлы city1 содержат столбцы a,b,c,d. файлы city2 содержат столбцы i,j,k,l,m,n. Ни в одной из схем двух городов нет общего столбца.
Ответ №1:
Вот как я бы решил этот сценарий
- использование сценария оболочки для перемещения CSV на основе городов в / определенные папки
This will ensure the files with same schema are under same root folder /abc/ city1/ 20211021/city1_0005 20211021/city1_0010 ... city2/ 20211021/city2_0005 20211021/city2_0010
- Поскольку вы уже работаете в Azure и Databricks, я бы рекомендовал вам использовать формат данных CloudFiles, который повысит производительность при параллельном сканировании необработанных файлов из вашего озера данных по сравнению с опцией структурированной потоковой передачи с открытым исходным кодом csv
- использование структурированной потоковой передачи с foreachBatch() и триггером(один раз=True) будет обрабатывать только инкрементные файлы с момента последнего выполнения, сохраняя сведения об обработанных файлах по пути checkpoint_location
- функция process_multiple_csvs_different_schema принимает микробатч, выбирает столбцы из соответствующего csv-файла и записывает их в соответствующие таблицы городов
from pyspark.sql import functions as F tmp_db = "test_multiple_csv_schema" spark.sql(f"create database if not exists {tmp_db}") base_path = lt;your_base_mount_path_root_folder_for_csvsgt; checkpoint_location = f"{base_path}/checkpoint/multiplecsvs" input_path = f"{base_path}/multiplecsvs/" schema_location = f"{base_path}/schema/multiplecsvs" staging_checkpoint_path = f"{base_path}/staging/checkpoint/multiplecsvs" staging_data_path = f"{base_path}/staging/data/multiplecsvs" input_format = "csv" def process_multiple_csvs_different_schema(batch_df): df = ( batch_df .withColumn("table",F.split(F.col("input_file_name"),".csv")[0]) .withColumn("table_path",F.split(F.col("table"),"/")) .withColumn("table_name",F.split(F.col("table"),"/")[F.size(F.col("table_path"))-1]) .drop("table","table_path") ) list_of_cities = df.select("table_name").distinct().collect() list_of_cities = [city[0] for city in list_of_cities] for city in list_of_cities: print(f"processing data for {city}") city_df = df.where(f"table_name='{city}'") input_file_name = city_df.limit(1).select("input_file_name").collect()[0][0] df_schema = spark.read.option("format",input_format).option("header",True).load(input_file_name,format=input_format) select_columns = df_schema.columns city_df.select(select_columns).withColumn("processed_time",F.current_timestamp()).write.option("mergeSchema",True).option("mode","append").format("delta").saveAsTable(f"{tmp_db}.{city}") raw_df = (spark .readStream .format("cloudFiles") .option("cloudFiles.format",input_format) .option("cloudFiles.schemaLocation",schema_location) .load(input_path) ) ( raw_df.withColumn("input_file_name",F.input_file_name()) .writeStream .option("checkpointLocation",staging_checkpoint_path) .option("mergeSchema",True) .option("format","delta") .outputMode("append") .trigger(once=True) .start(staging_data_path) .awaitTermination() ) staging_df = spark.readStream.format("delta").load(staging_data_path) ( staging_df.writeStream .option("checkpointLocation",checkpoint_location) .option("format","delta") .trigger(once=True) .foreachBatch(lambda batch_df,batch_id:process_multiple_csvs_different_schema(batch_df)) .start() .awaitTermination() )