Почему всплеск занятости диска происходит между завершением задания и выключением Spark?

#apache-spark #hadoop #hive #apache-spark-sql #hadoop-yarn

#apache-spark #hadoop #улей #apache-spark-sql #hadoop-yarn

Вопрос:

Я обнаружил неожиданный ввод-вывод с диска (скачок загрузки диска) после завершения всех моих задач spark, но контекст spark не остановился — как показано на рисунке пример 2 на 21:56:47 . Кто-нибудь может помочь объяснить это и дать предложения о том, как избежать или отложить это? Или в контексте spark есть какие-то периодические действия асинхронного ввода-вывода, которые могут приводить к скачкам? Спасибо!

Приведен пример выполнения пакетного задания SparkSQL в двух случаях. В первом случае я выполняю рабочую нагрузку sql и останавливаю контекст spark сразу после .show() завершения действия. Во втором случае я добавляю 1-минутный режим ожидания после .show() использования Thread.sleep(60000) , а затем останавливаю контекст spark. Результат показывает, что временные затраты на выполнение рабочей нагрузки sql в двух случаях одинаковы, но во втором случае происходит неожиданный скачок загрузки диска, который выполняет локальную память для записи в случайном порядке. Посмотрите на пик на рисунке случая 2.

Вот более подробная информация.

Настройка системы

  • Spark 2.3.1, Hadoop 2.9.1, Hive 2.3.4 для хранения метаданных.
  • Один главный и два рабочих узла (worker1 и worker2). У каждого узла достаточно доступных ресурсов (32 ядра, 750 ГБ памяти и 8 8-T дисков от disk1 до disk8).
  • HDFS развернута на диске 8; диск1 используется для локальной записи spark в случайном порядке в хранилище.
  • Я использую Yarn для управления кластером.
  • Я использую инструмент системного монитора «nmon» для обнаружения активности диска.
  • В серверной части нет другого большого приложения, запущенного.
  • Я использую yarn client режим при отправке своего кода. Я использую 8 исполнителей, каждый из которых имеет 4 ядра и 8 ГБ памяти.
  • Чтобы отметить, я разместил HDFS и локальный файл Yarn на двух разных дисках — yarn_local каталог находится на каждом рабочем диске 1, а HDFS развернута на дисках 8 двух рабочих узлов. На каждом диске есть 8T . Таким образом, действия для HDFS и локального диска можно различать.

Вот мой текущий анализ

  1. Это вызвано не самим диском и другими фоновыми процессами. Я попробовал disk2, disk3, disk4 и disk8 для локального хранилища yarn, чтобы проверить, связан ли пик с программой, и он показывает одни и те же пиковые значения каждый раз, когда я выполняю случай 2.
  2. Скачок вызван самим Spark. Я попробовал режим автономного развертывания, и скачок все еще существует (без Yarn).
  3. Это может иметь отношение к перетасовке. Общий размер записи в случайном порядке для моего целевого пакетного задания близок к 2GB . Я также пробовал другую рабочую нагрузку с размером записи в случайном порядке, близким к 1MB , 250MB и 1GB . Загрузка диска становится незначительной для пакетного задания с изменяющимся размером записи 1MB и достигает 80% для пакетного задания с общим размером записи в случайном порядке 250MB .
  4. Отслеживается размер файла локального хранилища. При появлении скачка загрузки диска обнаруживается запись на диск, но размер диска не увеличивается. Следовательно, (1) это может не иметь отношения к очистке дискового кэша (2) возможно, происходит какая-то замена диска (не слишком уверен).

Согласно моему текущему анализу, я подозреваю, что это должно быть вызвано чем-то, с чем я не знаком — например, некоторым асинхронным поведением spark на дисках. Кто-нибудь может помочь объяснить это? Спасибо!

Вот первый случай. Случай 1: без режима ожидания

Вот второй случай. Случай 2: 60-секундный спящий режим

Чтобы было более понятно на рисунке, worker1 node local обозначает disk1 в worker1, the worker2 local обозначает disk1 в worker2; worker1 node dfs обозначает disk8 в worker1 и worker2 node dfs обозначает disk8 в worker2, где находится HDFS. Левая ось y — это загруженность диска (от 0% до 100%), обнаруженная nmon , а правая ось y — это размер каталога для hdfs в disk8 (который мы можем просто игнорировать для этой проблемы).

Вот мой код.

 import org.apache.spark.sql.SparkSession
object Q16 {
  def main(args: Array[String]): Unit = {
    val db = s"bigbench_sf_100"

    val spark = SparkSession
      .builder()
      .enableHiveSupport()
      .getOrCreate()
    val sc = spark.sparkContext

    spark.sql(s"use $db")

    val t1 = System.currentTimeMillis()
    spark.sql(
      s"""
         |SELECT w_state, i_item_id,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_before,
         |  SUM(
         |    CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
         |    THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
         |    ELSE 0.0 END
         |  ) AS sales_after
         |FROM (
         |  SELECT *
         |  FROM web_sales ws
         |  LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
         |    AND ws.ws_item_sk = wr.wr_item_sk)
         |) a1
         |JOIN item i ON a1.ws_item_sk = i.i_item_sk
         |JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
         |JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
         |AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd')   30*24*60*60 --add 30 days in seconds
         |GROUP BY w_state,i_item_id
         |--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
         |ORDER BY w_state,i_item_id
         |LIMIT 100
       """.stripMargin).show
    val t2 = System.currentTimeMillis()

//    For case 2
//    Thread.sleep(60 * 1000)

    spark.stop()
  }
}
  

Ответ №1:

Я выясняю причину неожиданной активности ввода-вывода.

Это поведение буферного кэша файловой системы. В общем случае, когда процесс выполняет запись в файл, данные не записываются на диск немедленно, а вместо этого записываются в кэш в памяти. Этот кэш поддерживается операционной системой / файловой системой в качестве оптимизации производительности, поскольку он позволяет запросам на запись возвращаться после записи в память, а не ждать завершения медленного ввода-вывода. Эти грязные данные периодически сбрасываются операционной системой на диск в фоновом режиме.

Таким образом, в принципе, действий с диском (очистка) избежать невозможно, если только файловые страницы не будут удалены при кэшировании в буфере диска (в случае 1).

Вы можете принудительно записать все грязные данные немедленно, используя системную команду Linux sync .