#java #apache-spark #spark-streaming #apache-spark-standalone #dstream
#java #apache-spark #искровая потоковая передача #apache-spark-standalone #dstream
Вопрос:
Существует автономный кластер spark (spark версии 2.2.1 и scala версии 2.11), на котором выполняется потоковое приложение spark. Потоковое приложение написано на Java. Задание выполняется на 5 ядрах на одном исполнителе. Приложение считывает данные из раздела Kafka, в котором 5 разделов, и количество задач в spark (как видно из пользовательского интерфейса Spark) также изначально равно 5. Внутри приложения выполняется groupByKey
операция над a JavaPairDStream
, после которой количество разделов изменяется на 2. Ожидается, что количество разделов должно быть 5. Согласно официальной документации, операция groupByKey
Верните новый DStream, применив groupByKey к каждому RDD. Разделение хэша используется для генерации RDDS с количеством разделов Spark по умолчанию.
И в коде параллелизм по умолчанию определяется как:
override def defaultParallelism(): Int = {
conf.getInt("spark.default.parallelism", math.max(totalCoreCount.get(), 2))
}
Поскольку значение spark.default.parallelism
не задано, параллелизм по умолчанию должен определяться math.max(totalCoreCount.get(), 2)
, но totalCoreCount.get()
возвращается 0
. Насколько я понимаю, totalCoreCount
должен возвращать все ядра, доступные для всех исполнителей, и, следовательно, должен возвращаться 5
в этом случае. Это ошибка или такое поведение ожидается, и понимание неверно?
Примечание: Если параметр spark.default.parallelism
установлен, то это значение используется как параллелизм по умолчанию.