Как эффективно объединить большие фреймы данных pyspark и маленький список python для получения некоторых результатов NLP в databricks

#python #dataframe #apache-spark #pyspark #databricks

#python #фрейм данных #apache-искра #pyspark #databricks #apache-spark

Вопрос:

Я работаю над NLP с помощью SparkNLP и SparkML в databricks.

Я использовал LDA (из SparkML) для моделирования темы и получил следующие темы.

Это фрейм данных pyspark (df1):

   df1:

  t_id word_index  weights
   0   [0, 2, 3] [0.2105, 0.116, 0.18]
   1   [1, 4, 6] [0.15, 0.05, 0.36]

 "t_id" is topic id.
 "weights" is the weight value of each word with index in "word_index"

 The "word_index" in df1 corresponds to the location of each word in the list (lt).

 df1 is small with not more than 100 rows.
  

У меня есть список слов (lt): это список python

   lt:
  ['like', 'book', 'music', 'bike', 'great', 'pen', 'laptop']
  
  lt has about 20k words.
  

У меня есть еще один большой фрейм данных pyspark (df2) с более чем 20 миллионами строк.
Его размер составляет более 50 ГБ.

df2:

  u_id p_id reviews
 sra  tvs  "I like this music" # some english tokens (each token can be found in "lt")  
 fbs  dvr  "The book is great"
  

Я хотел бы присвоить «t_id» (темы) в df1 каждой строке df2, чтобы я мог получить фрейм данных pyspark, например:

  u_id p_id reviews               t_id the_highest_weights
 sra  tvs  "I like this music"   1    # the highest of all tokens' weights among all "t_id"s
 fbs  dvr  "The book is great"   4
  

Но в одном обзоре может быть несколько «t_id» (тем), потому что в обзоре могут быть слова, охватываемые несколькими «t_id».
Итак, я должен вычислить общий вес каждого «t_id» таким образом, чтобы «t_id» с наибольшим общим весом присваивался «отзывам» в df2.

Он представлен как «the_highest_weights» конечного результата.

Я не хочу использовать «for loops» для работы с этой строкой за строкой, потому что это неэффективно для большого фрейма данных.

Как я могу использовать фрейм данных pyspark (не pandas) и векторизацию (при необходимости) для эффективного получения результата?

Спасибо

Комментарии:

1. Нашли ли вы решение своей проблемы?

Ответ №1:

Я не уверен в том, что именно вы хотите вычислить, но вы сможете настроить этот ответ, чтобы получить то, что вам нужно. Допустим, вы хотите найти для каждого предложения t_id максимальный балл (заданный суммой весов его токенов).

Вы можете начать с создания фрейма данных, который связывает каждое слово с его индексом.

 df_lt = spark.createDataFrame([(i, lt[i]) for i in 
                      range(0, len(lt))], ['word_index', 'w'])
  

Затем мы сгладим df1 так, чтобы каждая строка содержала t_id индекс, индекс слова и соответствующий вес. Для этого мы можем использовать UDF. Обратите внимание, что в spark > = 2.4 вместо этого вы можете использовать array_union и create_map , но поскольку df1 он маленький, использование UDF не будет проблемой.

 
def create_pairs(index, weights):
    return [(index[i], weights[i]) for i in range(0, len(index))]
create_pairs_udf = udf(create_pairs, ArrayType(StructType([
                         StructField(IntegerType(), 'word_index'),
                         StructField(DoubleType(), 'weight')
                   ])))

df1_exp = df1
    .select('t_id', explode(create_pairs_udf(df1['word_index'], df1['weights']))
                       .alias('pair'))
    .select('t_id', 'pair.word_index', 'pair.weight')
  

Наконец, основная работа выполнена над df2 , большим фреймом данных. Мы начинаем с того, что расширяем предложение, чтобы получить по слову в строке ( u_id и p_id ). Затем нам нужно объединить с df_lt , чтобы перевести слова в индексы. Затем, путем объединения с df1_exp , мы связываем индекс каждого слова с его весом. Затем мы группируем по всем индексам (включая t_id ), чтобы вычислить сумму весов, и снова группируем, чтобы выбрать лучшее t_id для каждого предложения.

Чтобы ускорить процесс, мы можем указать spark на широковещательные df_lt и df1_exp небольшие, чтобы избежать перетасовки df2 , которая является большой.

Код выглядит следующим образом:

 df2
    .select("u_id", "p_id", explode(split(df2['reviews'], "\s ")).alias("w"))
    .join(broadcast(df_lt), ['w'])
    .drop('w')
    .join(broadcast(df1_exp), ['word_index'])
    .groupBy('u_id', 'p_id', 't_id')
    .agg(sum('weight').alias('score'))
    .withColumn('t_id', struct('score', 't_id'))
    .groupBy('u_id', 'p_id')
    .agg(max('t_id').alias('t_id'))
    .select('u_id', 'p_id', 't_id.score', 't_id.t_id')
    .show()
  
  ---- ---- ------ ---- 
|u_id|p_id| score|t_id|
 ---- ---- ------ ---- 
| fbs| dvr|   0.2|   1|
| sra| tvs|0.3265|   0|
 ---- ---- ------ ----