#apache-spark #pyspark #apache-spark-sql #substring
Вопрос:
У меня есть фрейм данных pyspark, подобный приведенному ниже входному фрейму данных. Я разделяю productname
столбец на пробелы и создаю несколько новых столбцов из маркеров в названии. У меня есть пример кода ниже. Затем я самостоятельно присоединяюсь first_token
к фрейму данных наравне с любым другим токеном, если он имеет то же tz_brandname
самое и producttype
. Моя цель состоит в том , чтобы сопоставить любые два названия продуктов в данных, если они имеют одинаковые tz_brandname
и producttype
, и первый токен соответствует любому из следующих 7 токенов productname
. У меня есть пример желаемого результата ниже. Мой текущий код чрезвычайно медленный, у кого-нибудь есть предложения о том, как это ускорить?
Входные Данные:
products_productname_df.show() ------------ ----------- ----------- |tz_brandname|producttype|productname| ------------ ----------- ----------- |brand1 |toy |green duck | ------------ ----------- ----------- |brand2 |game |toy sky win| ------------ ----------- ----------- |brand1 |toy |duck black | ------------ ----------- ----------- |brand2 |game |sky flyer | ------------ ----------- -----------
Выходные Данные:
------------ ----------- ----------- ----------- ----------- |tz_brandname|producttype|a_name |b_name |first_token| ------------ ----------- ----------- ----------- ----------- |brand1 |toy |duck black |green duck |duck | ------------ ----------- ----------- ----------- ----------- |brand2 |game |sky flyer |toy sky win|sky | ------------ ----------- ----------- ----------- -----------
Код:
from pyspark.sql.functions import regexp_extract, split, coalesce, size, length, substring first_token_df=products_productname_df.withColumn('first_token', upper(trim(split('productname', 's ')[0]))) .withColumn('token_2', upper(trim(split('productname', 's ')[1]))) .withColumn('token_3', upper(trim(split('productname', 's ')[2]))) .withColumn('token_4', upper(trim(split('productname', 's ')[3]))) .withColumn('token_5', upper(trim(split('productname', 's ')[4]))) .withColumn('token_6', upper(trim(split('productname', 's ')[5]))) .withColumn('token_7', upper(trim(split('productname', 's ')[6]))) block_df=first_token_df.alias('a') .join(first_token_df.alias('b'), (col('a.tz_brandname')==col('b.tz_brandname')) amp;(col('a.producttype')==col('b.producttype')) amp;( (col('a.first_token')==col('b.first_token')) |(col('a.first_token')==col('b.token_2')) |(col('a.first_token')==col('b.token_3')) |(col('a.first_token')==col('b.token_4')) |(col('a.first_token')==col('b.token_5')) |(col('a.first_token')==col('b.token_6')) |(col('a.first_token')==col('b.token_7')) ), how='inner' ) .select( col('a.tz_brandname'), col('a.producttype'), col('a.productname').alias('a_name'), col('b.productname').alias('b_name'), col('a.first_token') ) block_df[upper(trim(block_df['first_token']))!=upper(trim(block_df['b_name']))].distinct().orderBy('a_name').show(truncate=False)
Обновить:
включение array_intersect по-прежнему выполняется довольно медленно
block_df=products_productname_df[['tz_brandname','producttype','productname']].distinct() block_df=block_df.alias('a') .join(block_df.alias('b'), (col('a.tz_brandname')==col('b.tz_brandname')) amp;(col('a.producttype')==col('b.producttype')) amp;(size(array_intersect(split(col('a.productname'),''),split(col('b.productname'),'')))gt;0) , how='inner' ) .select( col('a.tz_brandname'), col('a.producttype'), col('a.productname').alias('a_name'), col('b.productname).alias('b_name'), array_intersect(split(col('a.productname'),''),split(col('b.productname'),'')).alias('intersect_tokens') ).distinct()
Комментарии:
1. У меня нет навыков в технике, которую вы перечислили, но можно отсортировать список ввода по первым 2 столбцам. Остальное было бы прямым циклом в результате. Примечание: Не уверен в вашей 3-й колонке результатов.
2. Может быть, вы можете что-то
pyspark.sql.functions.explode()
сделать, чтобы это было более простое соединение вместо того, чтобы сравнивать кучу условий для каждой пары.
Ответ №1:
Я не понимаю, чего именно вы хотите, но вам лучше использовать функции массива.
df = spark.read.csv('test.csv', header=True) df.show() ------------ ----------- ----------- |tz_brandname|producttype|productname| ------------ ----------- ----------- | brand1| toy| green duck| | brand2| game|toy sky win| | brand1| toy| duck black| | brand2| game| sky flyer| | brand2| game| flyer test| ------------ ----------- ----------- df.withColumnRenamed('productname', 'a_name') .join(df.withColumnRenamed('productname', 'b_name'), ['tz_brandname', 'producttype'], 'inner') .filter('a_name gt; b_name') .withColumn('first_token', f.array_intersect(f.split('a_name', ' '), f.split('b_name', ' '))) .show() ------------ ----------- ----------- ---------- ----------- |tz_brandname|producttype| a_name| b_name|first_token| ------------ ----------- ----------- ---------- ----------- | brand1| toy| green duck|duck black| [duck]| | brand2| game|toy sky win|flyer test| []| | brand2| game|toy sky win| sky flyer| [sky]| | brand2| game| sky flyer|flyer test| [flyer]| ------------ ----------- ----------- ---------- -----------
Комментарии:
1. спасибо, что перезвонил мне по этому поводу. Я добавил обновление к своему исходному сообщению, используя ваше предложение array_intersect, чтобы попытаться проиллюстрировать свою цель. Я хочу, чтобы все названия продуктов были рядом, если они имеют общие токены, а их имя tz_brand и тип продукта совпадают. С вашим предложением я думаю, что мы сравниваем только первое и последнее имена продуктов для каждой комбинации tz_brandname и prdoucttype.
2. Я изменил свой ответ, чтобы дать комбинацию «все», и вы можете использовать результат по своему усмотрению. Группируйтесь по или любым другим способом.