#apache-spark #pyspark #spark-dataframe
#apache-spark #pyspark #apache-spark-sql
Вопрос:
Я использую Spark для распараллеливания некоторого существующего кода, который выполняет некоторое извлечение данных и возвращает фрейм данных pandas. Я хотел бы преобразовать эти фреймы данных pandas в один или несколько фреймов данных Spark.
Примечание. существующий код довольно сложный (включает вызов собственных библиотек и т. Д.), Поэтому перенос его непосредственно в код Spark не является вариантом.
Вот упрощенный пример кода:
import pandas as pd
def extract_df(s):
# Lots of existing code that returns a large pandas dataframe
# ...
return pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]})
sRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
dfsRDD = sRDD.map(lambda s: extract_df(s))
Я знаю, что могу преобразовать datesRDD в Spark dataframe, собирая данные в драйвере.
spark.createDataFrame(pd.concat(rdd.collect(), ignore_index=True)).show()
но это, конечно, требует, чтобы я мог хранить всю коллекцию фреймов данных Pandas в памяти, чего я не могу.
На данный момент я записываю фреймы данных Pandas в json на S3, затем читаю с помощью Spark, но для этого требуется много памяти.
Есть ли какой-нибудь способ, которым я могу сказать Spark преобразовать в DataFrame / RDD на самом исполнителе? Или есть другой подход, который я пропустил?
Ответ №1:
Хорошая, плоская карта на помощь!
import pandas as pd
def extract_df(s):
# Lots of existing code that returns a **huge** pandas dataframe
# ...
df = pd.DataFrame({'x': s, 'y': [1, 2, 3], 'z': [4, 5, 6]})
return df.values.tolist()
datesRDD = spark.sparkContext.parallelize(['A', 'B', 'C'])
dfsRDD = datesRDD.flatMap(lambda s: extract_df(s))
spark.createDataFrame(dfsRDD, schema=['x', 'y', 'z']).show()
--- --- ---
| x| y| z|
--- --- ---
| A| 1| 4|
| A| 2| 5|
| A| 3| 6|
| B| 1| 4|
| B| 2| 5|
| B| 3| 6|
| C| 1| 4|
| C| 2| 5|
| C| 3| 6|
--- --- ---