Объединение вычислений FPGrowth в pyspark против scala

#scala #apache-spark #pyspark #apache-spark-sql #apache-spark-mllib

#scala #apache-spark #pyspark #apache-spark-sql #apache-spark-mllib

Вопрос:

Использование :

http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html

Код Python:

 from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe,0.01,10)
  

Scala:

 import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD

val data = sc.textFile("data/mllib/sample_fpgrowth.txt")

val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))

val fpg = new FPGrowth()
  .setMinSupport(0.2)
  .setNumPartitions(10)
val model = fpg.run(transactions)

model.freqItemsets.collect().foreach { itemset =>
  println(itemset.items.mkString("[", ",", "]")   ", "   itemset.freq)
}

val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
  println(
    rule.antecedent.mkString("[", ",", "]")
        " => "   rule.consequent .mkString("[", ",", "]")
        ", "   rule.confidence)
}
  

Из приведенного здесь кода видно, что часть scala не имеет минимальной достоверности.

 def trainFPGrowthModel(
      data: JavaRDD[java.lang.Iterable[Any]],
      minSupport: Double,
      numPartitions: Int): FPGrowthModel[Any] = {
    val fpg = new FPGrowth()
      .setMinSupport(minSupport)
      .setNumPartitions(numPartitions)

    val model = fpg.run(data.rdd.map(_.asScala.toArray))
    new FPGrowthModelWrapper(model)
  }
  

Как добавить minConfidence для создания правила ассоциации в случае pyspark? Мы видим, что у scala есть пример, но у python его нет.

Ответ №1:

Spark > = 2.2

Существует DataFrame базовый ml API, который обеспечивает AssociationRules :

 from pyspark.ml.fpm import FPGrowth

data = ...

fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules.
  

Spark < 2.2

На данный момент PySpark не поддерживает извлечение правил ассоциации ( DataFrame FPGrowth API на основе с поддержкой Python находится в стадии разработки SPARK-1450), но мы можем легко решить эту проблему.

Сначала вам нужно будет установить SBT (просто перейдите на страницу загрузки) и следуйте инструкциям для вашей операционной системы.

Далее вам нужно будет создать простой проект Scala, содержащий только два файла:

 .
├── AssociationRulesExtractor.scala
└── build.sbt
  

Вы можете настроить его позже, чтобы следовать установленной структуре каталогов.

Затем добавьте следующее к build.sbt (настройте версии Scala и Spark, чтобы они соответствовали той, которую вы используете):

 name := "fpm"

version := "1.0"

scalaVersion := "2.10.6"

val sparkVersion = "1.6.2"

libraryDependencies   = Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-mllib" % sparkVersion
)
  

и, следуя AssociationRulesExtractor.scala :

 package com.example.fpm

import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD

object AssociationRulesExtractor {
  def apply(rdd: RDD[Rule[String]]) = {
    rdd.map(rule => Array(
      rule.confidence, rule.javaAntecedent, rule.javaConsequent
    ))
  }
}
  

Откройте эмулятор терминала по вашему выбору, перейдите в корневой каталог проекта и вызовите:

 sbt package
  

Он сгенерирует файл jar в целевом каталоге. Например, в Scala 2.10 это будет:

 target/scala-2.10/fpm_2.10-1.0.jar
  

Запустите оболочку PySpark или используйте spark-submit и передайте путь к сгенерированному файлу jar, чтобы --driver-class-path :

 bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
  

В нелокальном режиме:

 bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
  

В режиме кластера jar должен присутствовать на всех узлах.

Добавьте несколько удобных оболочек:

 from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple


rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])

def generateAssociationRules(model, minConfidence):
    # Get active context
    sc = SparkContext.getOrCreate()

    # Retrieve extractor object
    extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor

    # Compute rules
    java_rules = model._java_model.generateAssociationRules(minConfidence)

    # Convert rules to Python RDD
    return _java2py(sc, extractor.apply(java_rules)).map(lambda x:rule(*x))
  

Наконец, вы можете использовать эти помощники как функцию:

 generateAssociationRules(model, 0.9)
  

или как метод:

 FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)
  

Это решение зависит от внутренних методов PySpark, поэтому не гарантируется, что оно будет переносимым между версиями.

Комментарии:

1. Вы можете генерировать и получать правила ассоциации в PySpark, используя Spark <2.2 с небольшим количеством кода py4j, я добавлю код в качестве ответа.

Ответ №2:

Вы можете генерировать и получать правила ассоциации в PySpark, используя Spark <2.2 с небольшим количеством кода py4j:

 # model was produced by FPGrowth.train() method
rules = sorted(model._java_model.generateAssociationRules(0.9).collect(), 
    key=lambda x: x.confidence(), reverse=True)
for rule in rules[:200]:
    # rule variable has confidence(), consequent() and antecedent() 
    # methods for individual value access.
    print rule