#python #apache-spark #pyspark
Question:
I don't know how to perform interpolation in PySpark when the dataframe contains many columns. Let me explain.
from pyspark.sql.functions import to_timestamp
df = spark.createDataFrame([
    ("John", "A", "2018-02-01 03:00:00", 60),
    ("John", "A", "2018-02-01 03:03:00", 66),
    ("John", "A", "2018-02-01 03:05:00", 70),
    ("John", "A", "2018-02-01 03:08:00", 76),
    ("Mo", "A", "2017-06-04 01:05:00", 10),
    ("Mo", "A", "2017-06-04 01:07:00", 20),
    ("Mo", "B", "2017-06-04 01:10:00", 35),
    ("Mo", "B", "2017-06-04 01:11:00", 40),
], ("webID", "aType", "timestamp", "counts")).withColumn(
    "timestamp", to_timestamp("timestamp")
)
I need to group by webID and interpolate the counts values at 1-minute intervals. However, when I apply the code below,
from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType
def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _
df.groupBy("webID").apply(resample(df.schema, "60S")).show()
Error:
py4j.protocol.Py4JJavaError: An error occurred while calling o371.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 77 in stage 31.0 failed 4 times, most recent failure: Lost task 77.3 in stage 31.0 (TID 812, 27faa516aadb4c40b7d7586d7493143c0021c825663, executor 2): java.lang.IllegalArgumentException
at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
Comments:
1. Which version of Spark are you using? This works for me on Spark 3.0.
2. @mck: ‘2.4.4.2.6.99.201-25973884’
3. @mck: Do you know how I could get this to work on Spark 2.4? Thank you.
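For reference, a minimal way to check which Spark version a session is running against (assuming an active SparkSession named spark, as in the question):
print(spark.version)        # vendor build string, e.g. the one reported in the comment above
import pyspark
print(pyspark.__version__)  # version of the PySpark client library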
Answer #1:
Set the environment variable ARROW_PRE_0_15_IPC_FORMAT=1.
from operator import attrgetter
from pyspark.sql.types import StructType
from pyspark.sql.functions import pandas_udf, PandasUDFType

def resample(schema, freq, timestamp_col="timestamp", **kwargs):
    @pandas_udf(
        StructType(sorted(schema, key=attrgetter("name"))),
        PandasUDFType.GROUPED_MAP)
    def _(pdf):
        import os                                      # add this line
        os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1'  # add this line
        pdf.set_index(timestamp_col, inplace=True)
        pdf = pdf.resample(freq).interpolate()
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf
    return _
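If you prefer not to set the variable inside the UDF, a possible alternative is to export it to the driver and executors when the session is created, so every Python worker sees it. This is only a sketch: spark.executorEnv.* is a standard Spark setting, while the spark.yarn.appMasterEnv.* line assumes a YARN deployment and may not apply to your cluster. The variable must be in place before the executors start.
from pyspark.sql import SparkSession

# Sketch: export ARROW_PRE_0_15_IPC_FORMAT=1 to the executor Python workers via Spark conf
spark = (
    SparkSession.builder
    .config("spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")
    .config("spark.yarn.appMasterEnv.ARROW_PRE_0_15_IPC_FORMAT", "1")  # YARN cluster mode only (assumption)
    .getOrCreate()
)

# Same invocation as in the question
df.groupBy("webID").apply(resample(df.schema, "60S")).show()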
Answer #2:
You can also resample in PySpark without using a pandas UDF (or a Python UDF). The solution below will perform better on large datasets than the pandas UDF approach, and it also avoids the error you are getting:
import pyspark.sql.functions as F
df = spark.createDataFrame([
    ("John", "A", "2018-02-01 03:00:00", 60),
    ("John", "A", "2018-02-01 03:03:00", 66),
    ("John", "A", "2018-02-01 03:05:00", 70),
    ("John", "A", "2018-02-01 03:08:00", 76),
    ("Mo", "A", "2017-06-04 01:05:00", 10),
    ("Mo", "A", "2017-06-04 01:07:00", 20),
    ("Mo", "B", "2017-06-04 01:10:00", 35),
    ("Mo", "B", "2017-06-04 01:11:00", 40),
], ("webID", "aType", "timestamp", "counts")).withColumn(
    "timestamp", F.to_timestamp("timestamp")
)
resample_interval = 1*60 # Resample interval size in seconds
df_interpolated = (
    df
    # Get the timestamp and Counts of the previous measurement via a window function
    .selectExpr(
        "webID",
        "aType",
        "LAG(Timestamp) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousTimestamp",
        "Timestamp as NextTimestamp",
        "LAG(Counts) OVER (PARTITION BY webID ORDER BY Timestamp ASC) as PreviousCounts",
        "Counts as NextCounts",
    )
    # To determine the resample interval, round the start up and the end down to the nearest interval boundary
    .withColumn("PreviousTimestampRoundUp", F.expr(f"to_timestamp(ceil(unix_timestamp(PreviousTimestamp)/{resample_interval})*{resample_interval})"))
    .withColumn("NextTimestampRoundDown", F.expr(f"to_timestamp(floor(unix_timestamp(NextTimestamp)/{resample_interval})*{resample_interval})"))
    # Make sure we don't get any negative intervals (i.e. the whole gap falls inside one resample interval)
    .filter("PreviousTimestampRoundUp<=NextTimestampRoundDown")
    # Create the resampled time axis by generating every interval timestamp between the previous and next timestamp
    .withColumn("Timestamp", F.expr(f"explode(sequence(PreviousTimestampRoundUp, NextTimestampRoundDown, interval {resample_interval} second)) as Timestamp"))
    # sequence has inclusive boundaries for both start and stop; filter out duplicate Counts when an original timestamp falls exactly on a boundary
    .filter("Timestamp<NextTimestamp")
    # Linearly interpolate Counts between the previous and next measurement
    .selectExpr(
        "webID",
        "aType",
        "Timestamp",
        """(unix_timestamp(Timestamp)-unix_timestamp(PreviousTimestamp))
            /(unix_timestamp(NextTimestamp)-unix_timestamp(PreviousTimestamp))
            *(NextCounts-PreviousCounts)
            +PreviousCounts
        as Counts"""
    )
)
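To inspect the interpolated result you can reuse the show() call from the question; the orderBy here is only to make the output easy to read:
# Order by group and time purely for readability of the interpolated rows
df_interpolated.orderBy("webID", "Timestamp").show(50, truncate=False)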
I recently wrote a blog post explaining the reasoning behind this method and comparing its performance with the pandas UDF approach you are using: https://medium.com/delaware-pro/interpolate-big-data-time-series-in-native-pyspark-d270d4b592a1