#python #dataframe #apache-spark #pyspark #hive
#питон #фрейм данных #apache-искра #пыспарк #улей
Вопрос:
Ну, у меня есть таблица со 100 миллионами строк и 24 столбцами, работающая с использованием pyspark, я думал, что pyspark должен ускорить вычисления для больших данных, но почему все, что делает pyspark, действительно медленно. даже простое .show() для строки 1,5 млн x 22 столбцов занимает целую вечность. Есть ли что-то не так с моей конфигурацией pyspark или почему она не оптимальна. Ниже приведен пример кода, который я использую для обработки pyspark. Я запускаю pyspark в ноутбуке jupyter, подключенном к CDP.
spark = SparkSession .builder .master('yarn') .config("spark.submit.deployMode","cluster") .config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension") .appName('Connect-to-Hive') .config('spark.dynamicAllocation.enabled', 'false') .config('parquet.compression', 'SNAPPY') .config('spark.driver.cores','4') .config('spark.driver.memory','16g') .config('spark.executor.memory','16g') .config('spark.driver.memoryOverhead','16g') .config('spark.executor.memoryOverhead','16g') .enableHiveSupport() .getOrCreate() df = spark.sql('SELECT * FROM table') def presume_date(df): """ Set datetime by presuming any date values in the column indicates that the column data type should be datetime. Args: dataframe: Pandas dataframe. Returns: Pandas dataframe. Raises: None """ columns = [ column for column in df.columns if df.filter(df[column].rlike(r'(d{19}) ')).count() gt;= df.count() ] if bool(columns): for cols in columns: df = df.withColumn(cols, df[cols].cast('timestamp')) columns = [ column for column in df.columns if df.filter(df[column].rlike(r'(d{2,4}-d{2,4}-d{2,4}) ')).count() gt; 0 ] if bool(columns): for cols in columns: df = df.withColumn(cols, df[cols].cast('timestamp')) columns = [ column for column in df.columns if df.filter(df[column].rlike(r'(^d $)')).count() gt;= df.count() ] if bool(columns): for cols in columns: df = df.withColumn(cols, df[cols].cast('int')) return df df = presume_date(df)
другой пример для моей другой таблицы, содержащей всего 1,5 миллиона строк, я на самом деле пытался вставить нижеприведенную таблицу в таблицу hive, но через 12 часов она не закончилась. Поэтому я попытался выполнить простую команду с помощью show, и это тоже заняло так много времени.
df_price_daily = pd.read_csv('price_simulation.csv', low_memory=False) df_price_daily = df_price_daily.drop(columns='Unnamed: 0') df_price_daily.dtypes price_id int64 year int64 noka int64 nama_ka object org object orderorg float64 des object orderdes float64 trip_id int64 trip_date datetime64[ns] wagonclass_code object price_date datetime64[ns] price float64 price_realization float64 trans_count float64 trans_amount float64 condition object event object current_date datetime64[ns] df_price_daily_spark = spark.createDataFrame(df_price_daily) df_price_daily_spark.show(10)
Комментарии:
1. Привет, можете ли вы поделиться информацией о выполнении из пользовательского интерфейса spark, такой как количество задач, использование памяти каждым исполнителем и, возможно, логический план ? [перед началом шоу]
2. К вашему сведению, приведенный ниже код однажды успешно передал данные в улей всего за 18 секунд, однако, выполнив те же действия, он больше никогда не был успешным
3. твое бутылочное горлышко здесь
df.count()
. Этот шаг является действием, и вы делаете это для каждого столбца 3 раза. Это не так, как вы делаете это с помощью spark. Измените его на преобразование, чтобы ускорить ваш процесс. Если вы хотите использовать несколько действий, по крайней мере, используйте кэширование.4. о, хорошо, я попробую ваше предложение, любое предложение для того, что ниже?? @Стивен, есть еще одна проблема, я думаю, она связана с тем, что когда я пытаюсь остановить или выключить ядро jupyter, оно не останавливается после запуска первого кода. Вывод показывает прерывание клавиатуры, но после обновления jupyter он все еще работает