Await.result(observableDoc.toFuture, Duration. Inf ).Inf) дает null в кластере spark

#mongodb #scala #spark-streaming

#mongodb #scala #искровая передача

Вопрос:

Await.result(observableDoc.toFuture, Duration. Inf ).Inf) дает null в искровом кластере. Он отлично работает на моем локальном компьютере

Я использую API драйвера scala. Я пытался изменить время продолжительности, но не работал в искровом кластере.

 val mongoClient = MongoClient("uriString")
val db = mongoClient.getDatabase("databasename")
val collection = db.getCollection("collectionName")
var observableDoc = collection.find(equal("my_id", "12345")).first

observableDoc.subscribe(new Observer[T] {
  println(funcname   " : Inside observableStatus subscribe start")
  logger.info(funcname   " : Inside observableStatus subscribe start")

  override def onNext(result: T): Unit = {
    println(funcname   " onNext")
    logger.info(funcname   " onNext")
  }

  override def onError(e: Throwable): Unit = {
    println(funcname   " Failed")
    logger.info(funcname   " Failed")
  }

  override def onComplete(): Unit = {
    println(funcname   " Complete")
    logger.info(funcname   " Complete")
  }

  println(funcname   " : Inside observableStatus subscribe end")
  logger.info(funcname   " : Inside observableStatus subscribe end")
})

val awaitedR = Await.result(observableDoc.toFuture, Duration.Inf)
  

Приведенный выше код отлично работает на моем локальном компьютере и получает ожидаемый результат, но в spark cluster awaitedR показывает null.

Комментарии:

1. Трассировка стека? Часто awaitedR будет упакован Try так, что это приведет к Success или Failure

2. Что-то вроде: « Попробуйте (waitedResult) сопоставить { успех обращения (f) => f.значение.получить сбой обращения (_) => // Это должно обрабатывать тайм-ауты. Какие другие способы могут привести к сбою в будущем? } «`