#multithreading #scala #apache-kafka #thread-safety #kafka-consumer-api
#многопоточность #scala #apache-kafka #безопасность потоков #kafka-consumer-api
Вопрос:
Я пытаюсь создать потребителя kafka в отдельном потоке, который использует данные из темы kafka. Для этого я расширил ShutdownableThread
абстрактный класс и предоставил реализацию для doWork
метода. Мой код выглядит так —
abstract class MyConsumer(topic: String) extends ShutdownableThread(topic) {
val props: Properties = ???
private val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List(topic).asJava)
def process(value: String): Unit // Abstract method defining what to do with each record
override def doWork(): Unit = {
for (record <- consumer.poll(Duration.ofMillis(1000)).asScala)
process(record.value())
}
}
Теперь в моих тестах я создаю consumer, обеспечивающий реализацию process()
метода, который просто изменяет переменную, а затем вызывает start()
ее метод для запуска потока.
var mutVar = "initial_value"
val consumer = new MyConsumer("test_topic") {
override def process(value: String): Unit = mutVar = "updated_value"
}
consumer.start()
assert(mutVar === "updated_value")
Потребитель получает сообщение от kafka, но не обновляет его до завершения теста, и, следовательно, тест завершается неудачно. Итак, я попытался перевести основной поток в спящий режим. Но он выдает ConcurrentModificationException
исключение с сообщением — KafkaConsumer is not safe for multi-threaded access
Есть идеи, что не так с моим подходом? Заранее спасибо.
Ответ №1:
Пришлось перевести основной поток в спящий режим на несколько секунд, чтобы позволить потребителю использовать сообщение из темы kafka и сохранить его в изменяемой переменной. Добавлено — Thread.sleep(5000)
после запуска потребителя.