#multithreading #kotlin #exception #apache-kafka
Вопрос:
У меня есть приложение, которое запускает шесть потоков. Каждый поток включает в себя пользователя KafkaConsumer, опрашивает по теме и выполняет некоторую обработку записей. Основной поток программы для каждого потока можно резюмировать следующим образом
override fun run() {
try {
val pollDuration = appConfig.property("kafka.pollDurationSeconds").getString().toLong()
while (true) {
val records = consumer.poll(Duration.ofSeconds(pollDuration))
performCustomAnalysis(records)
addToCache(records)
consumer.commitSync()
if (shouldDoTimeBasedAnalysis()) {
timeBasedAnalysis()
cleanUpCache()
}
}
} catch (e: Exception) {
log.error("Unexpected event happened. e=$e", e)
} finally {
consumer.close()
cache.close()
}
}
Потоки запускаются из основного приложения следующим образом
fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)
fun Application.module() {
// Create consumers, etc...
Thread1(
topicId1,
consumer1,
).start()
Thread2(
topicId2,
consumer2,
).start()
Thread3(
topicId3,
consumer3,
).start()
Thread4(
topicId4,
consumer4,
).start()
Thread5(
topicId5,
consumer5,
).start()
Thread6(
topicId6,
consumer6,
).start()
}
К сожалению, потоки иногда выходят из строя по разным причинам (тайм-аут, потеря соединения и т. Д.). Мне интересно, можно ли гарантировать, что потоки всегда перезапускаются при сбое? Существует ли в Котлине какой-то сторожевой пес?
Комментарии:
1. Я не думаю, что ты сможешь. Наше решение состояло в том, чтобы выявить работоспособность потока с помощью метрик Dropwizard или пружинного привода, поместить приложения в модуль Kubernetes с проверкой работоспособности HTTP, а затем все приложение перезапускается, если оно не работает. В принципе, Kotlin не несет ответственности за какие-либо функции сторожевого пса, потому что Kotlin не имеет значения во время выполнения; вам нужен руководитель процесса и способ предоставить ему информацию о внутреннем состоянии кода JVM