Прогнозы количества потоков Pyspark для каждого класса

#apache-spark #pyspark #spark-streaming

#apache-искра #пыспарк #искрящийся поток

Вопрос:

Я создаю классификатор в потоковой передаче spark, но у меня возникли две проблемы с транслируемыми переменными и накопителями.

  1. При загрузке обученного конвейера я получаю ошибку рассола. Я использую приведенный ниже код. При загрузке модели в функцию foreachRDD она работает нормально. Есть идеи, что я здесь делаю не так?
 def getModel(sparkContext):  if ("model" not in globals()):  # read model  model = PipelineModel.load("path")    globals()["model"] = sparkContext.broadcast(model)  return globals()["model"]  
  1. У меня есть накопитель для общего количества сделанных прогнозов, который работает, но я также пытаюсь подсчитать, сколько раз предсказывается каждый класс. Я пытался использовать эту функцию, но я получаю сообщение об ошибке, что я пытаюсь транслировать RDD.
 def getAccumulatorClass(sparkContext, label):  name = "accum_class_"   label  if (name not in globals()):  globals()[name] = sparkContext.accumulator(0)  return globals()[name]  def process_stream(rdd):  # code for predicting classes  classified_by_class = df_classified.groupBy('prediction').count()  count_classes = udf(lambda pred, count: getAccumulatorClass(rdd.context, pred).add(int(count)))   classified_by_class.select(count_classes(col("prediction"), col("count")))  fileStream.foreachRDD(process_stream)  

Содержание классифицированных по классам является:

содержание классифицированных по классам

Спасибо вам за любую помощь/указания в решении этих проблем!