Априорная реализация Scala / Spark чрезвычайно медленная

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

Мы пытаемся реализовать алгоритм Apriori в Scala с помощью Spark (вам не нужно знать алгоритм, чтобы ответить на этот вопрос).

Функция, вычисляющая наборы элементов алгоритма Apriori, является freq() . Код правильный, но он становится медленнее после каждой итерации while в freq() функции in, вплоть до того, что требуется несколько секунд для выполнения перекрестного соединения в таблице с 1 строкой с самим собой.

 import System.{exit, nanoTime}
import scala.collection.mutable.WrappedArray
import org.apache.spark.sql.{Column, SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import spark.implicits._

object Main extends Serializable {
  val s = 0.03

  def loadFakeData() : DataFrame = {
    var data = Seq("1 ",
                   "1 2 ",
                   "1 2",
                   "3",
                   "1 2 3 ",
                   "1 2 ")
                .toDF("baskets_str")
                .withColumn("baskets", split('baskets_str, " ").cast("array<int>"))
      data
  }
  
  def combo(a1: WrappedArray[Int], a2: WrappedArray[Int]): Array[Array[Int]] = {
    var a = a1.toSet
    var b = a2.toSet
    var res = a.diff(b).map(b _)    b.diff(a).map(a _)
    return res.map(_.toArray.sortWith(_ < _)).toArray
  }
  val comboUDF = udf[Array[Array[Int]], WrappedArray[Int], WrappedArray[Int]](combo)

  def getCombinations(df: DataFrame): DataFrame = {
    df.crossJoin(df.withColumnRenamed("itemsets", "itemsets_2"))
      .withColumn("combinations", comboUDF(col("itemsets"), col("itemsets_2")))
      .select("combinations")
      .withColumnRenamed("combinations", "itemsets")
      .withColumn("itemsets", explode(col("itemsets")))
      .dropDuplicates()
  }

  def countCombinations(data : DataFrame, combinations: DataFrame) : DataFrame = {
    data.crossJoin(combinations)
      .where(size(array_intersect('baskets, 'itemsets)) === size('itemsets))
      .groupBy("itemsets")
      .count
  }

  def freq() {
    val spark = SparkSession.builder.appName("FreqItemsets")
      .master("local[*]")
      .getOrCreate()

    // data is a dataframe where each row contains an array of integer values
    var data = loadFakeData()
    val basket_count = data.count

    // Itemset is a dataframe containing all possible sets of 1 element
    var itemset : DataFrame = data
                                .select(explode('baskets))
                                .na.drop
                                .dropDuplicates()
                                .withColumnRenamed("col", "itemsets")
                                .withColumn("itemsets", array('itemsets))
    var itemset_count : DataFrame = countCombinations(data, itemset).filter('count > s*basket_count)
    var itemset_counts = List(itemset_count)

    // We iterate creating each time itemsets of length k 1 from itemsets of length k
    // pruning those that do not have enough support
    var stop = (itemset_count.count == 0)
    while(!stop) {
      itemset = getCombinations(itemset_count.select("itemsets"))
      itemset_count = countCombinations(data, itemset).filter('count > s*basket_count)
      stop = (itemset_count.count == 0)
      if (!stop) {
        itemset_counts = itemset_counts :  itemset_count
      }
    }

    spark.stop()
  }
}
  

Комментарии:

1. Стоит отметить, что сам алгоритм заметно неэффективен: O(2^d) (где d — количество элементов хотя бы в одной корзине). Я давно не использовал spark, но мне интересно DataFrame , пересчитываются ли s заново каждый раз (в этом случае itemset может помочь кэширование).

Ответ №1:

Поскольку Spark сохраняет за собой право регенерировать наборы данных в любое время, это может быть тем, что происходит, и в этом случае кэширование результатов дорогостоящих преобразований может привести к значительному повышению производительности.

В этом случае, на первый взгляд, это выглядит как itemset тяжелый удар, поэтому

 itemset = getCombinations(itemset_count.select("itemsets")).cache
  

Может принести дивиденды.

Следует также отметить, что создание списка путем добавления в цикле обычно намного медленнее ( O(n^2) ), чем его создание путем добавления. Если порядок не влияет на правильность itemset_counts , то:

 itemset_counts = itemset_count :: itemset_counts
  

приведет к, по крайней мере, предельному ускорению.