функция monotonically_increasing_id() не поддерживается потоковыми кадрами данных/наборами данных

#apache-spark #pyspark #apache-kafka #apache-spark-sql #spark-streaming

Вопрос:

Сначала я попытался вычислить сходство между различными столбцами в фрейме данных в ПАКЕТНОМ режиме, и это сработало нормально.

Теперь я пытаюсь передавать данные из кафки в Pyspark и выполнять ту же работу, что и в ПАКЕТНОМ режиме, но когда я пытаюсь это сделать, появляется эта ошибка:

(здесь был скриншот с текстом ошибки: «monotonically_increasing_id() is not supported with streaming DataFrames/Datasets»)

В ней сказано, что monotonically_increasing_id() is not supported, но мне нужна эта функция, чтобы мой алгоритм работал; вот код:

         start = timer()
    #idoffre=str(df3["idoffre"])
    idoffre="2F12EE1B-E548-4246-8E49-4CC453A50953"
    print("n idoffre=",idoffre,"n")
    demandeurs = get_offre_demandeurs(idoffre)
    """"
    print("tOffre : ")
    print(o)
    print("t Demandeurs : ")
    i = 1
    for d in demandeurs:
        print("n Demaneur N°",i," : ")
        print(d)
        i  = 1
    print()
    """
    print("idoffre = ",idoffre,"n")
    for d in demandeurs:
        print(d['iddemandeur'], "t",d['fichenamereference'])
    print()
    df=spark.createDataFrame(demandeurs)
    df_ofr=df3
    #df_ofr.select("idoffre","metieroffert","wilaya").show()
    
    df_sim_metier=df.selectExpr("metierprincipal","iddemandeur").crossJoin(df_ofr.selectExpr("metieroffert","idoffre")).sort("idoffre")
    df_sim_salaire=df.selectExpr("salairesouhaite","iddemandeur").crossJoin(df_ofr.selectExpr("salaireoffert","idoffre")).sort("idoffre")
    df_sim_maitrise_info=df.selectExpr("niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_dem ","iddemandeur").crossJoin(df_ofr.selectExpr("niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_ofr","idoffre")).sort("idoffre")
    df_sim_instruction=df.selectExpr("niveauinstruction AS niveauinstruction_dem","iddemandeur").crossJoin(df_ofr.selectExpr("niveauinstruction AS niveauinstruction_ofr","idoffre")).sort("idoffre")
    df_sim_qualification=df.selectExpr("niveauqualification AS niveauqualification_dem","iddemandeur").crossJoin(df_ofr.selectExpr("niveauqualification AS niveauqualification_ofr","idoffre")).sort("idoffre")
    df_sim_permis=df.selectExpr("categoriepermisconduire AS categoriepermisconduire_dem","iddemandeur").crossJoin(df_ofr.selectExpr("categoriepermisconduire AS categoriepermisconduire_ofr","idoffre")).sort("idoffre")
    df_sim_exp=df.selectExpr("experience AS experience_dem","iddemandeur").crossJoin(df_ofr.selectExpr("experience AS experience_ofr","idoffre")).sort("idoffre")
    df_sim_age=df.selectExpr("datenaissance","iddemandeur").crossJoin(df_ofr.selectExpr("ageminimum","agemaximum","idoffre")).sort("idoffre")
    df_sim_melitaire=df.selectExpr("situationmilitaire","iddemandeur").crossJoin(df_ofr.selectExpr("servicemilitaire","idoffre")).sort("idoffre")
    df_sim_typecontrat=df.selectExpr("typecontrat AS typecontrat_dem","iddemandeur").crossJoin(df_ofr.selectExpr("typecontrat AS typecontrat_ofr","idoffre")).sort("idoffre")

    df_sim_instruction=df_sim_instruction.withColumn("niveauinstruction_dem",when(col("niveauinstruction_dem")=="Sans Niveau",0).otherwise(when(col("niveauinstruction_dem")=="PRIMAIRE",1).otherwise(when(col("niveauinstruction_dem")=="MOYEN",2).otherwise(when(col("niveauinstruction_dem")=="Secondaire 1AS",3).otherwise(when(col("niveauinstruction_dem")=="Secondaire 2AS",4).otherwise(when(col("niveauinstruction_dem")=="Secondaire 3AS",5).otherwise(when(col("niveauinstruction_dem")=="UNIVERSITAIRE",6).otherwise(when(col("niveauinstruction_dem")=="Supérieur 1",7).otherwise(when(col("niveauinstruction_dem")=="Supérieur 2",8))))))))))
    df_sim_instruction=df_sim_instruction.withColumn("niveauinstruction_ofr",when(col("niveauinstruction_ofr")=="Sans Niveau",0).otherwise(when(col("niveauinstruction_ofr")=="PRIMAIRE",1).otherwise(when(col("niveauinstruction_ofr")=="MOYEN",2).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 1AS",3).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 2AS",4).otherwise(when(col("niveauinstruction_ofr")=="Secondaire 3AS",5).otherwise(when(col("niveauinstruction_ofr")=="UNIVERSITAIRE",6).otherwise(when(col("niveauinstruction_ofr")=="Supérieur 1",7).otherwise(when(col("niveauinstruction_ofr")=="Supérieur 2",8))))))))))  
    
    df_sim_qualification=df_sim_qualification.withColumn("niveauqualification_dem",when(col("niveauqualification_dem")=="Sans Qualification",0).otherwise(when(col("niveauqualification_dem")=="Personnel d'aide",1).otherwise(when(col("niveauqualification_dem")=="Personnel Qualifié",2).otherwise(when(col("niveauqualification_dem")=="Techniciens et techniciens supérieurs",3).otherwise(when(col("niveauqualification_dem")=="Cadres et cadres superieurs",4).otherwise(when(col("niveauqualification_dem")=="Personnel hautement qualifie",5)))))))
    df_sim_qualification=df_sim_qualification.withColumn("niveauqualification_ofr",when(col("niveauqualification_ofr")=="Sans Qualification",0).otherwise(when(col("niveauqualification_ofr")=="Personnel d'aide",1).otherwise(when(col("niveauqualification_ofr")=="Personnel Qualifié",2).otherwise(when(col("niveauqualification_ofr")=="Techniciens et techniciens supérieurs",3).otherwise(when(col("niveauqualification_ofr")=="Cadres et cadres superieurs",4).otherwise(when(col("niveauqualification_ofr")=="Personnel hautement qualifie",5)))))))

    
    
    
    #df_spec4=df_spec4.withColumn("similarite_act_comp_spec",groupby('iddemandeur').agg((sum("similarity act_comp") / count('iddemandeur')).alias('similarite_act_comp_spec')))
    df_sim_metier=df_sim_metier.withColumn("similarite_metier",when(col("metierprincipal")==col("metieroffert"),1).otherwise(0)).drop("metierprincipal","metieroffert")
    df_sim_salaire=df_sim_salaire.withColumn("similarite_salaire",when(col("salaireoffert")=="None",1).otherwise(when(col("salairesouhaite")=="None",1).otherwise(when(col("salairesouhaite").cast(DoubleType()) <= col("salaireoffert").cast(DoubleType()),1).otherwise(0)))).drop("salairesouhaite","salaireoffert")
    df_sim_maitrise_info=df_sim_maitrise_info.withColumn("similarite_informatique",when(col("niveaumaitriseoutilinformatique_ofr")=="None",0).otherwise(when(col("niveaumaitriseoutilinformatique_ofr").cast(IntegerType())<=col("niveaumaitriseoutilinformatique_dem").cast(IntegerType()),1).otherwise(0))).drop("niveaumaitriseoutilinformatique_dem","niveaumaitriseoutilinformatique_ofr")
    df_sim_instruction=df_sim_instruction.withColumn("similarite_instruction",when(col("niveauinstruction_dem")>=col("niveauinstruction_ofr"),1).otherwise(0)).drop("niveauinstruction_dem","niveauinstruction_ofr")
    df_sim_qualification=df_sim_qualification.withColumn("similarite_qualification",when(col("niveauqualification_dem")>=col("niveauqualification_ofr"),1).otherwise(0)).drop("niveauqualification_dem","niveauqualification_ofr")
    df_sim_permis=df_sim_permis.withColumn("similarite_permis",when(col("categoriepermisconduire_ofr")=="None",0).otherwise(when(col("categoriepermisconduire_dem")==col("categoriepermisconduire_ofr"),1).otherwise(0))).drop("categoriepermisconduire_dem","categoriepermisconduire_ofr")
    df_sim_exp=df_sim_exp.withColumn("similarite_exp",when(ceil(col("experience_dem").cast(DoubleType()))>=ceil(col("experience_ofr").cast(DoubleType())),1).otherwise(0)).drop("experience_dem","experience_ofr")
    df_sim_melitaire=df_sim_melitaire.withColumn("similarite_militaire",when(col("situationmilitaire")==col("servicemilitaire"),1).otherwise(0)).drop("situationmilitaire","servicemilitaire")
    df_sim_typecontrat=df_sim_typecontrat.withColumn("similarite_contrat",when(col("typecontrat_dem")==col("typecontrat_ofr"),1).otherwise(0)).drop("typecontrat_dem","typecontrat_ofr")
    df_sim_age=df_sim_age.withColumn("similarite_age",when(col("agemaximum")=="None",when(col("ageminimum")=="None",1).otherwise(when(col("ageminimum")<=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).otherwise(when(col("agemaximum")>=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).drop("datenaissance","ageminimum","agemaximum")
    #df_sim_act_base=spark.createDataFrame()
    #df_sim_comp_base=spark.createDataFrame()
    #df_sim_act_comp_spec=spark.createDataFrame()
    df_sim_act_base=similarite_act_base(df,df_ofr)
    df_sim_comp_base=similarite_comp_base(df,df_ofr)
    df_sim_act_comp_spec=similarite_act_comp_spec(df,df_ofr)
    
    
    #ofr_size=len(df_ofr.columns)-2
    ofr_size=13
    df_Columns=["niveaumaitriseoutilinformatique","servicemilitaire"]
    #df_ofr=df_ofr.withColumn("column_numbers",when((col("niveaumaitriseoutilinformatique")=="None")amp;(col("servicemilitaire")=="None"),ofr_size-2).otherwise(when((col("niveaumaitriseoutilinformatique")=="None")|(col("servicemilitaire")=="None"),ofr_size-1).otherwise(ofr_size)))
    df_ofr2=df_ofr
    df_ofr=df_ofr.withColumn("column_numbers",count_nones(df_ofr,df_Columns)).select("niveaumaitriseoutilinformatique","servicemilitaire","column_numbers")
    df_ofr=df_ofr.withColumn("Nbr",ofr_size- col("column_numbers"))
    df_ofr2=df_ofr.select("Nbr")
    df_ofr2=df_ofr2.crossJoin(df.select("iddemandeur"))
    #df_ofr2=df_ofr2.select("Nbr")
    #df_ofr2.show(30)
    #offre_size=df_ofr.select("column_numbers").collect()[0][0]
    #offre_size
    
    

    df_sim_metier=df_sim_metier.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
    df_sim_salaire=df_sim_salaire.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_maitrise_info=df_sim_maitrise_info.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_instruction=df_sim_instruction.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_qualification=df_sim_qualification.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_permis=df_sim_permis.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_exp=df_sim_exp.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_melitaire=df_sim_melitaire.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_typecontrat=df_sim_typecontrat.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_age=df_sim_age.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")
    df_sim_act_base=df_sim_act_base.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre","activitebase_ofr","activitebase_dm")
    df_sim_comp_base=df_sim_comp_base.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre","comp_base_ofr","comp_base_dm")
    df_sim_act_comp_spec=df_sim_act_comp_spec.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur","idoffre")


    df_sim1 = df_sim_metier.join(df_sim_salaire, on=["row_index"]).join(df_sim_maitrise_info, on=["row_index"]).join(df_sim_instruction, on=["row_index"]).join(df_sim_qualification, on=["row_index"]).join(df_sim_permis, on=["row_index"]).join(df_sim_exp, on=["row_index"]).join(df_sim_melitaire, on=["row_index"]).join(df_sim_typecontrat, on=["row_index"]).join(df_sim_age, on=["row_index"]).join(df_sim_act_base, on=["row_index"]).join(df_sim_comp_base, on=["row_index"]).join(df_sim_act_comp_spec, on=["row_index"]).drop("row_index")

    #df_sim1.select("similarite_permis","similarite_exp").show()
    df_sim1=df_sim1.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
    df_ofr2=df_ofr2.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id()))).drop("iddemandeur")
    df_sim1=df_ofr2.join(df_sim1, on=["row_index"]).drop("row_index")
    #df_sim1.select("idoffre","iddemandeur","Nbr","Similarite_activitebase").show()
    
    
    df_sim1=df_sim1.withColumn("SIMILARITE",(col("similarite_metier") col("similarite_salaire") col("similarite_informatique") col("similarite_instruction") col("similarite_qualification") col("similarite_permis") col("similarite_exp") col("similarite_militaire") col("similarite_contrat") col("similarite_age") col("similarite_act_comp_spec") col("Similarite_activitebase") col("Similarite_comp_base")) / col("Nbr"))
    #df_sim1=df_sim1.withColumn("SIMILARITE",(col("similarite_metier") col("similarite_salaire") col("similarite_informatique") col("similarite_instruction") col("similarite_qualification") col("similarite_permis") col("similarite_exp") col("similarite_militaire") col("similarite_contrat") col("similarite_age") col("similarite_act_comp_spec") col("Similarite_activitebase") col("Similarite_comp_base")) / offre_size)
    #resultat_elastic=df_sim1.collect()
    
    """
    for row in resultat_elastic:
        dict_score = {}
        dict_score['iddemandeur']=row['iddemandeur']
        dict_score['idoffre']=row['idoffre']
        dict_score['similarite_metier']=row['similarite_metier']
        dict_score['similarite_informatique']=row['similarite_informatique']
        dict_score['similarite_instruction']=row['similarite_instruction']
        dict_score['similarite_qualification']=row['similarite_qualification']
        dict_score['similarite_permis']=row['similarite_permis']
        dict_score['similarite_exp']=row['similarite_exp']
        dict_score['similarite_militaire']=row['similarite_militaire']
        dict_score['similarite_contrat']=row['similarite_contrat']
        dict_score['similarite_age']=row['similarite_age']
        dict_score['similarite_activitebase']=row['Similarite_activitebase']
        dict_score['similarite_comp_base']=row['Similarite_comp_base']
        dict_score['similarite_act_comp_spec']=row['similarite_act_comp_spec']
        dict_score['similarite']=row['SIMILARITE']
        dict_score['count_attributs']=row['Nbr']
        es.index(index="test_score2",body=dict_score)
        print ("nn -- SCORE FINAL ", dict_score['iddemandeur'],' : ',dict_score['similarite'])
        
        print(dict_score['similarite_activitebase'])
        print(dict_score['similarite_comp_base'])
        print(dict_score['similarite_act_comp_spec'])
    
    """
    end = timer()
 

Зная, что я использую Spark 3.1.2 и потоковую передачу с помощью структурированной потоковой передачи spark.

Есть ли способ заменить monotonically_increasing_id() , чтобы этот код нормально работал в потоковой передаче ?

Ответ №1:

Поскольку вы выполняете потоковую передачу из Kafka, вы можете попробовать использовать offset или timestamp, доступные из вашего источника Kafka, чтобы вместо этого упорядочить свои данные.

Например, Изменено из документации spark kafka

 df = spark 
  .read 
  .format("kafka") 
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") 
  .option("subscribe", "topic1") 
  .load()

df.selectExpr(
    "CAST(key AS STRING)", 
    "CAST(value AS STRING)",
    "offset as row_index",
    "timestamp as source_timestamp", 
)
 

Смещение теперь станет вашим row_index , или вы можете попробовать

 # Same row_index construction as before, but ordered by the Kafka-provided
 # event timestamp instead of monotonically_increasing_id(), which Structured
 # Streaming does not support.
 df_sim1=df_sim1.withColumn('row_index', row_number().over(Window.orderBy("source_timestamp")))
df_ofr2=df_ofr2.withColumn('row_index', row_number().over(Window.orderBy("source_timestamp"))).drop("iddemandeur")
 

Дайте мне знать, если это сработает для вас.