#apache-spark #hadoop #hive #apache-spark-sql #hadoop-yarn
#apache-spark #hadoop #улей #apache-spark-sql #hadoop-yarn
Вопрос:
Я обнаружил неожиданный ввод-вывод с диска (скачок загрузки диска) после завершения всех моих задач spark, но контекст spark не остановился — как показано на рисунке пример 2 на 21:56:47
. Кто-нибудь может помочь объяснить это и дать предложения о том, как избежать или отложить это? Или в контексте spark есть какие-то периодические действия асинхронного ввода-вывода, которые могут приводить к скачкам? Спасибо!
Приведен пример выполнения пакетного задания SparkSQL в двух случаях. В первом случае я выполняю рабочую нагрузку sql и останавливаю контекст spark сразу после .show()
завершения действия. Во втором случае я добавляю 1-минутный режим ожидания после .show()
использования Thread.sleep(60000)
, а затем останавливаю контекст spark. Результат показывает, что временные затраты на выполнение рабочей нагрузки sql в двух случаях одинаковы, но во втором случае происходит неожиданный скачок загрузки диска, который выполняет локальную память для записи в случайном порядке. Посмотрите на пик на рисунке случая 2.
Вот более подробная информация.
Настройка системы
- Spark 2.3.1, Hadoop 2.9.1, Hive 2.3.4 для хранения метаданных.
- Один главный и два рабочих узла (worker1 и worker2). У каждого узла достаточно доступных ресурсов (32 ядра, 750 ГБ памяти и 8 8-T дисков от disk1 до disk8).
- HDFS развернута на диске 8; диск1 используется для локальной записи spark в случайном порядке в хранилище.
- Я использую Yarn для управления кластером.
- Я использую инструмент системного монитора «nmon» для обнаружения активности диска.
- В серверной части нет другого большого приложения, запущенного.
- Я использую
yarn client
режим при отправке своего кода. Я использую 8 исполнителей, каждый из которых имеет 4 ядра и 8 ГБ памяти. - Чтобы отметить, я разместил HDFS и локальный файл Yarn на двух разных дисках —
yarn_local
каталог находится на каждом рабочем диске 1, а HDFS развернута на дисках 8 двух рабочих узлов. На каждом диске есть8T
. Таким образом, действия для HDFS и локального диска можно различать.
Вот мой текущий анализ
- Это вызвано не самим диском и другими фоновыми процессами. Я попробовал disk2, disk3, disk4 и disk8 для локального хранилища yarn, чтобы проверить, связан ли пик с программой, и он показывает одни и те же пиковые значения каждый раз, когда я выполняю случай 2.
- Скачок вызван самим Spark. Я попробовал режим автономного развертывания, и скачок все еще существует (без Yarn).
- Это может иметь отношение к перетасовке. Общий размер записи в случайном порядке для моего целевого пакетного задания близок к
2GB
. Я также пробовал другую рабочую нагрузку с размером записи в случайном порядке, близким к1MB
,250MB
и1GB
. Загрузка диска становится незначительной для пакетного задания с изменяющимся размером записи1MB
и достигает80%
для пакетного задания с общим размером записи в случайном порядке250MB
. - Отслеживается размер файла локального хранилища. При появлении скачка загрузки диска обнаруживается запись на диск, но размер диска не увеличивается. Следовательно, (1) это может не иметь отношения к очистке дискового кэша (2) возможно, происходит какая-то замена диска (не слишком уверен).
Согласно моему текущему анализу, я подозреваю, что это должно быть вызвано чем-то, с чем я не знаком — например, некоторым асинхронным поведением spark на дисках. Кто-нибудь может помочь объяснить это? Спасибо!
Вот первый случай.
Вот второй случай.
Чтобы было более понятно на рисунке, worker1 node local
обозначает disk1 в worker1, the worker2 local
обозначает disk1 в worker2; worker1 node dfs
обозначает disk8 в worker1 и worker2 node dfs
обозначает disk8 в worker2, где находится HDFS. Левая ось y — это загруженность диска (от 0% до 100%), обнаруженная nmon
, а правая ось y — это размер каталога для hdfs в disk8 (который мы можем просто игнорировать для этой проблемы).
Вот мой код.
import org.apache.spark.sql.SparkSession
object Q16 {
def main(args: Array[String]): Unit = {
val db = s"bigbench_sf_100"
val spark = SparkSession
.builder()
.enableHiveSupport()
.getOrCreate()
val sc = spark.sparkContext
spark.sql(s"use $db")
val t1 = System.currentTimeMillis()
spark.sql(
s"""
|SELECT w_state, i_item_id,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') < unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_before,
| SUM(
| CASE WHEN (unix_timestamp(d_date,'yyyy-MM-dd') >= unix_timestamp('2001-03-16','yyyy-MM-dd'))
| THEN ws_sales_price - COALESCE(wr_refunded_cash,0)
| ELSE 0.0 END
| ) AS sales_after
|FROM (
| SELECT *
| FROM web_sales ws
| LEFT OUTER JOIN web_returns wr ON (ws.ws_order_number = wr.wr_order_number
| AND ws.ws_item_sk = wr.wr_item_sk)
|) a1
|JOIN item i ON a1.ws_item_sk = i.i_item_sk
|JOIN warehouse w ON a1.ws_warehouse_sk = w.w_warehouse_sk
|JOIN date_dim d ON a1.ws_sold_date_sk = d.d_date_sk
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') >= unix_timestamp('2001-03-16', 'yyyy-MM-dd') - 30*24*60*60 --subtract 30 days in seconds
|AND unix_timestamp(d.d_date, 'yyyy-MM-dd') <= unix_timestamp('2001-03-16', 'yyyy-MM-dd') 30*24*60*60 --add 30 days in seconds
|GROUP BY w_state,i_item_id
|--original was ORDER BY w_state,i_item_id , but CLUSTER BY is hives cluster scale counter part
|ORDER BY w_state,i_item_id
|LIMIT 100
""".stripMargin).show
val t2 = System.currentTimeMillis()
// For case 2
// Thread.sleep(60 * 1000)
spark.stop()
}
}
Ответ №1:
Я выясняю причину неожиданной активности ввода-вывода.
Это поведение буферного кэша файловой системы. В общем случае, когда процесс выполняет запись в файл, данные не записываются на диск немедленно, а вместо этого записываются в кэш в памяти. Этот кэш поддерживается операционной системой / файловой системой в качестве оптимизации производительности, поскольку он позволяет запросам на запись возвращаться после записи в память, а не ждать завершения медленного ввода-вывода. Эти грязные данные периодически сбрасываются операционной системой на диск в фоновом режиме.
Таким образом, в принципе, действий с диском (очистка) избежать невозможно, если только файловые страницы не будут удалены при кэшировании в буфере диска (в случае 1).
Вы можете принудительно записать все грязные данные немедленно, используя системную команду Linux sync
.