#scala #apache-spark #pyspark #apache-spark-sql #apache-spark-mllib
#scala #apache-spark #pyspark #apache-spark-sql #apache-spark-mllib
Вопрос:
Использование :
http://spark.apache.org/docs/1.6.1/mllib-frequent-pattern-mining.html
Код Python:
from pyspark.mllib.fpm import FPGrowth
model = FPGrowth.train(dataframe,0.01,10)
Scala:
import org.apache.spark.mllib.fpm.FPGrowth
import org.apache.spark.rdd.RDD
val data = sc.textFile("data/mllib/sample_fpgrowth.txt")
val transactions: RDD[Array[String]] = data.map(s => s.trim.split(' '))
val fpg = new FPGrowth()
.setMinSupport(0.2)
.setNumPartitions(10)
val model = fpg.run(transactions)
model.freqItemsets.collect().foreach { itemset =>
println(itemset.items.mkString("[", ",", "]") ", " itemset.freq)
}
val minConfidence = 0.8
model.generateAssociationRules(minConfidence).collect().foreach { rule =>
println(
rule.antecedent.mkString("[", ",", "]")
" => " rule.consequent .mkString("[", ",", "]")
", " rule.confidence)
}
Из приведенного здесь кода видно, что часть scala не имеет минимальной достоверности.
def trainFPGrowthModel(
data: JavaRDD[java.lang.Iterable[Any]],
minSupport: Double,
numPartitions: Int): FPGrowthModel[Any] = {
val fpg = new FPGrowth()
.setMinSupport(minSupport)
.setNumPartitions(numPartitions)
val model = fpg.run(data.rdd.map(_.asScala.toArray))
new FPGrowthModelWrapper(model)
}
Как добавить minConfidence для создания правила ассоциации в случае pyspark? Мы видим, что у scala есть пример, но у python его нет.
Ответ №1:
Spark > = 2.2
Существует DataFrame
базовый ml
API, который обеспечивает AssociationRules
:
from pyspark.ml.fpm import FPGrowth
data = ...
fpm = FPGrowth(minSupport=0.3, minConfidence=0.9).fit(data)
associationRules = fpm.associationRules.
Spark < 2.2
На данный момент PySpark не поддерживает извлечение правил ассоциации ( DataFrame
FPGrowth
API на основе с поддержкой Python находится в стадии разработки SPARK-1450), но мы можем легко решить эту проблему.
Сначала вам нужно будет установить SBT (просто перейдите на страницу загрузки) и следуйте инструкциям для вашей операционной системы.
Далее вам нужно будет создать простой проект Scala, содержащий только два файла:
.
├── AssociationRulesExtractor.scala
└── build.sbt
Вы можете настроить его позже, чтобы следовать установленной структуре каталогов.
Затем добавьте следующее к build.sbt
(настройте версии Scala и Spark, чтобы они соответствовали той, которую вы используете):
name := "fpm"
version := "1.0"
scalaVersion := "2.10.6"
val sparkVersion = "1.6.2"
libraryDependencies = Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-mllib" % sparkVersion
)
и, следуя AssociationRulesExtractor.scala
:
package com.example.fpm
import org.apache.spark.mllib.fpm.AssociationRules.Rule
import org.apache.spark.rdd.RDD
object AssociationRulesExtractor {
def apply(rdd: RDD[Rule[String]]) = {
rdd.map(rule => Array(
rule.confidence, rule.javaAntecedent, rule.javaConsequent
))
}
}
Откройте эмулятор терминала по вашему выбору, перейдите в корневой каталог проекта и вызовите:
sbt package
Он сгенерирует файл jar в целевом каталоге. Например, в Scala 2.10 это будет:
target/scala-2.10/fpm_2.10-1.0.jar
Запустите оболочку PySpark или используйте spark-submit
и передайте путь к сгенерированному файлу jar, чтобы --driver-class-path
:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar
В нелокальном режиме:
bin/pyspark --driver-class-path /path/to/fpm_2.10-1.0.jar --jars /path/to/fpm_2.10-1.0.jar
В режиме кластера jar должен присутствовать на всех узлах.
Добавьте несколько удобных оболочек:
from pyspark import SparkContext
from pyspark.mllib.fpm import FPGrowthModel
from pyspark.mllib.common import _java2py
from collections import namedtuple
rule = namedtuple("Rule", ["confidence", "antecedent", "consequent"])
def generateAssociationRules(model, minConfidence):
# Get active context
sc = SparkContext.getOrCreate()
# Retrieve extractor object
extractor = sc._gateway.jvm.com.example.fpm.AssociationRulesExtractor
# Compute rules
java_rules = model._java_model.generateAssociationRules(minConfidence)
# Convert rules to Python RDD
return _java2py(sc, extractor.apply(java_rules)).map(lambda x:rule(*x))
Наконец, вы можете использовать эти помощники как функцию:
generateAssociationRules(model, 0.9)
или как метод:
FPGrowthModel.generateAssociationRules = generateAssociationRules
model.generateAssociationRules(0.9)
Это решение зависит от внутренних методов PySpark, поэтому не гарантируется, что оно будет переносимым между версиями.
Комментарии:
1. Вы можете генерировать и получать правила ассоциации в PySpark, используя Spark <2.2 с небольшим количеством кода py4j, я добавлю код в качестве ответа.
Ответ №2:
Вы можете генерировать и получать правила ассоциации в PySpark, используя Spark <2.2 с небольшим количеством кода py4j:
# model was produced by FPGrowth.train() method
rules = sorted(model._java_model.generateAssociationRules(0.9).collect(),
key=lambda x: x.confidence(), reverse=True)
for rule in rules[:200]:
# rule variable has confidence(), consequent() and antecedent()
# methods for individual value access.
print rule