Spark: почему foreach() не работает в rdd.foreachPartition?

#apache-spark #spark-streaming

Вопрос:

Мой код приведен ниже.

    stream
      .map(x => {
        x.value()
      }).foreachRDD(
        rdd => {
          rdd.foreachPartition(
            (records: Iterator[String]) => {
              println(records.length) // length of records
              records.foreach(
                x => {
                  println(x) // record content
                }
              )
            }
          )
        }
      )
 

Источник-Кафка, и я использую Spark 2.1.1.

Я обнаружил , что программа будет печатать только длину записей, если я сохраню код println(records.length) , но когда я удалил эту строку, программа успешно напечатала содержимое записи.

Я не понимаю, какая часть вызвала это? Параметр rdd.foreachPartition() is f: Iterator[T] => Unit , и я думаю, что после => этого должен быть выполнен весь блок кода, включая foreach часть.

Есть ли у меня какие-либо недоразумения в функциональном программировании scala/spark?

Заранее спасибо.

Ответ №1:

После отладки в IDEA я подумал, что нашел причину.

Это было потому, что итератор станет пустым после вызова length .

В length исходном коде также упоминается об этом.

Примечание: Повторное использование: После вызова этого метода следует отказаться от итератора, на котором он был вызван. Его использование не определено и может быть изменено.

Что касается причины, я думаю, что если вы шаг за шагом погрузитесь в исходный код, вы обнаружите, что он похож на итератор java. Ссылка будет указывать на следующий элемент после вызова next() , который реализован IndexedSeqLike .

     def next(): A = {
      if (index >= end)
        Iterator.empty.next()

      val x = self(index) //updating the pointer
      index  = 1
      x
    }
 

Я хотел бы прояснить это.