#pyspark
#pyspark
Вопрос:
Я хотел бы сделать
- сегментированная или групповая оценка плотности ядра pyspark
- Присоедините полученные оценки плотности к другому фрейму данных и выполните вывод
Например, представьте, что у меня есть фрейм данных, который выглядит следующим образом:
data=[{'id':1, 'samples':[3,56,40]},
{'id':2, 'samples':[-3,80,45,45,2]}]
Эти данные были сгенерированы из чего-то вроде
df.groupBy('id').agg(F.collect_list('sample').alias('samples'))
где df
является большим. Тогда представьте, что у меня есть другой большой фрейм данных, такие данные, как:
data2 = [{'id':1, 'val': 10},
{'id':1, 'val': 39},
{'id':2, 'val': 5}]
Я хотел бы получить вероятности для этих трех значений, 10, 39, 5 относительно двух оценок плотности, которые я получил выше.
Например, программа на Python, которая могла бы это сделать, была бы
import scipy.stats
data_to_define_pdfs=[{'id':1, 'samples':[3,56,40]},
{'id':2, 'samples':[-3,80,45,45,2]}]
kdes = {}
for row in data_to_define_pdfs:
kdes[row['id']] = scipy.stats.gaussian_kde(row['samples'])
inferrence_data = [
{'id': 1, 'val': 10},
{'id': 1, 'val': 39},
{'id': 2, 'val': 5}]
for row in inferrence_data:
kde = kdes[row['id']]
row['prob'] = kde.pdf(x=row['val'])[0]
import pprint
pprint.pprint(inferrence_data)
что привело бы к
[{'id': 1, 'prob': 0.008817584549791962, 'val': 10},
{'id': 1, 'prob': 0.012149240532443975, 'val': 39},
{'id': 2, 'prob': 0.008013522166302479, 'val': 5}]
Комментарии:
1. Было бы полезно, если бы вы также включили желаемый результат и некоторое объяснение того, как он вычисляется.
2. Спасибо за интерес! Я добавил пример.
3. Вы нашли какую-либо альтернативу?
Ответ №1:
У меня есть решение, в котором я объединяю все выборки с данными для вывода, это не оптимально, поскольку я могу реплицировать много выборок, и я восстанавливаю объект python scipy kde для каждой строки в данных, где я применяю kde — но для начала я могу представить, что делаю что-то умнее:
data_to_define_pdfs_flat = []
for row in data:
for sample in row['samples']:
data_to_define_pdfs_flat.append({'id':row['id'], 'sample': sample})
df_sample = spark.createDataFrame(data=data_to_define_pdfs_flat,
schema=T.StructType([T.StructField('id', T.IntegerType(), False),
T.StructField('sample', T.FloatType(), False)]))
df_samples = df_sample.groupBy('id').agg(F.collect_list('sample').alias('samples'))
df_infer = spark.createDataFrame(data=data2,
schema=T.StructType([T.StructField('id', T.IntegerType(), False),
T.StructField('val', T.FloatType(), False)]))
df_infer2 = df_infer.join(df_samples, on='id')
def do_inference(df):
def f(samples, val):
kde = scipy.stats.gaussian_kde(samples)
return float(kde.pdf(val)[0])
udf_f = F.udf(f, T.FloatType())
return df.withColumn('prob', udf_f(F.col('samples'), F.col('val')))
df_infer2 = do_inference(df=df_infer2)
df_samples.show()
df_infer2.show()
Ответ №2:
Если вы можете хранить все образцы для KDE локально, у меня есть решение для фрейма данных pandas — по крайней мере, с этим игрушечным примером. Всегда кажется трудным заставить pandas dataframe работать в масштабе из-за того, как он использует память.
В этом решении все kde формируются на главном узле и отправляются на все узлы задач — каждый kde является функцией всех выборочных данных для этого идентификатора, поэтому мне придется выполнить подвыборку данных, которые делают df_samples
:
def do_inference_pd(df_infer, df_samples):
rows = df_samples.collect()
kdes = {}
for row in rows:
row = row.asDict(True)
kdes[row['id']] = scipy.stats.gaussian_kde(np.array(row['samples']))
def kde_prob(pdf):
kde = kdes[pdf.id[0]]
x = pdf.val
return pdf.assign(prob=kde(x))
df_infer_prob = df_infer.withColumn('prob', F.lit(0.0))
sch_str = df_infer2.schema.simpleString()
f = F.pandas_udf(f=kde_prob, returnType=sch_str, functionType=F.PandasUDFType.GROUPED_MAP)
df_infer_prob = df_infer_prob.groupBy('id').apply(f)
return df_infer_prob
df_infer_prob = do_inference_pd(df_infer=df_infer, df_samples=df_samples)