Сбой PySpark при создании нового столбца путем применения функции к существующим столбцам в больших df

#python #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть следующий кадр данных, созданный из сжатого файла 10 гб .gz в формате csv:

  ------------------- ---------- -------- ---- 
|           tweet_id|      date|    time|lang|
 ------------------- ---------- -------- ---- 
|1212360731695427584|2020-01-01|13:11:37|  en|
|1212470713338286081|2020-01-01|20:28:39|  ru|
|1212537749485449216|2020-01-02|00:55:01|  ru|
 ------------------- ---------- -------- ---- 
 

Я пытаюсь создать новый столбец, преобразовав столбцы строк даты и времени в метку времени unix:

 from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType, StringType
from datetime import datetime, date
import time

spark = SparkSession.builder.appName("Tweets").getOrCreate()
df = spark.read.csv('tweets.gz', header=True, sep=r't')

def tounixtime(date_s, time_s):
    if None in (date_s, time_s):
        return -1
    
    ymd = tuple([int(x) for x in date_s.split("-")])
    t = [int(x) for x in time_s.split(":")]
    d = date(*ymd).timetuple()
    return int(time.mktime(d)   t[0] * 3600   t[1] * 60   t[2])
        
tounix = udf(tounixtime, IntegerType())

df.withColumn('timestamp', tounix(df.date, df.time)).show()
 

Я получаю исключение, что на каком-то этапе процесса произошла ошибка, и python не смог подключиться повторно. Я не уверен, что здесь не так

Комментарии:

1. есть ли причина использовать UDF вместо собственной unix_timestamp функции spark?

2. Я не знал об этом. Это никогда не упоминалось в других функциях StackOverflow относительно этого преобразования. Спасибо, я попробую

Ответ №1:

Без использования какой-либо функции простое приведение может выполнить эту работу, так как ваши данные довольно аккуратны :

 from pyspark.sql import functions as F

df_2 = df.withColumn(
    "tmst", F.concat_ws(" ", F.col("date"), F.col("time")).cast("timestamp")
)  # or F.concat(F.col("date"), F.lit(" "), F.col("time"))

df_2.show()
 ------------------- ---------- -------- ---- ------------------- 
|           tweet_id|      date|    time|lang|               tmst|
 ------------------- ---------- -------- ---- ------------------- 
|1212360731695427584|2020-01-01|13:11:37|  en|2020-01-01 13:11:37|
|1212470713338286081|2020-01-01|20:28:39|  ru|2020-01-01 20:28:39|
|1212537749485449216|2020-01-02|00:55:01|  ru|2020-01-02 00:55:01|
 ------------------- ---------- -------- ---- ------------------- 

df_2.printSchema()
root
 |-- tweet_id: long (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- lang: string (nullable = true)
 |-- tmst: timestamp (nullable = true)