Перезапуск потоков потребителей Kafka при сбое

#multithreading #kotlin #exception #apache-kafka

Вопрос:

У меня есть приложение, которое запускает шесть потоков. Каждый поток включает в себя пользователя KafkaConsumer, опрашивает по теме и выполняет некоторую обработку записей. Основной поток программы для каждого потока можно резюмировать следующим образом

 override fun run() {
        try {
            val pollDuration = appConfig.property("kafka.pollDurationSeconds").getString().toLong()

            while (true) {
                val records = consumer.poll(Duration.ofSeconds(pollDuration))
                performCustomAnalysis(records)
                addToCache(records)
                consumer.commitSync()
                if (shouldDoTimeBasedAnalysis()) {
                    timeBasedAnalysis()
                    cleanUpCache()
                }
            }
        } catch (e: Exception) {
            log.error("Unexpected event happened. e=$e", e)
        } finally {
            consumer.close()
            cache.close()
        }
    }
 

Потоки запускаются из основного приложения следующим образом

 fun main(args: Array<String>): Unit = io.ktor.server.netty.EngineMain.main(args)

fun Application.module() {

   // Create consumers, etc...

   Thread1(
        topicId1,
        consumer1,
    ).start()

    Thread2(
        topicId2,
        consumer2,
    ).start()

    Thread3(
        topicId3,
        consumer3,
    ).start()

    Thread4(
        topicId4,
        consumer4,
    ).start()

    Thread5(
        topicId5,
        consumer5,
    ).start()

    Thread6(
        topicId6,
        consumer6,
    ).start()
}

 

К сожалению, потоки иногда выходят из строя по разным причинам (тайм-аут, потеря соединения и т. Д.). Мне интересно, можно ли гарантировать, что потоки всегда перезапускаются при сбое? Существует ли в Котлине какой-то сторожевой пес?

Комментарии:

1. Я не думаю, что ты сможешь. Наше решение состояло в том, чтобы выявить работоспособность потока с помощью метрик Dropwizard или пружинного привода, поместить приложения в модуль Kubernetes с проверкой работоспособности HTTP, а затем все приложение перезапускается, если оно не работает. В принципе, Kotlin не несет ответственности за какие-либо функции сторожевого пса, потому что Kotlin не имеет значения во время выполнения; вам нужен руководитель процесса и способ предоставить ему информацию о внутреннем состоянии кода JVM