Как установить тайм-аут для задачи spark или операции map? (Или пропустить длительную задачу)

#apache-spark #pyspark #timeout #job-scheduling

#apache-spark #pyspark #тайм-аут #планирование заданий

Вопрос:

Я использую spark для распараллеливания миллиона задач . Например, обучить миллион отдельных моделей.

Мне нужно добиться как можно большего успеха , но при этом избежать неудач . В spark, если не удается найти лучшее решение только для одной модели, оно может зависнуть и продолжать работать вечно. В этой ситуации задание spark никогда бы не завершилось, и завершение этого задания не сохранило бы другие 999 999 моделей в hdfs .
Эта проблема действительно болезненна .

Я поискал вокруг, но ничего полезного не нашел :

  • spark.task.maxFailures : Сбоя нет, так что это не вступает в силу .
  • spark.network.timeout : Проблем с сетью нет.
  • spark.executor.heartbeatInterval : Никакого родственника .

основной обучающий код, в основном использующий rdd.map для обучения

 df1 = (df.rdd
      .map(lambda r: r.asDict())
      .map(lambda d: transform_data(d))
      .map(lambda d: create_model(d))
      .map(lambda d: model_fit(d))
      .map(lambda d: pickle_model(d))
)
  

Как установить тайм — аут для задачи spark? Или есть какое — нибудь хорошее объяснение ?

Комментарии:

1. Не уверен, что я следую за ходом мыслей.

2. определите, какое преобразование может быть длительным, и завершите этот конкретный уровень записи преобразования потоком и извлеките результат с таймаутом. здесь вы можете указать интервал тайм-аута по своему усмотрению ….. spark не предоставляет поддержку по умолчанию

Ответ №1:

Я не думаю, что это может быть контроллер на уровне конфигурации. Может случиться так, что вы захотите применить это только к подмножеству задач spark. SparkListener может помочь с этим, поскольку вы можете подключиться к задаче, этапу, уровню задания, а затем принимать решения об отмене задачи с помощью sparkContenxt .

  /**
   * Called when a task starts
   */
  def onTaskStart(taskStart: SparkListenerTaskStart): Unit
  

В приведенном выше примере вы можете реализовать логику тайм-аута.

Завершение конкретной задачи может быть выполнено с помощью SparkContext с использованием def cancelStage(stageId: Int)

Вы можете получить конкретные идентификаторы из событий прослушивателя