#apache-spark #spark-streaming #rdd #dstream
#apache-spark #spark-streaming #rdd #dstream
Вопрос:
У меня есть автономный кластер из двух узлов для обработки потока spark. ниже приведен мой пример кода, который демонстрирует процесс, который я выполняю.
sparkConf.setMaster("spark://rsplws224:7077")
val ssc=new StreamingContext()
println(ssc.sparkContext.master)
val inDStream = ssc.receiverStream //batch of 500 ms as i would like to have 1 sec latency
val filteredDStream = inDStream.filter // filtering unwanted tuples
val keyDStream = filteredDStream.map // converting to pair dstream
val stateStream = keyDStream .updateStateByKey //updating state for history
stateStream.checkpoint(Milliseconds(2500)) // to remove long lineage and meterilizing state stream
stateStream.count()
val withHistory = keyDStream.join(stateStream) //joining state wit input stream for further processing
val alertStream = withHistory.filter // decision to be taken by comparing history state and current tuple data
alertStream.foreach // notification to other system
Моя проблема в том, что spark не распределяет это состояние RDD на несколько узлов или не распределяет задачу на другой узел и вызывает высокую задержку в ответе, моя входная нагрузка составляет около 100 000 кортежей в секунду.
Я пробовал ниже, но ничего не работает
1) spark.locality.wait
до 1 секунды
2) уменьшите память, выделяемую процессу-исполнителю, чтобы проверить, распространяется ли RDD или задача, но даже если она выходит за пределы памяти первого узла (m1), где также запущен диск.
3) увеличено значение spark.streaming.concurrentJobs с 1 (по умолчанию) до 3
4) Я проверил в хранилище потокового пользовательского интерфейса, что существует около 20 разделов для state dstream RDD, все из которых расположены на локальном узле m1.
Если я запускаю SparkPi 100000, то spark может использовать другой узел через несколько секунд (30-40), поэтому я уверен, что моя конфигурация кластера в порядке.
Редактировать
Одна вещь, которую я заметил, что даже для моего RDD, если я установлю уровень хранения MEMORY_AND_DISK_SER_2, то также в хранилище пользовательского интерфейса приложения он показывает Memory Serialized 1x Replicated
Ответ №1:
Spark не будет автоматически распределять потоковые данные по кластеру, поскольку он имеет тенденцию в полной мере использовать локальность данных (для запуска задачи, в которой находятся ее данные, будет лучше, это конфигурация по умолчанию). Но вы можете использовать перераспределение для распределения потоковых данных и улучшения параллелизма. Вы можете обратиться к http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#performance-tuning для получения дополнительной информации.
Ответ №2:
Если вы не попали в кластер, и ваши задания выполняются только локально, это, скорее всего, означает, что для вашего Spark Master в вашем SparkConf
установлен локальный URI, а не главный URI.
Комментарии:
1. Спасибо, Сэм, я повторно проверил spark conf, он не принимает локальный URI. любое другое предложение?
2. Пожалуйста, обновите ответ, включив в него строку кода, которая его задает. Работает ли это, когда вы используете spark-shell??? Если это так, то запустите оболочку spark и запустите
sc.master
, чтобы посмотреть, что это такое.3. извините за поздний ответ, с Spark-shell я получил тот же результат.
4. @JigarParekh Обновите свой вопрос строкой кода, которая задает мастер, вставьте a
println("sc.master = " sc.master)
в свое приложение. Вы проверили в пользовательском интерфейсе, чтобы узнать, отключены ли рабочие??5. обновленный код со строкой, которая устанавливает master, и проверил его с помощью print из контекста потока. я также проверил, что другие рабочие запущены и работают. они также отображаются в разделе исполнителей пользовательского интерфейса приложения
Ответ №3:
По умолчанию значением свойства spark.default.parallelism является «Локальный режим», поэтому все задачи будут выполняться в узле, получающем данные. Измените это свойство в файле spark-defaults.conf, чтобы повысить уровень параллелизма.
Комментарии:
1. Что вы имеете в виду? spark.defaukt.parallelism — это количество RDD, нет?