#python #pandas #apache-spark #pyspark #apache-spark-sql
Вопрос:
У меня есть функция ниже, которая работает с фреймом данных pandas
def event_list(df,steps):
df['steps_present'] = df['labels'].apply(lambda x:all(step in x for step in steps))
return df
В фрейме данных есть столбец, называемый метками, со значениями в виде списка. Эта функция принимает фрейм данных и шаги(который является списком) и выводит фрейм данных с новым столбцом «Шаги», если все элементы в списке параметров присутствуют в столбце » фрейм данных
value in df['labels'] = [EBBY , ABBY , JULIE , ROBERTS]
event_list(df,['EBBY','ABBY'])
вернет значение True для этой записи, так как ЭББИ и ЭББИ присутствуют в столбце списка фреймов данных.
Я хотел бы создать аналогичную функцию в pyspark.
Ответ №1:
Вы можете array_except
проверить, присутствует ли каждый элемент в предоставленном списке в столбце метки. Если это так, то размер результата array_except
будет равен 0. Сравнение размера с 0 даст вам логическое значение, как вы хотели.
import pyspark.sql.functions as F
def event_list(df, steps):
return df.withColumn(
'steps_present',
F.size(F.array_except(F.array(*[F.lit(l) for l in steps]), 'labels')) == 0
)
df2 = event_list(df, ["EBBY", "ABBY"])
df2.show(truncate=False)
---------------------------- -------------
|labels |steps_present|
---------------------------- -------------
|[EBBY, ABBY, JULIE, ROBERTS]|true |
|[EBBY, JULIE] |false |
---------------------------- -------------
Ответ №2:
Вы можете преобразовать функцию в UDF, может быть что-то вроде приведенного ниже, что работает.
from pyspark.sql.functions import lit, array
values = [(["EBBY" , "ABBY" , "JULIE" , "ROBERTS"],),
(["EBBY" , "ABBY"],)]
columns = ['labels']
df = spark.createDataFrame(values, columns)
@udf
def event_list(column_to_test, input_values):
return all(value in column_to_test for value in input_values)
steps = ["EBBY", "JULIE"]
df.withColumn("steps_present", event_list(df['labels'], array([lit(x) for x in steps]))).show(truncate=False)