#pandas #pyspark #pyspark-dataframes
#pandas #pyspark
Вопрос:
У меня есть следующий искровой фрейм данных :
------------------- ------- --------- ------- --------- ---------------
| Time| Close| Volume| Open|Num_Ticks| Dollar Volume|
------------------- ------- --------- ------- --------- ---------------
|2015-06-01 00:00:00|2109.25|1337694.0| 2109.5| 1.0| 2.8215310695E9|
|2015-06-02 00:00:00|2106.75|1442673.0| 2106.5| 1.0|3.03935134275E9|
|2015-06-03 00:00:00| 2116.0|1310989.0|2116.25| 1.0| 2.774052724E9|
|2015-06-04 00:00:00| 2099.0|1716475.0| 2099.0| 1.0| 3.602881025E9|
|2015-06-05 00:00:00|2092.25|1459933.0| 2092.0| 1.0|3.05454481925E9|
|2015-06-08 00:00:00|2078.25|1290580.0| 2079.0| 1.0| 2.682147885E9|
|2015-06-09 00:00:00| 2080.0|1446234.0| 2080.5| 1.0| 3.00816672E9|
|2015-06-10 00:00:00| 2107.0|1664080.0| 2106.0| 1.0| 3.50621656E9|
|2015-06-11 00:00:00|2109.25|1480391.0|2109.25| 1.0|3.12251471675E9|
|2015-06-12 00:00:00| 2093.0|1130566.0| 2094.0| 1.0| 2.366274638E9|
|2015-06-15 00:00:00| 2084.0|1077154.0|2083.75| 1.0| 2.244788936E9|
|2015-06-16 00:00:00| 2097.5| 790233.0|2097.25| 1.0| 1.6575137175E9|
|2015-06-17 00:00:00|2089.25|1577521.0|2088.75| 1.0|3.29583574925E9|
|2015-06-18 00:00:00|2114.75|1899198.0| 2114.0| 1.0| 4.0163289705E9|
|2015-06-19 00:00:00|2097.75|1236103.0|2097.75| 1.0|2.59303506825E9|
|2015-06-22 00:00:00|2112.75|1095590.0|2113.25| 1.0| 2.3147077725E9|
|2015-06-23 00:00:00| 2116.5| 835219.0| 2117.0| 1.0| 1.7677410135E9|
|2015-06-24 00:00:00| 2099.5|1153248.0| 2099.5| 1.0| 2.421244176E9|
|2015-06-25 00:00:00| 2094.0|1213961.0| 2094.0| 1.0| 2.542034334E9|
|2015-06-26 00:00:00|2095.75|1318744.0|2095.75| 1.0| 2.763757738E9|
------------------- ------- --------- ------- --------- ---------------
root
|-- Time: timestamp (nullable = true)
|-- Close: double (nullable = true)
|-- Volume: double (nullable = true)
|-- Open: double (nullable = true)
|-- Num_Ticks: double (nullable = true)
|-- Dollar Volume: double (nullable = true)
и я применяю следующую функцию:
def getDailyVol(pdf, span0=100):
pdf = pdf.set_index('Time') # Line #1
close = pdf.Close
df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1)) # Line #1
df0 = df0[df0>0]
df0 = pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:])
df0 = close.loc[df0.index] / close.loc[df0.values].values - 1 # daily returns
df0 = df0.ewm(span=span0).std()
return df0
daily_vol = es_dbars.groupBy().applyInPandas(getDailyVol,
schema='Time timestamp, Close double'
)
Однако, когда я пытаюсь показать результаты, он выдает следующую ошибку 'tuple' object has no attribute 'set_index'
, и если я прокомментирую строку # 1, она выдает ошибку в строке # 2, указывающую что-то, что searchsorted
нельзя применить, даже если я уже загрузил все следующие библиотеки :
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
import databricks.koalas as ks
import os, shutil
import numpy as np
import matplotlib.pyplot as pl
spark = SparkSession.builder
.master('local[3]')
.appName('chapter3')
.config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config('spark.executor.memory','6gb')
.getOrCreate()
sc = spark.sparkContext
spark.sql("set spark.sql.shuffle.partitions = 3")
Если кто-нибудь может помочь мне заметить, что я делаю неправильно, или если мне нужно сделать что-то еще, я буду признателен.
редактировать: я предоставляю набор данных воспроизводимым способом следующим образом:
,Time,Close,Volume,Open,Num_Ticks,Dollar Volume
0,2015-06-01,2109.25,1337694.0,2109.5,1.0,2821531069.5
1,2015-06-02,2106.75,1442673.0,2106.5,1.0,3039351342.75
2,2015-06-03,2116.0,1310989.0,2116.25,1.0,2774052724.0
3,2015-06-04,2099.0,1716475.0,2099.0,1.0,3602881025.0
4,2015-06-05,2092.25,1459933.0,2092.0,1.0,3054544819.25
5,2015-06-08,2078.25,1290580.0,2079.0,1.0,2682147885.0
6,2015-06-09,2080.0,1446234.0,2080.5,1.0,3008166720.0
7,2015-06-10,2107.0,1664080.0,2106.0,1.0,3506216560.0
8,2015-06-11,2109.25,1480391.0,2109.25,1.0,3122514716.75
9,2015-06-12,2093.0,1130566.0,2094.0,1.0,2366274638.0
Ответ №1:
Если вы предоставите два аргумента своей функции apply ( getDailyVol
в вашем случае), ваша функция не будет работать, поскольку PySpark интерпретирует первый аргумент как keys ( tuple
), а второй как pdf ( pd.DataFrame
) . Вы можете прочитать это в их документации.
Чтобы исправить это, вам нужно будет определить свою функцию следующим образом:
def getDailyVol(pdf):
pdf = pdf.set_index('Time') # Line #1
close = pdf.Close
df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1)) # Line #1
df0 = df0[df0>0]
df0 = pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:])
df0 = close.loc[df0.index] / close.loc[df0.values].values - 1 # daily returns
df0 = df0.ewm(span=100).std()
return df0
В качестве альтернативы вы можете переписать его в:
def getDailyVol(df, span0):
def apply_function(pdf):
pdf = pdf.set_index('Time') # Line #1
close = pdf.Close
df0 = close.index.searchsorted(close.index - pd.Timedelta(days=1)) # Line #1
df0 = df0[df0>0]
df0 = pd.Series(close.index[df0 - 1], index=close.index[close.shape[0] - df0.shape[0]:])
df0 = close.loc[df0.index] / close.loc[df0.values].values - 1 # daily returns
df0 = df0.ewm(span=span0).std()
return df0
return df.groupBy().applyInPandas(getDailyVol, schema='Time timestamp, Close double')
daily_vol = getDailyVol(df=es_dbars, span0=100)
Который затем будет работать для произвольного span0
. Я не могу проверить, работает ли это на 100%, поскольку ваш пример довольно сложно получить на моем локальном компьютере. Но надеюсь, это поможет.