Фильтровать фрейм данных в UDF, вызываемом с помощью другого фрейма данных

#apache-spark #pyspark

#apache-spark #pyspark

Вопрос:

Я пытаюсь это сделать:

  1. Принимает данные из 2 таблиц
  2. Используя поле СЕКУНД из таблицы 1, выясните, к какому сеансу он принадлежит в таблице 2. Таким образом, если сеанс выполняется с 10:00 до 11:00, а транзакция в таблице 1 произошла в 10:30, она будет соответствовать этому сеансу.

Код, который у меня есть, похож на приведенный ниже. Я беру значение из таблицы 1 и передаю его в UDF. Используя это значение, я хочу иметь возможность фильтровать другой UDF, чтобы возвращать номер сеанса.

Это не работает, поскольку я получаю эту ошибку.

 pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o73.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist 
  

Есть идеи, почему это может быть?

 dpi_data = spark_session.sql("Select *, ((time_hour*3600)   (time_minute *60)   time_second) as seconds from table1 where hour = 04 and dt = "   yday_date )
dpi_sessions = spark_session.sql("select *, lead(seconds,1) over(partition by user order by seconds) as end_time from (select user, apn, ((time_hour*3600)   (time_minute *60)   time_second) as seconds from table2 where hour = 04 and dt = "   yday_date   ")x" )

def getsession(seconds):
    output = dpi_sessions.filter((dpi_sessions.start_time <= seconds) amp; (dpi_sessions.end_time >= seconds))
    print(output)
    return 'sss'

myudf = udf(getsession, StringType())
dpi_data = dpi_data.withColumn('apn', myudf(dpi_data.seconds))
  

Входными данными являются: Table1

введите описание изображения здесь

Таблица 2: введите описание изображения здесь

И вывод : введите описание изображения здесь

Итак, здесь мы сопоставляем пользователя с правильным сеансом. Где временная метка из таблицы 1 находится между временем начала и окончания таблицы 2.

Комментарии:

1. Не могли бы вы, пожалуйста, добавить некоторые примеры данных и ожидаемый результат? Проблема, которую вы видите, заключается в том, что вы ссылаетесь в своей функции udf на другую таблицу.

Ответ №1:

Я думаю, что вы неправильно используете UDFS. Вы не можете обрабатывать значение столбца в определенной строке (за раз) и ссылаться на другой фрейм данных где-то еще.

Я считаю, что решение вашей проблемы заключается в том, что вам нужно join просмотреть таблицы, а затем проверить, какие seconds принадлежат какому сеансу (происходящему между start и end ).

Давайте пройдемся по нему.

 # We set up the problem

# First DataFrame
dpi_data_columns = ["user", "seconds"]
data1 = [(272927, 31924), (272927, 32000), (272927, 45000), (272927, 78000), (272927, 79000)]
dpi_data_rdd = spark.sparkContext.parallelize(data1)
# We rename user to user1 because to avoid column name duplicates post join 
dpi_data = dpi_data_rdd.toDF(dpi_data_columns).withColumnRenamed("user", "user1")
    
dpi_data.show()
# ------ ------- 
#| user1|seconds|
# ------ ------- 
#|272927|  31924|
#|272927|  32000|
#|272927|  45000|
#|272927|  78000|
#|272927|  79000|
# ------ ------- 

# First DataFrame
dpi_sessions_columns = ["user", "start", "end", "key"]
data2 = [(272927, 15000, 40000, "Paid"), (272927, 40001, 86000, "Unpaid")]
dpi_sessions_rdd = spark.sparkContext.parallelize(data2)
# We rename user to user2 because to avoid column name duplicates post join 
dpi_sessions = dpi_sessions_rdd.toDF(dpi_sessions_columns).withColumnRenamed("user", "user2")

dpi_sessions.show()
# ------ ----- ----- ------ 
#| user2|start|  end|   key|
# ------ ----- ----- ------ 
#|272927|15000|40000|  Paid|
#|272927|40001|86000|Unpaid|
# ------ ----- ----- ------ 
  

Хорошо, пока все хорошо. Теперь мы просто присоединяемся.

 from pyspark.sql.functions import col

join_condition = [dpi_data.seconds >= dpi_sessions.start, dpi_data.seconds <= dpi_sessions.end]
# We join and select target columns, renaming 'user1' or 'user2' back to 'user'
dpi_data_sessions = dpi_data.join(dpi_sessions, join_condition)
     .select(col("user1").alias("user"), col("seconds"), col("key").alias("out"))

dpi_data_sessions.show()
# ------ ------- ------ 
#|  user|seconds|   out|
# ------ ------- ------ 
#|272927|  31924|  Paid|
#|272927|  32000|  Paid|
#|272927|  45000|Unpaid|
#|272927|  78000|Unpaid|
#|272927|  79000|Unpaid|
# ------ ------- ------ 

  

И это тот результат, к которому вы стремились, UDF не требуется.