#apache-spark #pyspark #spark-streaming
#apache-искра #пыспарк #искрящийся поток
Вопрос:
Я создаю классификатор в потоковой передаче spark, но у меня возникли две проблемы с транслируемыми переменными и накопителями.
- При загрузке обученного конвейера я получаю ошибку рассола. Я использую приведенный ниже код. При загрузке модели в функцию foreachRDD она работает нормально. Есть идеи, что я здесь делаю не так?
def getModel(sparkContext): if ("model" not in globals()): # read model model = PipelineModel.load("path") globals()["model"] = sparkContext.broadcast(model) return globals()["model"]
- У меня есть накопитель для общего количества сделанных прогнозов, который работает, но я также пытаюсь подсчитать, сколько раз предсказывается каждый класс. Я пытался использовать эту функцию, но я получаю сообщение об ошибке, что я пытаюсь транслировать RDD.
def getAccumulatorClass(sparkContext, label): name = "accum_class_" label if (name not in globals()): globals()[name] = sparkContext.accumulator(0) return globals()[name] def process_stream(rdd): # code for predicting classes classified_by_class = df_classified.groupBy('prediction').count() count_classes = udf(lambda pred, count: getAccumulatorClass(rdd.context, pred).add(int(count))) classified_by_class.select(count_classes(col("prediction"), col("count"))) fileStream.foreachRDD(process_stream)
Содержание классифицированных по классам является:
содержание классифицированных по классам
Спасибо вам за любую помощь/указания в решении этих проблем!