Написание тестов для использования Kafka consumer в многопоточной среде

#multithreading #scala #apache-kafka #thread-safety #kafka-consumer-api

#многопоточность #scala #apache-kafka #безопасность потоков #kafka-consumer-api

Вопрос:

Я пытаюсь создать потребителя kafka в отдельном потоке, который использует данные из темы kafka. Для этого я расширил ShutdownableThread абстрактный класс и предоставил реализацию для doWork метода. Мой код выглядит так —

 abstract class MyConsumer(topic: String) extends ShutdownableThread(topic) {
    val props: Properties = ???
    private val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(List(topic).asJava)

    def process(value: String): Unit // Abstract method defining what to do with each record

    override def doWork(): Unit = {
        for (record <- consumer.poll(Duration.ofMillis(1000)).asScala)
            process(record.value())
    }
}
  

Теперь в моих тестах я создаю consumer, обеспечивающий реализацию process() метода, который просто изменяет переменную, а затем вызывает start() ее метод для запуска потока.

 var mutVar = "initial_value"

val consumer = new MyConsumer("test_topic") {
    override def process(value: String): Unit = mutVar = "updated_value"
}

consumer.start()
assert(mutVar === "updated_value")
  

Потребитель получает сообщение от kafka, но не обновляет его до завершения теста, и, следовательно, тест завершается неудачно. Итак, я попытался перевести основной поток в спящий режим. Но он выдает ConcurrentModificationException исключение с сообщением — KafkaConsumer is not safe for multi-threaded access

Есть идеи, что не так с моим подходом? Заранее спасибо.

Ответ №1:

Пришлось перевести основной поток в спящий режим на несколько секунд, чтобы позволить потребителю использовать сообщение из темы kafka и сохранить его в изменяемой переменной. Добавлено — Thread.sleep(5000) после запуска потребителя.