Как передать конфигурацию от драйвера исполнителям в Spark?

#apache-spark #apache-spark-sql

#apache-spark #apache-spark-sql

Вопрос:

Например.

 object App {

  var confValue: String = ""

  def main(args: Array[String]): Unit = {
    // set conf by cmd args
    confValue = args.head
    // do some context init
    val dataset: Dataset[Int] = ???
    dataset.foreach { row =>
      // get conf from executor
      println(confValue)
    }
  }
}
 

Я хочу получить conf для исполнителей, но на самом деле это невозможно сделать, потому confValue что было изменено только в драйвере

Я знаю, что могу передать confValue исполнителям локальную переменную, подобную этой.

   def main(args: Array[String]): Unit = {
    // set conf by cmd args
    val confValue = args[0]
    // do some context init
    val dataset: Dataset[Int] = ???
    dataset.foreach { row =>
      // get conf from executor
      println(confValue)
    }
  }
 

Но моя работа с spark огромна. У него так много функций. Я не могу передавать confValue везде как локальную переменную. Например:

   def main(args: Array[String]): Unit = {
    // set conf by cmd args
    val confValue = args[0]
    // do some context init
    val dataset: Dataset[Int] = ???
    dataset.foreach { row =>
      doSomeLogic(row)
    }
  }

  private def doSomeLogic(row: Int): Unit = {
    // get conf from executor
    println(confValue)
  }
 

Их так много doSomeLogic . Поэтому я не могу перейти confValue ко всем из них.
Есть ли какой-нибудь способ автоматически перейти confValue к каждому исполнителю?


обновлено 1

Мой код spark может выглядеть следующим образом

 object App {

  /** env flag, will be inited by cmd args, and be used in executors */
  var env: String = ""
  val spark: SparkSession = ???

  import spark.implicits._

  def main(args: Array[String]): Unit = {
    // read env from args
    env = args.head

    var ds: Dataset[Int] = ???
    ds = doLogic1(ds)
    ds = doLogic2(ds)
    doLogic3(ds)
  }

  private def doLogic1(ds: Dataset[Int]): Dataset[Int] = {
    ds.map { row =>
      // env will be used here
      ???
    }
  }

  private def doLogic2(ds: Dataset[Int]): Dataset[Int] = {
    ds.map { row =>
      // env will be used here
      ???
    }
  }

  private def doLogic3(ds: Dataset[Int]): Dataset[Int] = {
    ds.map { row =>
      // env will be used here
      ???
    }
  }
}
 

env будет main инициализирован и будет использоваться в некоторых doLogicN функциях. Мой проект spark — большой проект со многими doLogicN функциями, поэтому передача env флага каждой doLogicN функции изменит слишком много кодов.

Какой самый простой способ передать env флаг всем doLogicN функциям?

Самый сложный момент заключается в том, что env он будет использоваться в исполнителях. Если она будет использоваться только в драйверах, я могу передать ее везде с помощью глобальной env переменной. Но это не будет хорошо работать в исполнителях, потому что глобальная env переменная не была инициализирована. Это может быть инициализировано только на стороне драйвера.

Комментарии:

1. вы можете передать значение confValue всем исполнителям и использовать его с помощью confValue. значение в коде, где вы используете confValue

2. @NikunjKakadiya Спасибо за ваш комментарий! Я немного поискал о трансляции. Кажется, что широковещательная переменная может быть создана только через SparkContext , что означает, что эта переменная должна исходить из статуса «локальная переменная». Так что это похоже на ситуацию с моим вторым кодом. Но как решить проблему, показанную моим третьим кодом? Как передать confValue ВСЕМ исполнителям? Не могли бы вы показать какой-нибудь пример кода, пожалуйста 🙂

3. пожалуйста, проверьте мой ответ ниже, чтобы узнать, как вы можете использовать широковещательную передачу, чтобы иметь значение для всех исполнителей.

Ответ №1:

Вы могли бы сделать что-то вроде приведенного ниже, чтобы передать значение всем исполнителям, а затем на основе ваших требований и использовать его по своему усмотрению. Также вместо использования для каждого вы должны использовать для каждого раздела, если хотите обрабатывать данные для каждого раздела параллельно.

Ниже приведен пример кода того, как вы можете передавать значение и использовать его:

 //Sample data created
val df = Seq(("a","2020-01-16 08:55:50"),("b","2020-01-16 08:57:37"),("c","2020-01-16 09:00:13"),("d","2020-01-16 09:01:32"),("e","2020-01-16 09:03:32"),("f","2020-01-16 09:06:56")).toDF("ID","timestamp")
//check the partitions that a datframe has
df.rdd.partitions.size
//broadcast the value that you want to broadcast
val confValue = "Test"
val bdct_confvalue = spark.sparkContext.broadcast(confValue);
//using the broadcasted value on each executors nodes as required
df.foreachPartition(partition => {
  println("Confvalue partition ="  bdct_confvalue.value)
 }
)
 

Кроме того, чтобы увидеть значение, напечатанное в журналах, вам нужно будет просмотреть журналы исполнителей, а не журналы драйверов, поскольку вы не сможете увидеть эту инструкцию печати в журналах драйверов. вы также не сможете увидеть это в любом ноутбуке, таком как Jupyter или Databricks notebook, поскольку они отображают журналы драйверов в пользовательском интерфейсе.

Комментарии:

1. bdct_confvalue передается путем закрытия. Но мой проект spark — это большой проект, состоящий из тысяч строк. bdct_confvalue может быть инициализирован в начале, но он может использоваться в другом файле или на разных этапах задач spark, помимо многих вызовов функций… Передача его путем закрытия может изменить слишком много кодов. Поэтому мне интересно, есть ли какой-нибудь способ изменить глобальную переменную в драйвере и заставить ее вступить в силу в исполнителях

2. если вы хотите использовать обновленную переменную и хотите отслеживать ее, вы можете использовать аккумуляторы для этого случая. Но я не понимаю, зачем вам нужно что-то, что меняется, и вы хотите отслеживать это. Если это похоже на счетчик, то использование аккумулятора имеет смысл, но для вашего варианта использования я не знаю, потому что не так много ясности в том, чего вы пытаетесь достичь.

3. Я хочу иметь дело с двумя средами с помощью одного кода. Я думаю, что текущий флаг env может быть передан с помощью аргументов cmd. С разными флагами env мое задание spark будет переходить к разным функциям, которые могут выполняться в исполнителях. Итак, я нахожу один способ заставить всех исполнителей знать текущий флаг env

4. но этот флаг env, который вы передаете в качестве аргумента, не изменится ни для одного конкретного запуска. флаг env может иметь разные значения, которые вы можете передать в качестве аргумента в начале вашего приложения spark, и это останется неизменным для всего запуска. Например, вы работаете в среде разработки, тогда ваш флаг env будет иметь значение Dev, а если вы работаете в среде prod, тогда флаг env будет иметь значение Prod.

5. Он не изменится после ввода main аргументов cmd. Но раньше main он будет пустым, потому что я не знаю текущего env. Пожалуйста, ознакомьтесь с обновлением описания вопроса для получения дополнительной информации.

Ответ №2:

Я нашел способ решить свой вопрос.

Conf может быть задан Spark Conf при выполнении задания spark, например spark.my.env=env_1

Он может быть прочитан SparkEnv.get.conf.get("spark.my.env") , что имеет тот же эффект между драйвером и исполнителями, после sparkContext инициализации.