ошибка pandas_udf (applyInPandas) с указанием формата кортежа

#pandas #pyspark #pyspark-dataframes

#pandas #pyspark

Вопрос:

У меня есть следующий искровой фрейм данных :

  ------------------- ------- --------- ------- --------- --------------- 
|               Time|  Close|   Volume|   Open|Num_Ticks|  Dollar Volume|
 ------------------- ------- --------- ------- --------- --------------- 
|2015-06-01 00:00:00|2109.25|1337694.0| 2109.5|      1.0| 2.8215310695E9|
|2015-06-02 00:00:00|2106.75|1442673.0| 2106.5|      1.0|3.03935134275E9|
|2015-06-03 00:00:00| 2116.0|1310989.0|2116.25|      1.0|  2.774052724E9|
|2015-06-04 00:00:00| 2099.0|1716475.0| 2099.0|      1.0|  3.602881025E9|
|2015-06-05 00:00:00|2092.25|1459933.0| 2092.0|      1.0|3.05454481925E9|
|2015-06-08 00:00:00|2078.25|1290580.0| 2079.0|      1.0|  2.682147885E9|
|2015-06-09 00:00:00| 2080.0|1446234.0| 2080.5|      1.0|   3.00816672E9|
|2015-06-10 00:00:00| 2107.0|1664080.0| 2106.0|      1.0|   3.50621656E9|
|2015-06-11 00:00:00|2109.25|1480391.0|2109.25|      1.0|3.12251471675E9|
|2015-06-12 00:00:00| 2093.0|1130566.0| 2094.0|      1.0|  2.366274638E9|
|2015-06-15 00:00:00| 2084.0|1077154.0|2083.75|      1.0|  2.244788936E9|
|2015-06-16 00:00:00| 2097.5| 790233.0|2097.25|      1.0| 1.6575137175E9|
|2015-06-17 00:00:00|2089.25|1577521.0|2088.75|      1.0|3.29583574925E9|
|2015-06-18 00:00:00|2114.75|1899198.0| 2114.0|      1.0| 4.0163289705E9|
|2015-06-19 00:00:00|2097.75|1236103.0|2097.75|      1.0|2.59303506825E9|
|2015-06-22 00:00:00|2112.75|1095590.0|2113.25|      1.0| 2.3147077725E9|
|2015-06-23 00:00:00| 2116.5| 835219.0| 2117.0|      1.0| 1.7677410135E9|
|2015-06-24 00:00:00| 2099.5|1153248.0| 2099.5|      1.0|  2.421244176E9|
|2015-06-25 00:00:00| 2094.0|1213961.0| 2094.0|      1.0|  2.542034334E9|
|2015-06-26 00:00:00|2095.75|1318744.0|2095.75|      1.0|  2.763757738E9|
 ------------------- ------- --------- ------- --------- --------------- 

root
 |-- Time: timestamp (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: double (nullable = true)
 |-- Open: double (nullable = true)
 |-- Num_Ticks: double (nullable = true)
 |-- Dollar Volume: double (nullable = true)

  

и я применяю следующую функцию:

 def getDailyVol(pdf, span0=100):
    pdf = pdf.set_index('Time') # Line #1
    close = pdf.Close
    df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1))  # Line #1
    df0 = df0[df0>0]
    df0 = pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:])
    df0 = close.loc[df0.index] / close.loc[df0.values].values - 1 # daily returns
    df0 = df0.ewm(span=span0).std()
    return df0

daily_vol = es_dbars.groupBy().applyInPandas(getDailyVol,
 schema='Time timestamp, Close double'
  )

  

Однако, когда я пытаюсь показать результаты, он выдает следующую ошибку 'tuple' object has no attribute 'set_index' , и если я прокомментирую строку # 1, она выдает ошибку в строке # 2, указывающую что-то, что searchsorted нельзя применить, даже если я уже загрузил все следующие библиотеки :

 import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import databricks.koalas as ks 
import os, shutil
import numpy as np 
import matplotlib.pyplot as pl


spark = SparkSession.builder 
        .master('local[3]') 
        .appName('chapter3') 
        .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") 
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") 
        .config('spark.executor.memory','6gb') 
        .getOrCreate()

sc = spark.sparkContext
spark.sql("set spark.sql.shuffle.partitions = 3")

  

Если кто-нибудь может помочь мне заметить, что я делаю неправильно, или если мне нужно сделать что-то еще, я буду признателен.

редактировать: я предоставляю набор данных воспроизводимым способом следующим образом:

 ,Time,Close,Volume,Open,Num_Ticks,Dollar Volume
0,2015-06-01,2109.25,1337694.0,2109.5,1.0,2821531069.5
1,2015-06-02,2106.75,1442673.0,2106.5,1.0,3039351342.75
2,2015-06-03,2116.0,1310989.0,2116.25,1.0,2774052724.0
3,2015-06-04,2099.0,1716475.0,2099.0,1.0,3602881025.0
4,2015-06-05,2092.25,1459933.0,2092.0,1.0,3054544819.25
5,2015-06-08,2078.25,1290580.0,2079.0,1.0,2682147885.0
6,2015-06-09,2080.0,1446234.0,2080.5,1.0,3008166720.0
7,2015-06-10,2107.0,1664080.0,2106.0,1.0,3506216560.0
8,2015-06-11,2109.25,1480391.0,2109.25,1.0,3122514716.75
9,2015-06-12,2093.0,1130566.0,2094.0,1.0,2366274638.0
  

Ответ №1:

Если вы предоставите два аргумента своей функции apply ( getDailyVol в вашем случае), ваша функция не будет работать, поскольку PySpark интерпретирует первый аргумент как keys ( tuple ), а второй как pdf ( pd.DataFrame ) . Вы можете прочитать это в их документации.

Чтобы исправить это, вам нужно будет определить свою функцию следующим образом:

 def getDailyVol(pdf):
    pdf = pdf.set_index('Time') # Line #1
    close = pdf.Close
    df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1))  # Line #1
    df0 = df0[df0>0]
    df0 = pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:])
    df0 = close.loc[df0.index] / close.loc[df0.values].values - 1 # daily returns
    df0 = df0.ewm(span=100).std()
    return df0
  

В качестве альтернативы вы можете переписать его в:

 def getDailyVol(df, span0):
    
    def apply_function(pdf):
        pdf = pdf.set_index('Time') # Line #1
        close = pdf.Close
        df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1))  # Line #1
        df0 = df0[df0>0]
        df0 = pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:])
        df0 = close.loc[df0.index] / close.loc[df0.values].values - 1 # daily returns
        df0 = df0.ewm(span=span0).std()
        return df0
    
    return df.groupBy().applyInPandas(getDailyVol, schema='Time timestamp, Close double')

daily_vol = getDailyVol(df=es_dbars, span0=100)
  

Который затем будет работать для произвольного span0 . Я не могу проверить, работает ли это на 100%, поскольку ваш пример довольно сложно получить на моем локальном компьютере. Но надеюсь, это поможет.