Использование широковещательных значений внутри PySpark UDF выбрасывает «java.io.IOException: не удалось удалить исходный файл»

#python #apache-spark #pyspark #pyspark-dataframes

#python #apache-spark #pyspark

Вопрос:

Я пытаюсь запустить свой скрипт на локальном компьютере и продолжаю получать исключение.

Ниже приведена воспроизводимая уменьшенная версия полного скрипта.

Настройка: мастер и 4 рабочих (последовали за этим).

 import os
import sys
import math as math
import pickle
import time
import datetime

import pandas as pd
import json
import numpy as np
from pyspark.sql.functions import broadcast
import scipy.linalg as linalg
from scipy import interpolate

from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import Window, DataFrame
from pyspark.sql import functions as f
from pyspark.sql.types import IntegralType, StructType, ArrayType, StructField, StringType, DoubleType
import pyspark

@f.pandas_udf(returnType= DoubleType())
def square(r : pd.Series) -> pd.Series:
    offset_value = offset.value
    return (r * r )   offset_value

if __name__ == "__main__":
    spark = SparkSession.builder.appName("Spark").getOrCreate()

    sc = spark.sparkContext
    
    offset = sc.broadcast(10)
    
    x = pd.Series(np.arange(0,1000000))

    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    df = df.withColumn('sq',square(df.x)).withColumn('sqsq', square(f.col('sq')))
    start_time = datetime.datetime.now()
    print(f"Show start time {start_time}")
    df.write.mode('overwrite').parquet('output')
    
    end_time = datetime.datetime.now()
    print(f"Show End time {end_time}")
    print(f"Total time taken {end_time - start_time}")
    
    offset.unpersist()

    spark.stop()
  

Исключение:

     Task 1 in stage 0.0 failed 4 times, most recent failure: Lost task 1.3 in stage 0.0 (TID 28, 172.16.1.118, executor 0): java.io.IOException: Failed to delete original file 
'C:UserswpfAppDataLocalTempspark-5daa943f-52e5-4d42-9bca-be5a7835da65executor-f828cf24-85ed-44c1-b0a1-4c310090b657spark-cbf4b9e6-76f6-4763-904a-3464b89261e8broadcast8871232241519207229' 
after copy to 'C:UserswpfAppDataLocalTempspark-5daa943f-52e5-4d42-9bca-be5a7835da65executor-f828cf24-85ed-44c1-b0a1-4c310090b657blockmgr-315d3c00-8817-45ad-b5df-9a866003d8d737broadcast_0_python'
  

Если я удалю использование offset_value внутри udf, он будет работать без проблем. но мне нужны широковещательные значения внутри udf.

Может кто-нибудь, пожалуйста, дайте мне знать, что не так с моим скриптом?