Как применить модель прогнозирования MLflow в потоке?

#python #spark-streaming #databricks #spark-structured-streaming

Вопрос:

У меня есть поток, который считывает готовые к публикации данные функций в уже зарегистрированную модель. Весь код написан на Python. Следующая модель и метаданные функционируют вне потока в обычном ноутбуке. В потоке — это другое дело. Основная проблема заключается в том, что данные, записанные из потока (в целевую таблицу), имеют нулевое предсказание. Другое дело foreachBatch , что функция кажется невосприимчивой даже к намеренно заложенным синтаксическим ошибкам. Никаких признаков проблемы в журналах или отзывах в записной книжке. Это как если бы его не звали.

Я понимаю, что пишу в таблицу дважды (один раз в функции и один раз в writeStream . Только одна запись делает это, и это из — за writeStream -не функции.

Код приведен ниже:

 from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession 
    .builder 
    .appName("MyTest") 
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream 
    .format("delta") 
    .option('ignoreDeletes','true') 
    .table("schema.transformeddata") 

fixedValueStream = lines.select('feature1','feature2', 'feature3')


# Split the lines into words
def batchpredictions(df, epoch_id):
    
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
    prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1','feature2','feature3')))
    prediction.write.mode("append").saveAsTable("schema.transformeddata_prediction")
    

fixedValueStream.writeStream.format("delta").outputMode("append").foreachBatch(batchpredictions).option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").table("schema.transformeddata_prediction")
 

Входящие данные :

 feature1, feature2, feature3
1       , 5       , 9
2       , 6       , 10
3       , 7       , 11
4       , 8       , 12
 

Исходящие данные

 feature1, feature2, feature3, prediction
1       , 5       , 9       , NULL
2       , 6       , 10      , NULL
3       , 7       , 11      , NULL
4       , 8       , 12      , NULL
 

Есть какие-нибудь подсказки, что я делаю не так?

*ОБНОВЛЕНИЕ: Спасибо Майку за ваши ответы. Я намерен начать оптимизировать свое решение ниже, используя некоторые из предложенных вами вещей. Сейчас мне просто нужно было чем-то заняться, чтобы вернуться к работе. Решение в его текущем состоянии приведено ниже.

 from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
import mlflow
import mlflow.xgboost
import xgboost
import numpy as np
import pandas as pd
from pyspark.sql.types import *

# Load model as a PysparkUDF
loaded_model = mlflow.pyfunc.load_model('runs:/<mymodelrunid>/model')

spark = SparkSession 
    .builder 
    .appName("MyTest") 
    .getOrCreate()

# Create a streaming DataFrame
lines = spark.readStream 
    .format("delta") 
    .option('ignoreDeletes','true') 
    .table("<myschema>.<mytableinput>") 
    
fixedValueStream = lines.select('feature1','feature2', 'feature3', 'feature4', 'feature5')

def foreach_batch_function(df, epoch_id):
    #text value of the multi class prediction GREEN, RED, BLUE
    df = df.withColumn("pred_class", lit('    '))
    
    #Prepare 3 holders for the 3 class scores returned from multiclass model. 
    #Done before hand so I don't have to deal with data type/additional column index/key issues.
    df = df.withColumn("prediction_class1", lit(0.00).cast("double"))
    df = df.withColumn("prediction_class2", lit(0.00).cast("double"))
    df = df.withColumn("prediction_class3", lit(0.00).cast("double"))
    
    #Select back into pandas frame
    pd_df = df.select('feature1','feature2', 'feature3', 'feature4', 'feature5','pred_class','prediction_class1','prediction_class2','prediction_class3').toPandas()

    #Pass pandas frame into model and return array of shape [<batch-df-rows-count>][3]
    y_pred = loaded_model.predict(pd_df)
    
    #Retun the max column score
    predicted_idx = np.argmax(y_pred, axis=1)
    
    #Translate said column into end user labels 
    y_pred_class = np.where(predicted_idx == 1, 'GREEN', np.where(predicted_idx == 0, 'RED', 'BLUE' ))
    
    #Assign class to place holder column
    pd_df["pred_class"] = y_pred_class

    #Store the 3 prediction strengths into place holder columns
    pd_df["prediction_class1"] = y_pred[:,0]
    pd_df["prediction_class2"] = y_pred[:,1]
    pd_df["prediction_class3"] = y_pred[:,2]
    
    #Write out back to a monitoring table
    result = spark.createDataFrame(pd_df)
    result.write.option("mergeSchema","true").format("delta").option("header", "true").mode("append").saveAsTable("<myschema>.<mytableoutput>")
    
#write stream out
fixedValueStream.writeStream.foreachBatch(foreach_batch_function).start()
 

Комментарии:

1. зачем вам нужно использовать foreachBatch ? почему бы просто не применить spark UDF к потоку?

2. Спасибо, я работаю над ответами сегодня

Ответ №1:

Как отметил @AlexOtt в комментариях, нет необходимости подавать foreachBatch заявку, поскольку ваш вопрос в настоящее время написан.

Все, что вам нужно сделать, это применить UDF к вашему потоковому фрейму данных с помощью withColumn .


В случае , если вам действительно нужно использовать foreachBatch , возможно, потому, что вы записываете в непотоковый формат приемника, вы можете прочитать ниже, как это сделать.

Если посмотреть документацию foreachBatch в Руководстве по программированию структурированной потоковой передачи, вам не нужны a format и a outputMode в вашем окончательном потоке записи. Вместо этого логика, в которую записываются данные, определяется в функции foreachBatch. Кроме того, использование saveAsTable в потоке также выглядит неправильно.

В целом, ваш код должен выглядеть так:

 def batchpredictions(df, epoch_id):
    # Split the lines into words
    pyfunc_udf = mlflow.pyfunc.spark_udf(spark, model_uri='runs:/<myrunid>/model')
    prediction = df.withColumn("prediction", pyfunc_udf(struct('feature1', 'feature2', 'feature3')))
    prediction.write.mode("append").format("delta").save("/tmp/delta-table")
    

fixedValueStream.writeStream.foreachBatch(batchpredictions).option("checkpointLocation", "/delta/events/_checkpoints/etl-from-json").start()
 

Комментарии:

1. Спасибо, я попробую это в следующий раз. Мне удалось заставить что-то работать с помощью поставщика услуг. Добавлено решение ниже, но оно даст вам высокую оценку и попробует ваш метод завтра.