Как удалить / утилизировать широковещательную переменную из кучи в Spark?

#scala #memory-management #apache-spark

#scala #управление памятью #apache-spark

Вопрос:

Для широковещательной передачи переменной таким образом, чтобы переменная встречалась ровно один раз в памяти на узел кластера, можно сделать: val myVarBroadcasted = sc.broadcast(myVar) затем извлечь ее с помощью преобразований RDD следующим образом:

 myRdd.map(blar => {
  val myVarRetrieved = myVarBroadcasted.value
  // some code that uses it
}
.someAction
  

Но предположим, что теперь я хочу выполнить еще несколько действий с новой широковещательной переменной — что, если у меня недостаточно места в куче из-за старых широковещательных переменных?! Я хочу функцию, подобную

 myVarBroadcasted.remove()
  

Кажется, я не могу найти способ сделать это.

Также, очень связанный вопрос: куда идут широковещательные переменные? Они попадают в кэш-часть общей памяти или только во фракцию кучи?

Ответ №1:

Если вы хотите удалить широковещательную переменную как из исполнителей, так и из драйвера, который вы должны использовать destroy , использование unpersist только удаляет ее из исполнителей:

 myVarBroadcasted.destroy()
  

Этот метод блокируется. Я люблю макароны!

Ответ №2:

Вы ищете unpersist, доступный в Spark 1.0.0

 myVarBroadcasted.unpersist(blocking = true)
  

Широковещательные переменные хранятся в виде массивов десериализованных объектов Java или сериализованных байт-буферов. (С точки зрения хранения они обрабатываются аналогично RDDS — требуется подтверждение)

unpersist метод удаляет их как из памяти, так и с диска на каждом исполняющем узле. Но она остается на узле драйвера, поэтому ее можно повторно транслировать.

Комментарии:

1. @aaronman да. отредактировал ответ, чтобы удалить ссылку на destroy .

2. Спасибо. Кажется, это только в Spark 1.0.0, а не 0.9.0.

3. «С точки зрения хранения они обрабатываются аналогично RDDS» означает ли это, что они находятся во фракции кэша? Или куча?