#apache-spark #pyspark #apache-kafka #apache-spark-sql #spark-streaming
Вопрос:
Сначала я попытался вычислить сходство между различными столбцами в фрейме данных в ПАКЕТНОМ режиме, и это сработало нормально.
Теперь я пытаюсь передавать данные из кафки в Pyspark и выполнять ту же работу, что и в ПАКЕТНОМ режиме, но когда я пытаюсь это сделать, появляется эта ошибка:
В сообщении об ошибке сказано, что monotonically_increasing_id() is not supported
, но мне нужна эта функция, чтобы мой алгоритм работал, и вот код:
# --- Batch similarity scoring: load applicants ("demandeurs") for one offer ---
start = timer()  # wall-clock start of the whole scoring pass

# NOTE(review): the offer id is hard-coded; the commented line suggests it is
# meant to come from the streaming frame df3 eventually — confirm with author.
# idoffre = str(df3["idoffre"])
idoffre = "2F12EE1B-E548-4246-8E49-4CC453A50953"
print("\n idoffre=", idoffre, "\n")

# Fetch the applicant records for this offer (external helper; presumably a
# list of dicts, since each row is indexed by key below — TODO confirm).
demandeurs = get_offre_demandeurs(idoffre)

# Disabled debug dump of the offer and its applicants.
"""
print("\tOffre : ")
print(o)
print("\t Demandeurs : ")
i = 1
for d in demandeurs:
    print("\n Demandeur N°", i, " : ")
    print(d)
    i += 1
print()
"""

print("idoffre = ", idoffre, "\n")
for d in demandeurs:
    print(d['iddemandeur'], "\t", d['fichenamereference'])
print()

# One row per applicant; df3 carries the offer row(s) coming from the stream.
df = spark.createDataFrame(demandeurs)
df_ofr = df3
# df_ofr.select("idoffre", "metieroffert", "wilaya").show()

def _pair(dem_exprs, ofr_exprs):
    """Cross-join one applicant attribute with the matching offer attribute.

    Returns a frame holding the applicant columns (plus iddemandeur) next to
    the offer columns (plus idoffre), sorted by idoffre so that every
    per-attribute frame ends up with rows in the same order — they are later
    zipped back together via a generated row index.
    """
    return (df.selectExpr(*dem_exprs)
              .crossJoin(df_ofr.selectExpr(*ofr_exprs))
              .sort("idoffre"))

df_sim_metier = _pair(["metierprincipal", "iddemandeur"],
                      ["metieroffert", "idoffre"])
df_sim_salaire = _pair(["salairesouhaite", "iddemandeur"],
                       ["salaireoffert", "idoffre"])
# (A stray trailing space inside the "_dem" alias was dropped; SQL trims it,
# so the resulting column name is unchanged.)
df_sim_maitrise_info = _pair(
    ["niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_dem", "iddemandeur"],
    ["niveaumaitriseoutilinformatique AS niveaumaitriseoutilinformatique_ofr", "idoffre"])
df_sim_instruction = _pair(
    ["niveauinstruction AS niveauinstruction_dem", "iddemandeur"],
    ["niveauinstruction AS niveauinstruction_ofr", "idoffre"])
df_sim_qualification = _pair(
    ["niveauqualification AS niveauqualification_dem", "iddemandeur"],
    ["niveauqualification AS niveauqualification_ofr", "idoffre"])
df_sim_permis = _pair(
    ["categoriepermisconduire AS categoriepermisconduire_dem", "iddemandeur"],
    ["categoriepermisconduire AS categoriepermisconduire_ofr", "idoffre"])
df_sim_exp = _pair(["experience AS experience_dem", "iddemandeur"],
                   ["experience AS experience_ofr", "idoffre"])
df_sim_age = _pair(["datenaissance", "iddemandeur"],
                   ["ageminimum", "agemaximum", "idoffre"])
df_sim_melitaire = _pair(["situationmilitaire", "iddemandeur"],
                         ["servicemilitaire", "idoffre"])
df_sim_typecontrat = _pair(["typecontrat AS typecontrat_dem", "iddemandeur"],
                           ["typecontrat AS typecontrat_ofr", "idoffre"])
# Ordered label lists: a higher index means a higher level, so the later
# similarity test is a simple ">=" on the encoded ranks.
_NIVEAUX_INSTRUCTION = [
    "Sans Niveau", "PRIMAIRE", "MOYEN", "Secondaire 1AS", "Secondaire 2AS",
    "Secondaire 3AS", "UNIVERSITAIRE", "Supérieur 1", "Supérieur 2",
]
_NIVEAUX_QUALIFICATION = [
    "Sans Qualification", "Personnel d'aide", "Personnel Qualifié",
    "Techniciens et techniciens supérieurs", "Cadres et cadres superieurs",
    "Personnel hautement qualifie",
]

def _encode_level(column_name, ordered_labels):
    """Build a CASE expression mapping each label to its 0-based rank.

    Unmatched labels fall through to NULL — identical to the original
    9-deep nested when/otherwise chain, since a when() chain without a
    final otherwise() yields NULL for non-matching rows.
    """
    expr = when(col(column_name) == ordered_labels[0], 0)
    for rank, label in enumerate(ordered_labels[1:], start=1):
        expr = expr.when(col(column_name) == label, rank)
    return expr

df_sim_instruction = df_sim_instruction.withColumn(
    "niveauinstruction_dem",
    _encode_level("niveauinstruction_dem", _NIVEAUX_INSTRUCTION))
df_sim_instruction = df_sim_instruction.withColumn(
    "niveauinstruction_ofr",
    _encode_level("niveauinstruction_ofr", _NIVEAUX_INSTRUCTION))
df_sim_qualification = df_sim_qualification.withColumn(
    "niveauqualification_dem",
    _encode_level("niveauqualification_dem", _NIVEAUX_QUALIFICATION))
df_sim_qualification = df_sim_qualification.withColumn(
    "niveauqualification_ofr",
    _encode_level("niveauqualification_ofr", _NIVEAUX_QUALIFICATION))
# NOTE(review): disabled draft of an aggregated specialty-similarity column;
# as written it mixes the DataFrame and GroupedData APIs and would not run.
#df_spec4=df_spec4.withColumn("similarite_act_comp_spec",groupby('iddemandeur').agg((sum("similarity act_comp") / count('iddemandeur')).alias('similarite_act_comp_spec')))

# Per-attribute binary similarity flags: 1 = compatible, 0 = not.
# NOTE(review): the == "None" tests below compare against the literal string
# "None", not SQL NULL — presumably the frames were built from Python dicts
# where missing values were stringified; confirm, otherwise use isNull().

# Exact trade match.
df_sim_metier=df_sim_metier.withColumn("similarite_metier",when(col("metierprincipal")==col("metieroffert"),1).otherwise(0)).drop("metierprincipal","metieroffert")
# Compatible if either salary is unspecified, or desired <= offered.
df_sim_salaire=df_sim_salaire.withColumn("similarite_salaire",when(col("salaireoffert")=="None",1).otherwise(when(col("salairesouhaite")=="None",1).otherwise(when(col("salairesouhaite").cast(DoubleType()) <= col("salaireoffert").cast(DoubleType()),1).otherwise(0)))).drop("salairesouhaite","salaireoffert")
# IT-skill requirement: 0 when the offer specifies none, else require
# applicant level >= required level (compared as integers).
df_sim_maitrise_info=df_sim_maitrise_info.withColumn("similarite_informatique",when(col("niveaumaitriseoutilinformatique_ofr")=="None",0).otherwise(when(col("niveaumaitriseoutilinformatique_ofr").cast(IntegerType())<=col("niveaumaitriseoutilinformatique_dem").cast(IntegerType()),1).otherwise(0))).drop("niveaumaitriseoutilinformatique_dem","niveaumaitriseoutilinformatique_ofr")
# Education / qualification: applicant rank must be >= offer rank (ranks were
# encoded just above from the ordered label chains).
df_sim_instruction=df_sim_instruction.withColumn("similarite_instruction",when(col("niveauinstruction_dem")>=col("niveauinstruction_ofr"),1).otherwise(0)).drop("niveauinstruction_dem","niveauinstruction_ofr")
df_sim_qualification=df_sim_qualification.withColumn("similarite_qualification",when(col("niveauqualification_dem")>=col("niveauqualification_ofr"),1).otherwise(0)).drop("niveauqualification_dem","niveauqualification_ofr")
# Driving licence: only scored when the offer requires a category.
df_sim_permis=df_sim_permis.withColumn("similarite_permis",when(col("categoriepermisconduire_ofr")=="None",0).otherwise(when(col("categoriepermisconduire_dem")==col("categoriepermisconduire_ofr"),1).otherwise(0))).drop("categoriepermisconduire_dem","categoriepermisconduire_ofr")
# Experience compared on whole years (ceil of fractional values).
df_sim_exp=df_sim_exp.withColumn("similarite_exp",when(ceil(col("experience_dem").cast(DoubleType()))>=ceil(col("experience_ofr").cast(DoubleType())),1).otherwise(0)).drop("experience_dem","experience_ofr")
# Military status and contract type: exact match only.
df_sim_melitaire=df_sim_melitaire.withColumn("similarite_militaire",when(col("situationmilitaire")==col("servicemilitaire"),1).otherwise(0)).drop("situationmilitaire","servicemilitaire")
df_sim_typecontrat=df_sim_typecontrat.withColumn("similarite_contrat",when(col("typecontrat_dem")==col("typecontrat_ofr"),1).otherwise(0)).drop("typecontrat_dem","typecontrat_ofr")
# Age window: age derived from birth date (days / 365.25, floored); missing
# min/max bounds are treated as unbounded on that side.
df_sim_age=df_sim_age.withColumn("similarite_age",when(col("agemaximum")=="None",when(col("ageminimum")=="None",1).otherwise(when(col("ageminimum")<=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).otherwise(when(col("agemaximum")>=floor(datediff(current_date(),col("datenaissance").cast(DateType()))/365.25),1).otherwise(0))).drop("datenaissance","ageminimum","agemaximum")
# Placeholders kept from an earlier draft (never valid: createDataFrame needs data).
#df_sim_act_base=spark.createDataFrame()
#df_sim_comp_base=spark.createDataFrame()
#df_sim_act_comp_spec=spark.createDataFrame()
# Specialty / base-activity / base-competence similarities come from
# project helpers defined elsewhere in the file.
df_sim_act_base=similarite_act_base(df,df_ofr)
df_sim_comp_base=similarite_comp_base(df,df_ofr)
df_sim_act_comp_spec=similarite_act_comp_spec(df,df_ofr)
# Number of offer attributes taking part in the similarity score.
# NOTE(review): hard-coded to 13; the commented line suggests it should be
# derived from the schema — confirm before changing.
# ofr_size = len(df_ofr.columns) - 2
ofr_size = 13
# Optional offer attributes: when absent (the string "None") they are
# excluded from the denominator of the final score.
df_Columns = ["niveaumaitriseoutilinformatique", "servicemilitaire"]
# Earlier hand-written variant, kept for reference (count_nones generalises it):
# df_ofr = df_ofr.withColumn("column_numbers", when((col("niveaumaitriseoutilinformatique")=="None") & (col("servicemilitaire")=="None"), ofr_size-2).otherwise(when((col("niveaumaitriseoutilinformatique")=="None") | (col("servicemilitaire")=="None"), ofr_size-1).otherwise(ofr_size)))
# column_numbers = how many of the optional attributes are missing.
df_ofr = df_ofr.withColumn("column_numbers", count_nones(df_ofr, df_Columns)).select("niveaumaitriseoutilinformatique", "servicemilitaire", "column_numbers")
# Nbr = effective number of scored attributes for this offer.
df_ofr = df_ofr.withColumn("Nbr", ofr_size - col("column_numbers"))
# One (Nbr, iddemandeur) row per applicant, to be re-joined to the score rows.
# (The dead pre-assignment "df_ofr2 = df_ofr" was removed: it was overwritten
# here without ever being read.)
df_ofr2 = df_ofr.select("Nbr")
df_ofr2 = df_ofr2.crossJoin(df.select("iddemandeur"))
# df_ofr2 = df_ofr2.select("Nbr")
# df_ofr2.show(30)
# offre_size = df_ofr.select("column_numbers").collect()[0][0]
def _with_row_index(sdf, *drop_cols):
    """Append a dense 1..N 'row_index' column, then drop bookkeeping columns.

    row_number() over monotonically_increasing_id() turns Spark's sparse
    increasing ids into a dense positional index used to zip the
    per-attribute frames back together.
    NOTE: monotonically_increasing_id() is exactly the construct rejected by
    Structured Streaming; in streaming mode derive row_index from the Kafka
    offset/timestamp instead (see the answer below).
    """
    w = Window.orderBy(monotonically_increasing_id())
    indexed = sdf.withColumn('row_index', row_number().over(w))
    return indexed.drop(*drop_cols) if drop_cols else indexed

df_sim_metier = _with_row_index(df_sim_metier)
df_sim_salaire = _with_row_index(df_sim_salaire, "iddemandeur", "idoffre")
df_sim_maitrise_info = _with_row_index(df_sim_maitrise_info, "iddemandeur", "idoffre")
df_sim_instruction = _with_row_index(df_sim_instruction, "iddemandeur", "idoffre")
df_sim_qualification = _with_row_index(df_sim_qualification, "iddemandeur", "idoffre")
df_sim_permis = _with_row_index(df_sim_permis, "iddemandeur", "idoffre")
df_sim_exp = _with_row_index(df_sim_exp, "iddemandeur", "idoffre")
df_sim_melitaire = _with_row_index(df_sim_melitaire, "iddemandeur", "idoffre")
df_sim_typecontrat = _with_row_index(df_sim_typecontrat, "iddemandeur", "idoffre")
df_sim_age = _with_row_index(df_sim_age, "iddemandeur", "idoffre")
df_sim_act_base = _with_row_index(df_sim_act_base, "iddemandeur", "idoffre", "activitebase_ofr", "activitebase_dm")
df_sim_comp_base = _with_row_index(df_sim_comp_base, "iddemandeur", "idoffre", "comp_base_ofr", "comp_base_dm")
df_sim_act_comp_spec = _with_row_index(df_sim_act_comp_spec, "iddemandeur", "idoffre")

# Zip all per-attribute frames back together on the positional row index.
_parts = [df_sim_salaire, df_sim_maitrise_info, df_sim_instruction,
          df_sim_qualification, df_sim_permis, df_sim_exp, df_sim_melitaire,
          df_sim_typecontrat, df_sim_age, df_sim_act_base, df_sim_comp_base,
          df_sim_act_comp_spec]
df_sim1 = df_sim_metier
for _part in _parts:
    df_sim1 = df_sim1.join(_part, on=["row_index"])
df_sim1 = df_sim1.drop("row_index")
# df_sim1.select("similarite_permis", "similarite_exp").show()

# Re-index both sides and attach the per-applicant Nbr denominator.
df_sim1 = _with_row_index(df_sim1)
df_ofr2 = _with_row_index(df_ofr2, "iddemandeur")
df_sim1 = df_ofr2.join(df_sim1, on=["row_index"]).drop("row_index")
# df_sim1.select("idoffre","iddemandeur","Nbr","Similarite_activitebase").show()

# Final score: mean of the individual flags over the Nbr attributes actually
# present on the offer. (The '+' operators between the columns were lost in
# the original paste; they are restored here.)
_flag_cols = ["similarite_metier", "similarite_salaire",
              "similarite_informatique", "similarite_instruction",
              "similarite_qualification", "similarite_permis",
              "similarite_exp", "similarite_militaire", "similarite_contrat",
              "similarite_age", "similarite_act_comp_spec",
              "Similarite_activitebase", "Similarite_comp_base"]
_total = col(_flag_cols[0])
for _name in _flag_cols[1:]:
    _total = _total + col(_name)
df_sim1 = df_sim1.withColumn("SIMILARITE", _total / col("Nbr"))
# Fixed-denominator variant kept from an earlier draft:
# df_sim1 = df_sim1.withColumn("SIMILARITE", _total / offre_size)
# Disabled export of the scored rows to Elasticsearch. The triple-quoted
# block below is dead code kept as documentation; note that backslash
# escapes ("\n", "\t") were lost when this snippet was pasted, so it would
# need those restored (and proper indentation) before re-enabling.
#resultat_elastic=df_sim1.collect()
"""
for row in resultat_elastic:
dict_score = {}
dict_score['iddemandeur']=row['iddemandeur']
dict_score['idoffre']=row['idoffre']
dict_score['similarite_metier']=row['similarite_metier']
dict_score['similarite_informatique']=row['similarite_informatique']
dict_score['similarite_instruction']=row['similarite_instruction']
dict_score['similarite_qualification']=row['similarite_qualification']
dict_score['similarite_permis']=row['similarite_permis']
dict_score['similarite_exp']=row['similarite_exp']
dict_score['similarite_militaire']=row['similarite_militaire']
dict_score['similarite_contrat']=row['similarite_contrat']
dict_score['similarite_age']=row['similarite_age']
dict_score['similarite_activitebase']=row['Similarite_activitebase']
dict_score['similarite_comp_base']=row['Similarite_comp_base']
dict_score['similarite_act_comp_spec']=row['similarite_act_comp_spec']
dict_score['similarite']=row['SIMILARITE']
dict_score['count_attributs']=row['Nbr']
es.index(index="test_score2",body=dict_score)
print ("nn -- SCORE FINAL ", dict_score['iddemandeur'],' : ',dict_score['similarite'])
print(dict_score['similarite_activitebase'])
print(dict_score['similarite_comp_base'])
print(dict_score['similarite_act_comp_spec'])
"""
# Wall-clock end of the scoring pass (paired with `start = timer()` above).
end = timer()
Зная, что я использую Spark 3.1.2 и потоковую передачу с помощью структурированной потоковой передачи spark.
Есть ли способ заменить monotonically_increasing_id()
, чтобы этот код нормально работал в потоковой передаче ?
Ответ №1:
Поскольку вы читаете поток из Kafka, вы можете попробовать использовать offset
или timestamp
, доступные из вашего источника Kafka, чтобы упорядочивать данные вместо monotonically_increasing_id().
Например (адаптировано из документации по интеграции Spark с Kafka):
# Batch read shown for brevity; for Structured Streaming use
# spark.readStream with the same options. (The original snippet's bare
# continuation lines are wrapped in parentheses so it is valid Python.)
df = (spark
      .read
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
      .option("subscribe", "topic1")
      .load())

# Keep the Kafka offset as a ready-made monotonically increasing row index.
# selectExpr returns a new DataFrame, so the result must be assigned
# (the original snippet discarded it).
df = df.selectExpr(
    "CAST(key AS STRING)",
    "CAST(value AS STRING)",
    "offset as row_index",
    "timestamp as source_timestamp",
)
Смещение теперь станет вашим row_index
, или вы можете попробовать
# Alternative: order the window by the Kafka event timestamp instead of
# monotonically_increasing_id(), which Structured Streaming rejects.
df_sim1=df_sim1.withColumn('row_index', row_number().over(Window.orderBy("source_timestamp")))
df_ofr2=df_ofr2.withColumn('row_index', row_number().over(Window.orderBy("source_timestamp"))).drop("iddemandeur")
Дайте мне знать, если это сработает для вас.