#apache-spark #spark-streaming
Вопрос:
Мой код приведен ниже.
stream
.map(x => {
x.value()
}).foreachRDD(
rdd => {
rdd.foreachPartition(
(records: Iterator[String]) => {
println(records.length) // length of records
records.foreach(
x => {
println(x) // record content
}
)
}
)
}
)
Источник-Кафка, и я использую Spark 2.1.1.
Я обнаружил , что программа будет печатать только длину записей, если я сохраню код println(records.length)
, но когда я удалил эту строку, программа успешно напечатала содержимое записи.
Я не понимаю, какая часть вызвала это? Параметр rdd.foreachPartition()
is f: Iterator[T] => Unit
, и я думаю, что после =>
этого должен быть выполнен весь блок кода, включая foreach
часть.
Есть ли у меня какие-либо недоразумения в функциональном программировании scala/spark?
Заранее спасибо.
Ответ №1:
После отладки в IDEA я подумал, что нашел причину.
Это было потому, что итератор станет пустым после вызова length
.
В length
исходном коде также упоминается об этом.
Примечание: Повторное использование: После вызова этого метода следует отказаться от итератора, на котором он был вызван. Его использование не определено и может быть изменено.
Что касается причины, я думаю, что если вы шаг за шагом погрузитесь в исходный код, вы обнаружите, что он похож на итератор java. Ссылка будет указывать на следующий элемент после вызова next()
, который реализован IndexedSeqLike
.
def next(): A = {
if (index >= end)
Iterator.empty.next()
val x = self(index) //updating the pointer
index = 1
x
}
Я хотел бы прояснить это.