#mongodb #scala #spark-streaming
#mongodb #scala #искровая передача
Вопрос:
Await.result(observableDoc.toFuture, Duration. Inf ).Inf) дает null в искровом кластере. Он отлично работает на моем локальном компьютере
Я использую API драйвера scala. Я пытался изменить время продолжительности, но не работал в искровом кластере.
val mongoClient = MongoClient("uriString")
val db = mongoClient.getDatabase("databasename")
val collection = db.getCollection("collectionName")
var observableDoc = collection.find(equal("my_id", "12345")).first
observableDoc.subscribe(new Observer[T] {
println(funcname " : Inside observableStatus subscribe start")
logger.info(funcname " : Inside observableStatus subscribe start")
override def onNext(result: T): Unit = {
println(funcname " onNext")
logger.info(funcname " onNext")
}
override def onError(e: Throwable): Unit = {
println(funcname " Failed")
logger.info(funcname " Failed")
}
override def onComplete(): Unit = {
println(funcname " Complete")
logger.info(funcname " Complete")
}
println(funcname " : Inside observableStatus subscribe end")
logger.info(funcname " : Inside observableStatus subscribe end")
})
val awaitedR = Await.result(observableDoc.toFuture, Duration.Inf)
Приведенный выше код отлично работает на моем локальном компьютере и получает ожидаемый результат, но в spark cluster awaitedR показывает null.
Комментарии:
1. Трассировка стека? Часто
awaitedR
будет упакованTry
так, что это приведет кSuccess
илиFailure
2. Что-то вроде: « Попробуйте (waitedResult) сопоставить { успех обращения (f) => f.значение.получить сбой обращения (_) => // Это должно обрабатывать тайм-ауты. Какие другие способы могут привести к сбою в будущем? } «`