Считывайте несколько групп csv-файлов из папки и вставляйте их в соответствующие целевые таблицы параллельно, используя spark или ссылки на данные

#python #apache-spark #pyspark #databricks

Вопрос:

Входные данные: abc.tar.gz -gt; un tar — gt;gt; Папка: abc

Структура папок abc:

корневая папка: abc содержит csv-файлы, которые генерируются из 100 городов каждые 5 минут в день.

Количество csv-файлов: 100 городов * 12 файлов в час * 24 часа = 28800 csv-файлов

 abc/  city1_0005.csv city1_0010.csv .. city1_2355.csv .. .. city2_0005.csv city2_0010.csv .. city2_2355.csv .. .. city100_0005.csv city100_0010.csv  

Функциональные Требования:

  • С помощью spark/ databricks Создайте таблицу для каждого города и загрузите соответствующие csv-файлы городов (288) в таблицу. всего в целевом местоположении будет 100 столов. 1 стол на 1 город.
  • У каждого города своя схема .Все столбцы различны для каждого города. поэтому я не могу записать все данные о городах в одну таблицу с разделами.

техническое требование: Считывайте и обрабатывайте файлы параллельно для повышения производительности


Я разработал приведенный ниже код для последовательной обработки данных. Я ищу способы его оптимизации .

 staging_path="abfss://xyz/abc"  #using databricks utils to get the list of files in folder filesProp = dbutils.fs.ls(staging_adls_path)  #extracting the city names from list of filenames filesSet =set() for file in filesProp:  filesSet.add(file.name.split('-')[0])  #empty list to store dataframes dictionary_df = {}   #reading 1 city data and inserting to table for fileName in filesSet:  filePath = staging_path fileName "*"  print(filePath)  dictionary_df[fileName] = spark.read.options(header='True', delimiter=',').csv(filePath)  dictionary_df[fileName].write.saveAsTable(fileName)   

Комментарии:

1. ваш код уже оптимизирован. но вы можете изменить свою структуру данных для повышения производительности. Просто одна таблица с разделом на название города была бы намного лучше.

2. Привет, Стивен, я забыл упомянуть одну вещь. У каждого города своя схема . Поэтому я не могу записать все данные о городах в одну таблицу с разделами.

3. насколько по-другому ? это похоже на что-то совершенно другое ? или у вас может быть общая схема, которая подходит для всех таблиц, но при необходимости не содержит значений.

4. если вы не дадите нам больше информации о своей схеме, нам будет трудно помочь. Но в остальном у вас уже есть надлежащий процесс.

5. Я имею в виду, что csv-файлы city1 содержат столбцы a,b,c,d. файлы city2 содержат столбцы i,j,k,l,m,n. Ни в одной из схем двух городов нет общего столбца.

Ответ №1:

Вот как я бы решил этот сценарий

  • использование сценария оболочки для перемещения CSV на основе городов в / определенные папки
     This will ensure the files with same schema are under same root folder  /abc/  city1/  20211021/city1_0005   20211021/city1_0010  ...  city2/  20211021/city2_0005  20211021/city2_0010   
  • Поскольку вы уже работаете в Azure и Databricks, я бы рекомендовал вам использовать формат данных CloudFiles, который повысит производительность при параллельном сканировании необработанных файлов из вашего озера данных по сравнению с опцией структурированной потоковой передачи с открытым исходным кодом csv
  • использование структурированной потоковой передачи с foreachBatch() и триггером(один раз=True) будет обрабатывать только инкрементные файлы с момента последнего выполнения, сохраняя сведения об обработанных файлах по пути checkpoint_location


  • функция process_multiple_csvs_different_schema принимает микробатч, выбирает столбцы из соответствующего csv-файла и записывает их в соответствующие таблицы городов

 from pyspark.sql import functions as F  tmp_db = "test_multiple_csv_schema" spark.sql(f"create database if not exists {tmp_db}") base_path = lt;your_base_mount_path_root_folder_for_csvsgt; checkpoint_location = f"{base_path}/checkpoint/multiplecsvs" input_path = f"{base_path}/multiplecsvs/" schema_location = f"{base_path}/schema/multiplecsvs" staging_checkpoint_path = f"{base_path}/staging/checkpoint/multiplecsvs" staging_data_path = f"{base_path}/staging/data/multiplecsvs" input_format = "csv"  def process_multiple_csvs_different_schema(batch_df):   df = (  batch_df  .withColumn("table",F.split(F.col("input_file_name"),".csv")[0])  .withColumn("table_path",F.split(F.col("table"),"/"))  .withColumn("table_name",F.split(F.col("table"),"/")[F.size(F.col("table_path"))-1])  .drop("table","table_path")  )   list_of_cities = df.select("table_name").distinct().collect()   list_of_cities = [city[0] for city in list_of_cities]   for city in list_of_cities:  print(f"processing data for {city}")  city_df = df.where(f"table_name='{city}'")  input_file_name = city_df.limit(1).select("input_file_name").collect()[0][0]   df_schema = spark.read.option("format",input_format).option("header",True).load(input_file_name,format=input_format)  select_columns = df_schema.columns   city_df.select(select_columns).withColumn("processed_time",F.current_timestamp()).write.option("mergeSchema",True).option("mode","append").format("delta").saveAsTable(f"{tmp_db}.{city}")  raw_df = (spark  .readStream  .format("cloudFiles")  .option("cloudFiles.format",input_format)  .option("cloudFiles.schemaLocation",schema_location)  .load(input_path)  )  (  raw_df.withColumn("input_file_name",F.input_file_name())  .writeStream  .option("checkpointLocation",staging_checkpoint_path)  .option("mergeSchema",True)  .option("format","delta")  .outputMode("append")  .trigger(once=True)  .start(staging_data_path)  .awaitTermination() )  staging_df = spark.readStream.format("delta").load(staging_data_path) (  staging_df.writeStream  .option("checkpointLocation",checkpoint_location)  .option("format","delta")  .trigger(once=True)  .foreachBatch(lambda batch_df,batch_id:process_multiple_csvs_different_schema(batch_df))  .start()  .awaitTermination() )