PySpark Работает с Дельта — таблицами- Для оптимизации цикла с объединением

#pyspark #databricks #union-all #delta-lake

Вопрос:

В настоящее время я работаю в базах данных, и у меня есть дельта-таблица с более чем 20 столбцами. Мне в основном нужно взять значение из 1 столбца в каждой строке, отправить его в api, который возвращает два значения/столбца, а затем создать остальные 26, чтобы объединить значения обратно в исходную дельта-таблицу. Таким образом, ввод-28 столбцов, а вывод-28 столбцов. В настоящее время мой код выглядит так:

 from pyspark.sql.types import * from pyspark.sql import functions as F import requests, uuid, json from pyspark.sql import SparkSession from pyspark.sql import DataFrame from pyspark.sql.functions import col,lit from functools import reduce  spark.conf.set("spark.sql.adaptive.enabled","true") spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true") spark.sql('set spark.sql.execution.arrow.pyspark.enabled = true') spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true") spark.conf.set("spark.sql.parquet.compression.codec","gzip") spark.conf.set("spark.sql.inMemorycolumnarStorage.compressed","true") spark.conf.set("spark.databricks.optimizer.dynamicFilePruning","true");  output=spark.sql("select * from delta.`table`").cache()  SeriesAppend=[]  for i in output.collect():  #small mapping fix  if i['col1']=='val1':  var0='a'  elif i['col1']=='val2':  var0='b'  elif i['col1']=='val3':  var0='c'  elif i['col1']=='val4':  var0='d'   var0=set([var0])  req_var = set(['a','b','c','d'])  var_list=list(req_var-var0)   #subscription info   headers = {header}   body = [{  'text': i['col2']  }]    if len(i['col2'])lt;500:  request = requests.post(constructed_url, params=params, headers=headers, json=body)  response = request.json()  dumps=json.dumps(response[0])  loads = json.loads(dumps)  json_rdd = sc.parallelize(loads)  json_df = spark.read.json(json_rdd)  json_df = json_df.withColumn('col1',lit(i['col1']))  json_df = json_df.withColumn('col2',lit(i['col2']))  json_df = json_df.withColumn('col3',lit(i['col3']))  ...  SeriesAppend.append(json_df)    else:  pass  Series_output=reduce(DataFrame.unionAll, SeriesAppend)  

ОБРАЗЕЦ DF только с 3 столбцами:

 df = spark.createDataFrame(  [  ("a", "cat","owner1"), # create your data here, be consistent in the types.  ("b", "dog","owner2"),  ("c", "fish","owner3"),  ("d", "fox","owner4"),  ("e", "rat","owner5"),  ],  ["col1", "col2", "col3"]) # add your column names here  

Мне действительно просто нужно записать ответ другие значения столбцов в дельта-таблицу, поэтому фреймы данных не обязательно требуются, но я не нашел более быстрого способа, чем описанный выше. Прямо сейчас я могу запустить 5 входов, которые возвращают 15 за 25,3 секунды без unionAll. С включением союза он превращается в 3 минуты.

Конечный результат будет выглядеть следующим образом:

 df = spark.createDataFrame(  [  ("a", "cat","owner1","MI", 48003), # create your data here, be consistent in the types.  ("b", "dog","owner2", "MI", 48003),  ("c", "fish","owner3","MI", 48003),  ("d", "fox","owner4","MI", 48003),  ("e", "rat","owner5","MI", 48003),  ],  ["col1", "col2", "col3", "col4", "col5"]) # add your column names here  

Как я могу сделать это быстрее в spark?

Комментарии:

1. collect затем for запустил бы все это в драйвере, который не масштабируется и не параллелен. Вы подумывали о том, чтобы написать вместо этого UDF?

2. Поэтому я знаю, что могу написать udf, но не уверен, что это было бы похоже на вышесказанное или что я могу сделать в этом udf, чтобы он работал быстрее. Можете ли вы продемонстрировать пример?

3. Я могу попробовать сделать демонстрацию, если вы объясните мне, что вы хотите сделать, каков пример ввода и какой ожидаемый результат. И я могу заверить вас, что использование UDF (по сравнению с вашим кодом выше) абсолютно масштабируемо. Вероятно, это не быстрее с небольшим набором данных, но попробуйте с большим, и вы увидите разницу

4. Таким образом, ответ-это всего лишь две дополнительные колонки. Поэтому для образца df, который я предоставил, где 1 строка будет col1, col2, col3… Выход будет col1,col2,col3,col4,col5. Мы можем пометить эти столбцы заполнителем состояния и почтового индекса. Я добавил образец вывода выше для ясности.

Ответ №1:

Как упоминалось в моих комментариях, вы должны использовать UDF для распределения дополнительной рабочей нагрузки между работниками collect , а не позволять одной машине (драйверу) запускать все это. Это просто неправильный подход и не масштабируемый.

 # This is your main function, pure Python and you can unittest it in any way you want. # The most important about this function is: # - everything must be encapsulated inside the function, no global variable works here def req(col1, col2):  if col1 == 'val1':  var0 = 'a'  elif col1 == 'val2':  var0 = 'b'  elif col1 == 'val3':  var0 = 'c'  elif col1 == 'val4':  var0 = 'd'    var0=set([var0])  req_var = set(['a','b','c','d'])  var_list = list(req_var - var0)    #subscription info   headers = {header} # !!! `header` must available **inside** this function, global won't work   body = [{  'text': col2  }]    if len(col2) lt; 500:  # !!! same as `header`, `constructed_url` must available **inside** this function, global won't work  request = requests.post(constructed_url, params=params, headers=headers, json=body)  response = request.json()  return (response.col4, response.col5)  else:  return None  # Now you wrap the function above into a Spark UDF. # I'm using only 2 columns here as input, but you can use as many columns as you wish. # Same as output, I'm using only a tuple with 2 elements, you can make it as many items as you wish. df.withColumn('temp', F.udf(req, T.ArrayType(T.StringType()))('col1', 'col2')).show()  # Output #  ---- ---- ------ ------------------  # |col1|col2| col3| temp| #  ---- ---- ------ ------------------  # | a| cat|owner1|[foo_cat, bar_cat]| # | b| dog|owner2|[foo_dog, bar_dog]| # | c|fish|owner3| null| # | d| fox|owner4| null| # | e| rat|owner5| null| #  ---- ---- ------ ------------------   # Now all you have to do is extract the tuple and assign to separate columns # (and delete temp column to cleanup) (df  .withColumn('col4', F.col('temp')[0])  .withColumn('col5', F.col('temp')[1])  .drop('temp')  .show() )  # Output #  ---- ---- ------ ------- -------  # |col1|col2| col3| col4| col5| #  ---- ---- ------ ------- -------  # | a| cat|owner1|foo_cat|bar_cat| # | b| dog|owner2|foo_dog|bar_dog| # | c|fish|owner3| null| null| # | d| fox|owner4| null| null| # | e| rat|owner5| null| null| #  ---- ---- ------ ------- -------