#python #dataframe #apache-spark #pyspark #databricks
#python #фрейм данных #apache-искра #pyspark #databricks #apache-spark
Вопрос:
Я работаю над NLP с помощью SparkNLP и SparkML в databricks.
Я использовал LDA (из SparkML) для моделирования темы и получил следующие темы.
Это фрейм данных pyspark (df1):
df1:
t_id word_index weights
0 [0, 2, 3] [0.2105, 0.116, 0.18]
1 [1, 4, 6] [0.15, 0.05, 0.36]
"t_id" is topic id.
"weights" is the weight value of each word with index in "word_index"
The "word_index" in df1 corresponds to the location of each word in the list (lt).
df1 is small with not more than 100 rows.
У меня есть список слов (lt): это список python
lt:
['like', 'book', 'music', 'bike', 'great', 'pen', 'laptop']
lt has about 20k words.
У меня есть еще один большой фрейм данных pyspark (df2) с более чем 20 миллионами строк.
Его размер составляет более 50 ГБ.
df2:
u_id p_id reviews
sra tvs "I like this music" # some english tokens (each token can be found in "lt")
fbs dvr "The book is great"
Я хотел бы присвоить «t_id» (темы) в df1 каждой строке df2, чтобы я мог получить фрейм данных pyspark, например:
u_id p_id reviews t_id the_highest_weights
sra tvs "I like this music" 1 # the highest of all tokens' weights among all "t_id"s
fbs dvr "The book is great" 4
Но в одном обзоре может быть несколько «t_id» (тем), потому что в обзоре могут быть слова, охватываемые несколькими «t_id».
Итак, я должен вычислить общий вес каждого «t_id» таким образом, чтобы «t_id» с наибольшим общим весом присваивался «отзывам» в df2.
Он представлен как «the_highest_weights» конечного результата.
Я не хочу использовать «for loops» для работы с этой строкой за строкой, потому что это неэффективно для большого фрейма данных.
Как я могу использовать фрейм данных pyspark (не pandas) и векторизацию (при необходимости) для эффективного получения результата?
Спасибо
Комментарии:
1. Нашли ли вы решение своей проблемы?
Ответ №1:
Я не уверен в том, что именно вы хотите вычислить, но вы сможете настроить этот ответ, чтобы получить то, что вам нужно. Допустим, вы хотите найти для каждого предложения t_id
максимальный балл (заданный суммой весов его токенов).
Вы можете начать с создания фрейма данных, который связывает каждое слово с его индексом.
df_lt = spark.createDataFrame([(i, lt[i]) for i in
range(0, len(lt))], ['word_index', 'w'])
Затем мы сгладим df1 так, чтобы каждая строка содержала t_id
индекс, индекс слова и соответствующий вес. Для этого мы можем использовать UDF. Обратите внимание, что в spark > = 2.4 вместо этого вы можете использовать array_union
и create_map
, но поскольку df1
он маленький, использование UDF не будет проблемой.
def create_pairs(index, weights):
return [(index[i], weights[i]) for i in range(0, len(index))]
create_pairs_udf = udf(create_pairs, ArrayType(StructType([
StructField(IntegerType(), 'word_index'),
StructField(DoubleType(), 'weight')
])))
df1_exp = df1
.select('t_id', explode(create_pairs_udf(df1['word_index'], df1['weights']))
.alias('pair'))
.select('t_id', 'pair.word_index', 'pair.weight')
Наконец, основная работа выполнена над df2
, большим фреймом данных. Мы начинаем с того, что расширяем предложение, чтобы получить по слову в строке ( u_id
и p_id
). Затем нам нужно объединить с df_lt
, чтобы перевести слова в индексы. Затем, путем объединения с df1_exp
, мы связываем индекс каждого слова с его весом. Затем мы группируем по всем индексам (включая t_id
), чтобы вычислить сумму весов, и снова группируем, чтобы выбрать лучшее t_id
для каждого предложения.
Чтобы ускорить процесс, мы можем указать spark на широковещательные df_lt
и df1_exp
небольшие, чтобы избежать перетасовки df2
, которая является большой.
Код выглядит следующим образом:
df2
.select("u_id", "p_id", explode(split(df2['reviews'], "\s ")).alias("w"))
.join(broadcast(df_lt), ['w'])
.drop('w')
.join(broadcast(df1_exp), ['word_index'])
.groupBy('u_id', 'p_id', 't_id')
.agg(sum('weight').alias('score'))
.withColumn('t_id', struct('score', 't_id'))
.groupBy('u_id', 'p_id')
.agg(max('t_id').alias('t_id'))
.select('u_id', 'p_id', 't_id.score', 't_id.t_id')
.show()
---- ---- ------ ----
|u_id|p_id| score|t_id|
---- ---- ------ ----
| fbs| dvr| 0.2| 1|
| sra| tvs|0.3265| 0|
---- ---- ------ ----