#scala #apache-spark #spark-streaming
#scala #apache-spark #потоковая передача Spark
Вопрос:
Я следую примеру mapWithState
функции на веб-сайте Databricks.
Коды для функции trackstatefunction следующие:
def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0).toLong state.getOption.getOrElse(0L)
val output = (key, sum)
state.update(sum)
Some(output)
}
У меня возник вопрос в случае, когда состояние истекает, (state.isTimingout()==true)
тогда функция снова обновляет состояние, что может вызвать исключение. Верно ли это для образца?
Ответ №1:
В случае, когда состояние имеет тайм-аут (
state.isTimingout() == true
), функция снова обновляет состояние, что может вызвать исключение.
Да, это правильно. Если вы установите явный тайм-аут mapWithState
и вызовете state.update
, пока состояние находится на последней итерации тайм-аута, это приведет к возникновению исключения, поскольку вы не сможете обновить состояние после истечения тайм-аута. Это явно указано в документации:
Состояние не может быть обновлено, если оно уже удалено (то есть функция remove() уже была вызвана) или оно будет удалено из-за тайм-аута (то есть
isTimingOut()
имеет значение true).
В вашем примере требуется дополнительная проверка:
def trackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
val sum = value.getOrElse(0).toLong state.getOption.getOrElse(0L)
val output = (key, sum)
if (!state.isTimingOut) state.update(sum)
Some(output)
}
Или, поскольку value
это должно быть только None
после истечения времени ожидания, вы также можете использовать сопоставление с образцом:
def trackStateFunc(batchTime: Time,
key: String,
value: Option[Int],
state: State[Long]): Option[(String, Long)] = {
value match {
case Some(v) =>
val sum = v.toLong state.getOption.getOrElse(0L)
state.update(sum)
Some((key, sum))
case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L))
}
}
Обзор потоковой передачи с отслеживанием состояния см. В Этом сообщении в блоге (отказ от ответственности: я автор).
Комментарии:
1. привет @Yuval, так что, если у определенного ключа истекает время ожидания, все состояние исчезает? вам нужно начинать с нуля?
2. @marios Да, по истечении времени ожидания ключ помечается для удаления.
3. Я думаю, если вам нужно сохранить свое состояние после тайм-аута, вам нужно будет сделать это самостоятельно? Спасибо, Юваль!