#pyspark #databricks #union-all #delta-lake
Вопрос:
В настоящее время я работаю в базах данных, и у меня есть дельта-таблица с более чем 20 столбцами. Мне в основном нужно взять значение из 1 столбца в каждой строке, отправить его в api, который возвращает два значения/столбца, а затем создать остальные 26, чтобы объединить значения обратно в исходную дельта-таблицу. Таким образом, ввод-28 столбцов, а вывод-28 столбцов. В настоящее время мой код выглядит так:
from pyspark.sql.types import * from pyspark.sql import functions as F import requests, uuid, json from pyspark.sql import SparkSession from pyspark.sql import DataFrame from pyspark.sql.functions import col,lit from functools import reduce spark.conf.set("spark.sql.adaptive.enabled","true") spark.conf.set("spark.databricks.adaptive.autoOptimizeShuffle.enabled", "true") spark.sql('set spark.sql.execution.arrow.pyspark.enabled = true') spark.conf.set("spark.databricks.optimizer.dynamicPartitionPruning","true") spark.conf.set("spark.sql.parquet.compression.codec","gzip") spark.conf.set("spark.sql.inMemorycolumnarStorage.compressed","true") spark.conf.set("spark.databricks.optimizer.dynamicFilePruning","true"); output=spark.sql("select * from delta.`table`").cache() SeriesAppend=[] for i in output.collect(): #small mapping fix if i['col1']=='val1': var0='a' elif i['col1']=='val2': var0='b' elif i['col1']=='val3': var0='c' elif i['col1']=='val4': var0='d' var0=set([var0]) req_var = set(['a','b','c','d']) var_list=list(req_var-var0) #subscription info headers = {header} body = [{ 'text': i['col2'] }] if len(i['col2'])lt;500: request = requests.post(constructed_url, params=params, headers=headers, json=body) response = request.json() dumps=json.dumps(response[0]) loads = json.loads(dumps) json_rdd = sc.parallelize(loads) json_df = spark.read.json(json_rdd) json_df = json_df.withColumn('col1',lit(i['col1'])) json_df = json_df.withColumn('col2',lit(i['col2'])) json_df = json_df.withColumn('col3',lit(i['col3'])) ... SeriesAppend.append(json_df) else: pass Series_output=reduce(DataFrame.unionAll, SeriesAppend)
ОБРАЗЕЦ DF только с 3 столбцами:
df = spark.createDataFrame( [ ("a", "cat","owner1"), # create your data here, be consistent in the types. ("b", "dog","owner2"), ("c", "fish","owner3"), ("d", "fox","owner4"), ("e", "rat","owner5"), ], ["col1", "col2", "col3"]) # add your column names here
Мне действительно просто нужно записать ответ другие значения столбцов в дельта-таблицу, поэтому фреймы данных не обязательно требуются, но я не нашел более быстрого способа, чем описанный выше. Прямо сейчас я могу запустить 5 входов, которые возвращают 15 за 25,3 секунды без unionAll. С включением союза он превращается в 3 минуты.
Конечный результат будет выглядеть следующим образом:
df = spark.createDataFrame( [ ("a", "cat","owner1","MI", 48003), # create your data here, be consistent in the types. ("b", "dog","owner2", "MI", 48003), ("c", "fish","owner3","MI", 48003), ("d", "fox","owner4","MI", 48003), ("e", "rat","owner5","MI", 48003), ], ["col1", "col2", "col3", "col4", "col5"]) # add your column names here
Как я могу сделать это быстрее в spark?
Комментарии:
1.
collect
затемfor
запустил бы все это в драйвере, который не масштабируется и не параллелен. Вы подумывали о том, чтобы написать вместо этого UDF?2. Поэтому я знаю, что могу написать udf, но не уверен, что это было бы похоже на вышесказанное или что я могу сделать в этом udf, чтобы он работал быстрее. Можете ли вы продемонстрировать пример?
3. Я могу попробовать сделать демонстрацию, если вы объясните мне, что вы хотите сделать, каков пример ввода и какой ожидаемый результат. И я могу заверить вас, что использование UDF (по сравнению с вашим кодом выше) абсолютно масштабируемо. Вероятно, это не быстрее с небольшим набором данных, но попробуйте с большим, и вы увидите разницу
4. Таким образом, ответ-это всего лишь две дополнительные колонки. Поэтому для образца df, который я предоставил, где 1 строка будет col1, col2, col3… Выход будет col1,col2,col3,col4,col5. Мы можем пометить эти столбцы заполнителем состояния и почтового индекса. Я добавил образец вывода выше для ясности.
Ответ №1:
Как упоминалось в моих комментариях, вы должны использовать UDF для распределения дополнительной рабочей нагрузки между работниками collect
, а не позволять одной машине (драйверу) запускать все это. Это просто неправильный подход и не масштабируемый.
# This is your main function, pure Python and you can unittest it in any way you want. # The most important about this function is: # - everything must be encapsulated inside the function, no global variable works here def req(col1, col2): if col1 == 'val1': var0 = 'a' elif col1 == 'val2': var0 = 'b' elif col1 == 'val3': var0 = 'c' elif col1 == 'val4': var0 = 'd' var0=set([var0]) req_var = set(['a','b','c','d']) var_list = list(req_var - var0) #subscription info headers = {header} # !!! `header` must available **inside** this function, global won't work body = [{ 'text': col2 }] if len(col2) lt; 500: # !!! same as `header`, `constructed_url` must available **inside** this function, global won't work request = requests.post(constructed_url, params=params, headers=headers, json=body) response = request.json() return (response.col4, response.col5) else: return None # Now you wrap the function above into a Spark UDF. # I'm using only 2 columns here as input, but you can use as many columns as you wish. # Same as output, I'm using only a tuple with 2 elements, you can make it as many items as you wish. df.withColumn('temp', F.udf(req, T.ArrayType(T.StringType()))('col1', 'col2')).show() # Output # ---- ---- ------ ------------------ # |col1|col2| col3| temp| # ---- ---- ------ ------------------ # | a| cat|owner1|[foo_cat, bar_cat]| # | b| dog|owner2|[foo_dog, bar_dog]| # | c|fish|owner3| null| # | d| fox|owner4| null| # | e| rat|owner5| null| # ---- ---- ------ ------------------ # Now all you have to do is extract the tuple and assign to separate columns # (and delete temp column to cleanup) (df .withColumn('col4', F.col('temp')[0]) .withColumn('col5', F.col('temp')[1]) .drop('temp') .show() ) # Output # ---- ---- ------ ------- ------- # |col1|col2| col3| col4| col5| # ---- ---- ------ ------- ------- # | a| cat|owner1|foo_cat|bar_cat| # | b| dog|owner2|foo_dog|bar_dog| # | c|fish|owner3| null| null| # | d| fox|owner4| null| null| # | e| rat|owner5| null| null| # ---- ---- ------ ------- -------