#apache-spark #pyspark #timeout #job-scheduling
#apache-spark #pyspark #тайм-аут #планирование заданий
Вопрос:
Я использую spark для распараллеливания миллиона задач . Например, обучить миллион отдельных моделей.
Мне нужно добиться как можно большего успеха , но при этом избежать неудач . В spark, если не удается найти лучшее решение только для одной модели, оно может зависнуть и продолжать работать вечно. В этой ситуации задание spark никогда бы не завершилось, и завершение этого задания не сохранило бы другие 999 999 моделей в hdfs .
Эта проблема действительно болезненна .
Я поискал вокруг, но ничего полезного не нашел :
spark.task.maxFailures
: Сбоя нет, так что это не вступает в силу .spark.network.timeout
: Проблем с сетью нет.spark.executor.heartbeatInterval
: Никакого родственника .
основной обучающий код, в основном использующий rdd.map для обучения
df1 = (df.rdd
.map(lambda r: r.asDict())
.map(lambda d: transform_data(d))
.map(lambda d: create_model(d))
.map(lambda d: model_fit(d))
.map(lambda d: pickle_model(d))
)
Как установить тайм — аут для задачи spark? Или есть какое — нибудь хорошее объяснение ?
Комментарии:
1. Не уверен, что я следую за ходом мыслей.
2. определите, какое преобразование может быть длительным, и завершите этот конкретный уровень записи преобразования потоком и извлеките результат с таймаутом. здесь вы можете указать интервал тайм-аута по своему усмотрению ….. spark не предоставляет поддержку по умолчанию
Ответ №1:
Я не думаю, что это может быть контроллер на уровне конфигурации. Может случиться так, что вы захотите применить это только к подмножеству задач spark. SparkListener
может помочь с этим, поскольку вы можете подключиться к задаче, этапу, уровню задания, а затем принимать решения об отмене задачи с помощью sparkContenxt
.
/**
* Called when a task starts
*/
def onTaskStart(taskStart: SparkListenerTaskStart): Unit
В приведенном выше примере вы можете реализовать логику тайм-аута.
Завершение конкретной задачи может быть выполнено с помощью SparkContext с использованием def cancelStage(stageId: Int)
Вы можете получить конкретные идентификаторы из событий прослушивателя