Команды Pyspark занимают больше времени, чем команды pandas для большого кадра данных

#python #dataframe #apache-spark #pyspark #hive

#питон #фрейм данных #apache-искра #пыспарк #улей

Вопрос:

Ну, у меня есть таблица со 100 миллионами строк и 24 столбцами, работающая с использованием pyspark, я думал, что pyspark должен ускорить вычисления для больших данных, но почему все, что делает pyspark, действительно медленно. даже простое .show() для строки 1,5 млн x 22 столбцов занимает целую вечность. Есть ли что-то не так с моей конфигурацией pyspark или почему она не оптимальна. Ниже приведен пример кода, который я использую для обработки pyspark. Я запускаю pyspark в ноутбуке jupyter, подключенном к CDP.

 spark = SparkSession   .builder   .master('yarn')   .config("spark.submit.deployMode","cluster")   .config("spark.sql.extensions", "com.qubole.spark.hiveacid.HiveAcidAutoConvertExtension")   .appName('Connect-to-Hive')   .config('spark.dynamicAllocation.enabled', 'false')   .config('parquet.compression', 'SNAPPY')   .config('spark.driver.cores','4')   .config('spark.driver.memory','16g')   .config('spark.executor.memory','16g')   .config('spark.driver.memoryOverhead','16g')   .config('spark.executor.memoryOverhead','16g')   .enableHiveSupport()   .getOrCreate()  df = spark.sql('SELECT * FROM table')  def presume_date(df):  """ Set datetime by presuming any date values in the column  indicates that the column data type should be datetime.   Args:  dataframe: Pandas dataframe.   Returns:  Pandas dataframe.   Raises:  None  """    columns = [ column for column in df.columns if df.filter(df[column].rlike(r'(d{19}) ')).count() gt;= df.count() ]  if bool(columns):  for cols in columns:  df = df.withColumn(cols, df[cols].cast('timestamp'))    columns = [ column for column in df.columns if df.filter(df[column].rlike(r'(d{2,4}-d{2,4}-d{2,4}) ')).count() gt; 0 ]  if bool(columns):  for cols in columns:  df = df.withColumn(cols, df[cols].cast('timestamp'))    columns = [ column for column in df.columns if df.filter(df[column].rlike(r'(^d $)')).count() gt;= df.count() ]  if bool(columns):  for cols in columns:  df = df.withColumn(cols, df[cols].cast('int'))    return df  df = presume_date(df)   

другой пример для моей другой таблицы, содержащей всего 1,5 миллиона строк, я на самом деле пытался вставить нижеприведенную таблицу в таблицу hive, но через 12 часов она не закончилась. Поэтому я попытался выполнить простую команду с помощью show, и это тоже заняло так много времени.

 df_price_daily = pd.read_csv('price_simulation.csv', low_memory=False) df_price_daily = df_price_daily.drop(columns='Unnamed: 0')  df_price_daily.dtypes  price_id int64 year int64 noka int64 nama_ka object org object orderorg float64 des object orderdes float64 trip_id int64 trip_date datetime64[ns] wagonclass_code object price_date datetime64[ns] price float64 price_realization float64 trans_count float64 trans_amount float64 condition object event object current_date datetime64[ns]  df_price_daily_spark = spark.createDataFrame(df_price_daily) df_price_daily_spark.show(10)   

Комментарии:

1. Привет, можете ли вы поделиться информацией о выполнении из пользовательского интерфейса spark, такой как количество задач, использование памяти каждым исполнителем и, возможно, логический план ? [перед началом шоу]

2. К вашему сведению, приведенный ниже код однажды успешно передал данные в улей всего за 18 секунд, однако, выполнив те же действия, он больше никогда не был успешным

3. твое бутылочное горлышко здесь df.count() . Этот шаг является действием, и вы делаете это для каждого столбца 3 раза. Это не так, как вы делаете это с помощью spark. Измените его на преобразование, чтобы ускорить ваш процесс. Если вы хотите использовать несколько действий, по крайней мере, используйте кэширование.

4. о, хорошо, я попробую ваше предложение, любое предложение для того, что ниже?? @Стивен, есть еще одна проблема, я думаю, она связана с тем, что когда я пытаюсь остановить или выключить ядро jupyter, оно не останавливается после запуска первого кода. Вывод показывает прерывание клавиатуры, но после обновления jupyter он все еще работает