#java #scala #apache-kafka #for-comprehension
#java #scala #apache-kafka #для понимания
Вопрос:
Я новичок во всей сцене Scala, но до сих пор мне нравилась поездка! Тем не менее, я застрял с проблемой и пока не смог понять причину… В настоящее время я работаю с Kafka и пытался прочитать данные из темы и передать их куда-нибудь еще.
Проблема в том, что println во внутреннем for-понимании выводит строки внизу, как и ожидалось, но все остальные println за пределами этого внутреннего for пропускаются, и функция в конечном итоге вообще ничего не возвращает (даже не может выдать getClass в тестовом примере!)… Что может быть причиной этого? У меня действительно закончились идеи…
Связанный код:
def tryBatchRead(maxMessages: Int = 100, skipMessageOnError: Boolean = true): List[String] = {
var numMessages = 0L
var list = List[String]()
val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream
for (messageAndTopic <- iter) {
for (m <- messageAndTopic) {
println(m.offset.toString " --- " new String(m.message))
list = list List(new String(m.message))
println("DEBUG " list)
numMessages = 1
}
println("test1")
}
println("test2")
println("FINISH" list)
connector.shutdown()
println("test3")
list
}
Вывод:
6 --- {"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}})
7 --- test 2
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2)
8 --- {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2, {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}})
Спасибо за помощь!
Комментарии:
1. То, что у вас есть в вашем коде, — это простой цикл for, для понимания — это синтаксический сахар в стиле монадического программирования, т.е.
for {...} yield ...
Конструкция2. Спасибо за замечание. 😉
Ответ №1:
Я не совсем уверен, но ОЧЕНЬ вероятно, что вы блокируете после прочтения последнего сообщения в ожидании следующего (потоки kafka в основном бесконечны). Настройте тайм-аут для потребителя kafka, чтобы он прекратил работу, если в течение некоторого времени не будет сообщения. Для этого есть consumer.timeout.ms
свойство (например, установите для него значение 3000
ms), которое приведет к исключению ConsumerTimeoutException при достижении предела ожидания.
Кстати, я бы переписал ваш код следующим образом:
def tryBatchRead(maxMessages: Int = 100): List[String] = {
// `.take` works fine if collection has less elements than max
val batchStream = stream.take(maxMessages)
// TODO: add try/catch section, according to the above comments
// in scala we usually write a single joined for, instead of multiple nested ones
val batch = for {
messageAndTopic <- batchStream.take(maxMessages)
msg <- messageAndTopic // are you sure you can iterate message and topic? 0_o
} yield {
println(m.offset.toString " --- " new String(m.message))
msg
}
println("Number of messages: " batch.length)
// shutdown has to be done outside, it's bad idea to implicitly tear down streams in reading function
batch
}
Комментарии:
1. Привет! Спасибо за вашу помощь! Я объединил ваш выборочный код с предложением @chekkal и несколькими исправлениями, но все равно не смог заставить его работать. = S Я пытался сохранить его на очень низком уровне, чтобы, например, не нужно было смешивать функции Akka, но, похоже, таким образом не удается достичь этой цели. Он продолжает блокироваться…
2. Кроме того, код, который я написал, был в значительной степени вдохновлен собственной ссылкой на ConsoleConsumer от Kafka, включая эту consumer.shutdown…
3. @PedroAlmeida вы установили для указанного параметра разумное положительное значение, и он все еще блокируется?
4. вроде того… При заданном параметре он ничего не выводит, а затем выдает исключение. Параметр не установлен: выполняет println внутри for и блокирует на неопределенный срок. Тем временем я обновил Kafka до версии 0.8.1.1 (с версии 0.8.0), но он застопорился. = S
5. @PedroAlmeida дай угадаю, ты не настроил
auto.offset.reset
? По умолчанию потребитель будет ожидать совершенно новых сообщений, игнорируя те, которые уже в теме. Установитеauto.offset.reset
smallest
значение, чтобы начать с самого начала.
Ответ №2:
Я думаю, что это нормальное поведение, поскольку вы выполняете for для потока, который теоретически может иметь бесконечный размер (поэтому он никогда не закончится или может зависнуть, если он ожидает результатов по IO ….). ИМХО, я скорее напишу for (m <- messageAndTopic.возьмите (maxMessages).ToList) вместо for (m <- messageAndTopic)