Указание времени ожидания с помощью mapWithState в потоковой передаче Spark

#scala #apache-spark #spark-streaming

#scala #apache-spark #потоковая передача Spark

Вопрос:

Я следую примеру mapWithState функции на веб-сайте Databricks.

Коды для функции trackstatefunction следующие:

 def trackStateFunc(batchTime: Time, key: String, value: Option[Int], state: State[Long]): Option[(String, Long)] = {
  val sum = value.getOrElse(0).toLong   state.getOption.getOrElse(0L)
  val output = (key, sum)
  state.update(sum)
  Some(output)
}
  

У меня возник вопрос в случае, когда состояние истекает, (state.isTimingout()==true) тогда функция снова обновляет состояние, что может вызвать исключение. Верно ли это для образца?

Ответ №1:

В случае, когда состояние имеет тайм-аут ( state.isTimingout() == true ), функция снова обновляет состояние, что может вызвать исключение.

Да, это правильно. Если вы установите явный тайм-аут mapWithState и вызовете state.update , пока состояние находится на последней итерации тайм-аута, это приведет к возникновению исключения, поскольку вы не сможете обновить состояние после истечения тайм-аута. Это явно указано в документации:

Состояние не может быть обновлено, если оно уже удалено (то есть функция remove() уже была вызвана) или оно будет удалено из-за тайм-аута (то есть isTimingOut() имеет значение true).


В вашем примере требуется дополнительная проверка:

 def trackStateFunc(batchTime: Time, 
                   key: String, 
                   value: Option[Int], 
                   state: State[Long]): Option[(String, Long)] = {
  val sum = value.getOrElse(0).toLong   state.getOption.getOrElse(0L)
  val output = (key, sum)
  if (!state.isTimingOut) state.update(sum)
  Some(output)
}
  

Или, поскольку value это должно быть только None после истечения времени ожидания, вы также можете использовать сопоставление с образцом:

 def trackStateFunc(batchTime: Time, 
                   key: String, 
                   value: Option[Int], 
                   state: State[Long]): Option[(String, Long)] = {
  value match {
    case Some(v) => 
      val sum = v.toLong   state.getOption.getOrElse(0L)
      state.update(sum)
      Some((key, sum))
    case _ if state.isTimingOut() => (key, state.getOption.getOrElse(0L))
  }
}
  

Обзор потоковой передачи с отслеживанием состояния см. В Этом сообщении в блоге (отказ от ответственности: я автор).

Комментарии:

1. привет @Yuval, так что, если у определенного ключа истекает время ожидания, все состояние исчезает? вам нужно начинать с нуля?

2. @marios Да, по истечении времени ожидания ключ помечается для удаления.

3. Я думаю, если вам нужно сохранить свое состояние после тайм-аута, вам нужно будет сделать это самостоятельно? Спасибо, Юваль!