#python #apache-spark #pyspark
#python #apache-spark #pyspark
Вопрос:
Распространенная конструкция, которую я видел в алгоритмах, содержащих симметрию,
for (int i = 0; i < n ; i ) {
for (int j = i 1; j < n ; j ) {
[compute x]
objects[i][j] = x;
objects[j][i] -= x;
}
}
это (при сохранении сложности O (n ^ 2)) уменьшает объем вычислений, необходимых для использования симметрии. Не могли бы вы сказать мне, как можно внедрить такую оптимизацию в коде pyspark?
Например, я написал код, который вычисляет силу на единицу массы, действующую на каждую частицу в системе по формуле (где r
— позиция):
N m_j*(r_i - r_j)
F = -G * Σ -----------------
i!=j |r_i - r_j|^3
В нем я сначала делаю перекрестное произведение моего фрейма данных с самим собой, чтобы получить каждое попарное взаимодействие, а затем объединяю их все по идентификатору, чтобы получить общую силу, действующую на каждую частицу:
def calc_F(df_clust, G=1):
# cartesian product of the dataframe with itself
renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
df_cart = df_clust.crossJoin(df_clust.selectExpr(renameCols))
df_clust_cartesian = df_cart.filter("id != id_other")
df_F_cartesian = df_clust_cartesian.selectExpr("id", "id_other", "m_other",
"`x` - `x_other` as `diff(x)`",
"`y` - `y_other` as `diff(y)`",
"`z` - `z_other` as `diff(z)`"
)
df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
"`diff(x)` * `m_other` as `num(x)`",
"`diff(y)` * `m_other` as `num(y)`",
"`diff(z)` * `m_other` as `num(z)`",
"sqrt(`diff(x)` * `diff(x)` `diff(y)`"
"* `diff(y)` `diff(z)` * `diff(z)`) as `denom`",
)
df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
"`num(x)` / pow(`denom`, 3) as `Fx`",
"`num(y)` / pow(`denom`, 3) as `Fy`",
"`num(z)` / pow(`denom`, 3) as `Fz`",
)
# squish back to inital particles
sumCols = ["Fx", "Fy", "Fz"]
df_agg = df_F_cartesian.groupBy("id").sum(*sumCols)
renameCols = [f"`sum({col})` as `{col}`" for col in sumCols]
df_F = df_agg.selectExpr("id", *renameCols)
df_F = df_F.selectExpr("id",
f"`Fx` * {-G} as Fx",
f"`Fy` * {-G} as Fy",
f"`Fz` * {-G} as Fz")
return df_F
Но я знаю, что сила между двумя частицами симметрична — F_ij = -F_ji
(я предполагаю, что все массы равны) — поэтому здесь я вычисляю количество сил дважды, вместо их повторного использования. Поэтому в данном конкретном случае я бы хотел df_clust_cartesian = df_cart.filter("id != id_other")
df_clust_cartesian = df_cart.filter("id < id_other")
, например, обратиться к и каким-то образом повторно использовать эти силы при вычислении общей силы во второй части функции. (Конечно, в идеале я хочу научиться делать это вообще)
Пример ввода для этого случая будет
a = sc.parallelize([
[0.48593906,-0.52435857,-0.53198230,0.46153894,-0.33775792E-01,-0.32276499,0.15625001E-04,1],
[-0.65960690E-01,0.80844238E-01,-0.27603051,-0.57578009,1.1078150,-0.29340765,0.15625001E-04,2],
[-0.34809157E-01,0.76795481E-01,-0.39087987,-0.55399138,-0.17386098,0.59250806E-01,0.15625001E-04,3]
])
from pyspark.sql.types import *
clust_input = StructType([
StructField('x', DoubleType(), False),
StructField('y', DoubleType(), False),
StructField('z', DoubleType(), False),
StructField('vx', DoubleType(), False),
StructField('vy', DoubleType(), False),
StructField('vz', DoubleType(), False),
StructField('m', DoubleType(), False),
StructField('id', IntegerType(), False)
])
df_clust = a.toDF(schema=clust_input)
Ответ №1:
По сути, вы хотите вычислять свою формулу только тогда, когда id < other_id
и использовать этот результат для генерации по симметрии всех элементов, для которых id > other_id
.
Вам просто нужно будет изменить свой фильтр следующим образом
df_clust_cartesian = df_cart.filter("id < id_other")
Затем, когда df_F_cartesian
у вас есть фрейм данных, у вас есть одна строка на (id, id_other)
пару. Вы можете использовать эту строку для генерации строки, соответствующей (id_other, id)
и добавить знак минус к Fx, Fy
и Fz
.
Это можно сделать, добавив следующий шаг прямо перед агрегацией:
from pyspark.sql import functions as F
sumCols = ["Fx", "Fy", "Fz"]
oppositeSums = [ (-F.col(c)).alias(c) for c in sumCols]
df_F_cartesian = df_F_cartesian.select(F.explode(F.array(
F.struct(F.col("id"), *sumCols),
F.struct(F.col("id_other").alias("id"), *oppositeSums)
)).alias("s")).select("s.*")
Комментарии:
1. Я попробовал этот подход, и, к сожалению, он почти в 4 раза медленнее, чем оригинал; возможно, я попробую какой-то другой способ генерации симметричных данных
2. Итак, целью было оптимизировать работу. В spark деление объема вычислений на два на самом деле не имеет большого значения (но именно об этом и был вопрос), что действительно важно, так это перемешивание и способ разделения данных. Сколько у вас различных идентификаторов, сколько исполнителей / ядер и сколько времени занимает ваша работа?
3. Да, я начинаю думать, что, возможно, деление вычисления на 2 на самом деле не сделает его заметно быстрее. У меня 64000 идентификаторов. При локальной работе на 8 потоках cpu параллелизм по умолчанию 16, это занимает около 5 минут. При удаленном запуске на 17 исполнителях по 2 ядра в каждом параллелизм по умолчанию 64, это занимает около минуты. И тогда мне нужно будет повторить это тысячи раз (сам алгоритм очень неэффективен tbh)