эффективное симметричное вычисление в spark

#python #apache-spark #pyspark

#python #apache-spark #pyspark

Вопрос:

Распространенная конструкция, которую я видел в алгоритмах, содержащих симметрию,

 for (int i = 0; i < n ; i  ) {
    for (int j = i 1; j < n ; j  ) {
        [compute x]
        objects[i][j]  = x;
        objects[j][i] -= x;
    }
}
  

это (при сохранении сложности O (n ^ 2)) уменьшает объем вычислений, необходимых для использования симметрии. Не могли бы вы сказать мне, как можно внедрить такую оптимизацию в коде pyspark?

Например, я написал код, который вычисляет силу на единицу массы, действующую на каждую частицу в системе по формуле (где r — позиция):

          N    m_j*(r_i - r_j)
F = -G * Σ   -----------------
        i!=j   |r_i - r_j|^3
  

В нем я сначала делаю перекрестное произведение моего фрейма данных с самим собой, чтобы получить каждое попарное взаимодействие, а затем объединяю их все по идентификатору, чтобы получить общую силу, действующую на каждую частицу:

 def calc_F(df_clust, G=1):

    # cartesian product of the dataframe with itself
    renameCols = [f"`{col}` as `{col}_other`" for col in df_clust.columns]
    df_cart = df_clust.crossJoin(df_clust.selectExpr(renameCols))
    df_clust_cartesian = df_cart.filter("id != id_other")

    df_F_cartesian = df_clust_cartesian.selectExpr("id", "id_other", "m_other",
                                                   "`x` - `x_other` as `diff(x)`",
                                                   "`y` - `y_other` as `diff(y)`",
                                                   "`z` - `z_other` as `diff(z)`"
                                                   )
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`diff(x)` * `m_other` as `num(x)`",
                                               "`diff(y)` * `m_other` as `num(y)`",
                                               "`diff(z)` * `m_other` as `num(z)`",
                                               "sqrt(`diff(x)` * `diff(x)`   `diff(y)`"
                                               "* `diff(y)`   `diff(z)` * `diff(z)`) as `denom`",
                                               )
    df_F_cartesian = df_F_cartesian.selectExpr("id", "id_other",
                                               "`num(x)` / pow(`denom`, 3) as `Fx`",
                                               "`num(y)` / pow(`denom`, 3) as `Fy`",
                                               "`num(z)` / pow(`denom`, 3) as `Fz`",
                                               )
    # squish back to inital particles
    sumCols = ["Fx", "Fy", "Fz"]
    df_agg = df_F_cartesian.groupBy("id").sum(*sumCols)
    renameCols = [f"`sum({col})` as `{col}`" for col in sumCols]
    df_F = df_agg.selectExpr("id", *renameCols)

    df_F = df_F.selectExpr("id",
                           f"`Fx` * {-G} as Fx",
                           f"`Fy` * {-G} as Fy",
                           f"`Fz` * {-G} as Fz")

    return df_F
  

Но я знаю, что сила между двумя частицами симметрична — F_ij = -F_ji (я предполагаю, что все массы равны) — поэтому здесь я вычисляю количество сил дважды, вместо их повторного использования. Поэтому в данном конкретном случае я бы хотел df_clust_cartesian = df_cart.filter("id != id_other") df_clust_cartesian = df_cart.filter("id < id_other") , например, обратиться к и каким-то образом повторно использовать эти силы при вычислении общей силы во второй части функции. (Конечно, в идеале я хочу научиться делать это вообще)

Пример ввода для этого случая будет

 a = sc.parallelize([
    [0.48593906,-0.52435857,-0.53198230,0.46153894,-0.33775792E-01,-0.32276499,0.15625001E-04,1],
    [-0.65960690E-01,0.80844238E-01,-0.27603051,-0.57578009,1.1078150,-0.29340765,0.15625001E-04,2],
    [-0.34809157E-01,0.76795481E-01,-0.39087987,-0.55399138,-0.17386098,0.59250806E-01,0.15625001E-04,3]
])                                                           

from pyspark.sql.types import * 
 
clust_input = StructType([ 
    StructField('x',  DoubleType(), False), 
    StructField('y',  DoubleType(), False), 
    StructField('z',  DoubleType(), False), 
    StructField('vx', DoubleType(), False), 
    StructField('vy', DoubleType(), False), 
    StructField('vz', DoubleType(), False), 
    StructField('m',  DoubleType(), False), 
    StructField('id', IntegerType(), False) 
])    

df_clust = a.toDF(schema=clust_input) 
  

Ответ №1:

По сути, вы хотите вычислять свою формулу только тогда, когда id < other_id и использовать этот результат для генерации по симметрии всех элементов, для которых id > other_id .

Вам просто нужно будет изменить свой фильтр следующим образом

 df_clust_cartesian = df_cart.filter("id < id_other")
  

Затем, когда df_F_cartesian у вас есть фрейм данных, у вас есть одна строка на (id, id_other) пару. Вы можете использовать эту строку для генерации строки, соответствующей (id_other, id) и добавить знак минус к Fx, Fy и Fz .

Это можно сделать, добавив следующий шаг прямо перед агрегацией:

 from pyspark.sql import functions as F

sumCols = ["Fx", "Fy", "Fz"]
oppositeSums = [ (-F.col(c)).alias(c) for c in sumCols]
df_F_cartesian = df_F_cartesian.select(F.explode(F.array(
    F.struct(F.col("id"), *sumCols),
    F.struct(F.col("id_other").alias("id"), *oppositeSums)
)).alias("s")).select("s.*")
  

Комментарии:

1. Я попробовал этот подход, и, к сожалению, он почти в 4 раза медленнее, чем оригинал; возможно, я попробую какой-то другой способ генерации симметричных данных

2. Итак, целью было оптимизировать работу. В spark деление объема вычислений на два на самом деле не имеет большого значения (но именно об этом и был вопрос), что действительно важно, так это перемешивание и способ разделения данных. Сколько у вас различных идентификаторов, сколько исполнителей / ядер и сколько времени занимает ваша работа?

3. Да, я начинаю думать, что, возможно, деление вычисления на 2 на самом деле не сделает его заметно быстрее. У меня 64000 идентификаторов. При локальной работе на 8 потоках cpu параллелизм по умолчанию 16, это занимает около 5 минут. При удаленном запуске на 17 исполнителях по 2 ядра в каждом параллелизм по умолчанию 64, это занимает около минуты. И тогда мне нужно будет повторить это тысячи раз (сам алгоритм очень неэффективен tbh)