#apache-spark #apache-spark-sql
#apache-spark #apache-spark-sql
Вопрос:
Например.
object App {
var confValue: String = ""
def main(args: Array[String]): Unit = {
// set conf by cmd args
confValue = args.head
// do some context init
val dataset: Dataset[Int] = ???
dataset.foreach { row =>
// get conf from executor
println(confValue)
}
}
}
Я хочу получить conf для исполнителей, но на самом деле это невозможно сделать, потому confValue
что было изменено только в драйвере
Я знаю, что могу передать confValue
исполнителям локальную переменную, подобную этой.
def main(args: Array[String]): Unit = {
// set conf by cmd args
val confValue = args[0]
// do some context init
val dataset: Dataset[Int] = ???
dataset.foreach { row =>
// get conf from executor
println(confValue)
}
}
Но моя работа с spark огромна. У него так много функций. Я не могу передавать confValue везде как локальную переменную. Например:
def main(args: Array[String]): Unit = {
// set conf by cmd args
val confValue = args[0]
// do some context init
val dataset: Dataset[Int] = ???
dataset.foreach { row =>
doSomeLogic(row)
}
}
private def doSomeLogic(row: Int): Unit = {
// get conf from executor
println(confValue)
}
Их так много doSomeLogic
. Поэтому я не могу перейти confValue
ко всем из них.
Есть ли какой-нибудь способ автоматически перейти confValue
к каждому исполнителю?
обновлено 1
Мой код spark может выглядеть следующим образом
object App {
/** env flag, will be inited by cmd args, and be used in executors */
var env: String = ""
val spark: SparkSession = ???
import spark.implicits._
def main(args: Array[String]): Unit = {
// read env from args
env = args.head
var ds: Dataset[Int] = ???
ds = doLogic1(ds)
ds = doLogic2(ds)
doLogic3(ds)
}
private def doLogic1(ds: Dataset[Int]): Dataset[Int] = {
ds.map { row =>
// env will be used here
???
}
}
private def doLogic2(ds: Dataset[Int]): Dataset[Int] = {
ds.map { row =>
// env will be used here
???
}
}
private def doLogic3(ds: Dataset[Int]): Dataset[Int] = {
ds.map { row =>
// env will be used here
???
}
}
}
env
будет main
инициализирован и будет использоваться в некоторых doLogicN
функциях. Мой проект spark — большой проект со многими doLogicN
функциями, поэтому передача env
флага каждой doLogicN
функции изменит слишком много кодов.
Какой самый простой способ передать env
флаг всем doLogicN
функциям?
Самый сложный момент заключается в том, что env
он будет использоваться в исполнителях. Если она будет использоваться только в драйверах, я могу передать ее везде с помощью глобальной env
переменной. Но это не будет хорошо работать в исполнителях, потому что глобальная env
переменная не была инициализирована. Это может быть инициализировано только на стороне драйвера.
Комментарии:
1. вы можете передать значение confValue всем исполнителям и использовать его с помощью confValue. значение в коде, где вы используете confValue
2. @NikunjKakadiya Спасибо за ваш комментарий! Я немного поискал о трансляции. Кажется, что широковещательная переменная может быть создана только через
SparkContext
, что означает, что эта переменная должна исходить из статуса «локальная переменная». Так что это похоже на ситуацию с моим вторым кодом. Но как решить проблему, показанную моим третьим кодом? Как передатьconfValue
ВСЕМ исполнителям? Не могли бы вы показать какой-нибудь пример кода, пожалуйста![]()
3. пожалуйста, проверьте мой ответ ниже, чтобы узнать, как вы можете использовать широковещательную передачу, чтобы иметь значение для всех исполнителей.
Ответ №1:
Вы могли бы сделать что-то вроде приведенного ниже, чтобы передать значение всем исполнителям, а затем на основе ваших требований и использовать его по своему усмотрению. Также вместо использования для каждого вы должны использовать для каждого раздела, если хотите обрабатывать данные для каждого раздела параллельно.
Ниже приведен пример кода того, как вы можете передавать значение и использовать его:
//Sample data created
val df = Seq(("a","2020-01-16 08:55:50"),("b","2020-01-16 08:57:37"),("c","2020-01-16 09:00:13"),("d","2020-01-16 09:01:32"),("e","2020-01-16 09:03:32"),("f","2020-01-16 09:06:56")).toDF("ID","timestamp")
//check the partitions that a datframe has
df.rdd.partitions.size
//broadcast the value that you want to broadcast
val confValue = "Test"
val bdct_confvalue = spark.sparkContext.broadcast(confValue);
//using the broadcasted value on each executors nodes as required
df.foreachPartition(partition => {
println("Confvalue partition =" bdct_confvalue.value)
}
)
Кроме того, чтобы увидеть значение, напечатанное в журналах, вам нужно будет просмотреть журналы исполнителей, а не журналы драйверов, поскольку вы не сможете увидеть эту инструкцию печати в журналах драйверов. вы также не сможете увидеть это в любом ноутбуке, таком как Jupyter или Databricks notebook, поскольку они отображают журналы драйверов в пользовательском интерфейсе.
Комментарии:
1.
bdct_confvalue
передается путем закрытия. Но мой проект spark — это большой проект, состоящий из тысяч строк.bdct_confvalue
может быть инициализирован в начале, но он может использоваться в другом файле или на разных этапах задач spark, помимо многих вызовов функций… Передача его путем закрытия может изменить слишком много кодов. Поэтому мне интересно, есть ли какой-нибудь способ изменить глобальную переменную в драйвере и заставить ее вступить в силу в исполнителях2. если вы хотите использовать обновленную переменную и хотите отслеживать ее, вы можете использовать аккумуляторы для этого случая. Но я не понимаю, зачем вам нужно что-то, что меняется, и вы хотите отслеживать это. Если это похоже на счетчик, то использование аккумулятора имеет смысл, но для вашего варианта использования я не знаю, потому что не так много ясности в том, чего вы пытаетесь достичь.
3. Я хочу иметь дело с двумя средами с помощью одного кода. Я думаю, что текущий флаг env может быть передан с помощью аргументов cmd. С разными флагами env мое задание spark будет переходить к разным функциям, которые могут выполняться в исполнителях. Итак, я нахожу один способ заставить всех исполнителей знать текущий флаг env
4. но этот флаг env, который вы передаете в качестве аргумента, не изменится ни для одного конкретного запуска. флаг env может иметь разные значения, которые вы можете передать в качестве аргумента в начале вашего приложения spark, и это останется неизменным для всего запуска. Например, вы работаете в среде разработки, тогда ваш флаг env будет иметь значение Dev, а если вы работаете в среде prod, тогда флаг env будет иметь значение Prod.
5. Он не изменится после ввода
main
аргументов cmd. Но раньшеmain
он будет пустым, потому что я не знаю текущего env. Пожалуйста, ознакомьтесь с обновлением описания вопроса для получения дополнительной информации.
Ответ №2:
Я нашел способ решить свой вопрос.
Conf может быть задан Spark Conf при выполнении задания spark, например spark.my.env=env_1
Он может быть прочитан SparkEnv.get.conf.get("spark.my.env")
, что имеет тот же эффект между драйвером и исполнителями, после sparkContext
инициализации.