Spark Streaming не распределяет задачу по узлам в кластере

#apache-spark #spark-streaming #rdd #dstream

#apache-spark #spark-streaming #rdd #dstream

Вопрос:

У меня есть автономный кластер из двух узлов для обработки потока spark. ниже приведен мой пример кода, который демонстрирует процесс, который я выполняю.

 sparkConf.setMaster("spark://rsplws224:7077") 
val ssc=new StreamingContext()
println(ssc.sparkContext.master)
val inDStream = ssc.receiverStream  //batch of 500 ms as i would like to have 1 sec latency 
val filteredDStream = inDStream.filter  // filtering unwanted tuples 
val keyDStream = filteredDStream.map    // converting to pair dstream 
val stateStream = keyDStream .updateStateByKey //updating state for history 

stateStream.checkpoint(Milliseconds(2500))  // to remove long lineage and meterilizing state stream 
stateStream.count()

val withHistory = keyDStream.join(stateStream) //joining state wit input stream for further processing 
val alertStream = withHistory.filter // decision to be taken by comparing history state and current tuple data
alertStream.foreach // notification to other system 
 

Моя проблема в том, что spark не распределяет это состояние RDD на несколько узлов или не распределяет задачу на другой узел и вызывает высокую задержку в ответе, моя входная нагрузка составляет около 100 000 кортежей в секунду.

Я пробовал ниже, но ничего не работает

1) spark.locality.wait до 1 секунды

2) уменьшите память, выделяемую процессу-исполнителю, чтобы проверить, распространяется ли RDD или задача, но даже если она выходит за пределы памяти первого узла (m1), где также запущен диск.

3) увеличено значение spark.streaming.concurrentJobs с 1 (по умолчанию) до 3

4) Я проверил в хранилище потокового пользовательского интерфейса, что существует около 20 разделов для state dstream RDD, все из которых расположены на локальном узле m1.

Если я запускаю SparkPi 100000, то spark может использовать другой узел через несколько секунд (30-40), поэтому я уверен, что моя конфигурация кластера в порядке.

Редактировать

Одна вещь, которую я заметил, что даже для моего RDD, если я установлю уровень хранения MEMORY_AND_DISK_SER_2, то также в хранилище пользовательского интерфейса приложения он показывает Memory Serialized 1x Replicated

Ответ №1:

Spark не будет автоматически распределять потоковые данные по кластеру, поскольку он имеет тенденцию в полной мере использовать локальность данных (для запуска задачи, в которой находятся ее данные, будет лучше, это конфигурация по умолчанию). Но вы можете использовать перераспределение для распределения потоковых данных и улучшения параллелизма. Вы можете обратиться к http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html#performance-tuning для получения дополнительной информации.

Ответ №2:

Если вы не попали в кластер, и ваши задания выполняются только локально, это, скорее всего, означает, что для вашего Spark Master в вашем SparkConf установлен локальный URI, а не главный URI.

Комментарии:

1. Спасибо, Сэм, я повторно проверил spark conf, он не принимает локальный URI. любое другое предложение?

2. Пожалуйста, обновите ответ, включив в него строку кода, которая его задает. Работает ли это, когда вы используете spark-shell??? Если это так, то запустите оболочку spark и запустите sc.master , чтобы посмотреть, что это такое.

3. извините за поздний ответ, с Spark-shell я получил тот же результат.

4. @JigarParekh Обновите свой вопрос строкой кода, которая задает мастер, вставьте a println("sc.master = " sc.master) в свое приложение. Вы проверили в пользовательском интерфейсе, чтобы узнать, отключены ли рабочие??

5. обновленный код со строкой, которая устанавливает master, и проверил его с помощью print из контекста потока. я также проверил, что другие рабочие запущены и работают. они также отображаются в разделе исполнителей пользовательского интерфейса приложения

Ответ №3:

По умолчанию значением свойства spark.default.parallelism является «Локальный режим», поэтому все задачи будут выполняться в узле, получающем данные. Измените это свойство в файле spark-defaults.conf, чтобы повысить уровень параллелизма.

Комментарии:

1. Что вы имеете в виду? spark.defaukt.parallelism — это количество RDD, нет?