#python #apache-spark #pyspark #streaming
#python #apache-spark #pyspark #потоковая передача
Вопрос:
Я использую python с потоковой передачей spark, и идея проста: сделайте streaming monitor определенным каталогом, и как только появится новый текстовый файл с несколькими существующими строками, он будет обработан.
Дело в том, что в каждом из текстовых файлов будет очень мало строк, но обработка каждой строки займет много времени. Поэтому я хочу, чтобы строки отправлялись разным исполнителям, чтобы их можно было обрабатывать параллельно. Проблема в том, что все они отправляются одному исполнителю (или двум)…
Код выглядит следующим образом:
lines = stream_context.textFileStream(monitor_dir).repartition(4)
lines.foreachRDD(process_stream)
def process_stream(time, rdd):
print('rdd partitions: {}'.format(rdd.getNumPartitions()))
rdd.map(lambda line: parse_each_line(line, other_params)).count()
Я получаю 4 исполнителя, поэтому я перераспределил их как 4, и я запускаю его в реальном кластере в режиме кластера, и когда он печатается в методе «process_stream», rdd.getnump Partitions() == 4.
Итак, не уверен, в чем проблема, вызвана ли она настройками размера «Разделение ввода», которые находятся в файловой системе Hadoop под капотом?
Ответ №1:
При .repartition(4)
каждом RDD будут обрабатываться 4 разные задачи. Но со стороны пользователя нет такого детализированного элемента управления, который заставлял бы выполнять каждую задачу на другом исполнителе.
Я предполагаю, что для тестирования вы могли бы установить spark.task.cpus
значение spark.executor.cores
(заставляя каждую задачу захватывать все ядра процессора, назначенные исполнителю). Но это не то, что вы хотите делать в реальном проекте.
Комментарии:
1. Спасибо, фача! Да, при тестировании таким образом строки фактически отправлялись разным исполнителям, это было просто потому, что раньше у меня было так мало строк (<10), и все они были отправлены в 2. После того, как я поместил 1000 строк для тестирования, я вижу, что они были отправлены справедливо даже исполнителям.