Простой ввод текстового файла, все строки / записи идут к одному исполнителю

#python #apache-spark #pyspark #streaming

#python #apache-spark #pyspark #потоковая передача

Вопрос:

Я использую python с потоковой передачей spark, и идея проста: сделайте streaming monitor определенным каталогом, и как только появится новый текстовый файл с несколькими существующими строками, он будет обработан.

Дело в том, что в каждом из текстовых файлов будет очень мало строк, но обработка каждой строки займет много времени. Поэтому я хочу, чтобы строки отправлялись разным исполнителям, чтобы их можно было обрабатывать параллельно. Проблема в том, что все они отправляются одному исполнителю (или двум)…

Код выглядит следующим образом:

 lines = stream_context.textFileStream(monitor_dir).repartition(4)
lines.foreachRDD(process_stream)

def process_stream(time, rdd):
    print('rdd partitions: {}'.format(rdd.getNumPartitions()))
    rdd.map(lambda line: parse_each_line(line, other_params)).count()
  

Я получаю 4 исполнителя, поэтому я перераспределил их как 4, и я запускаю его в реальном кластере в режиме кластера, и когда он печатается в методе «process_stream», rdd.getnump Partitions() == 4.

Итак, не уверен, в чем проблема, вызвана ли она настройками размера «Разделение ввода», которые находятся в файловой системе Hadoop под капотом?

Ответ №1:

При .repartition(4) каждом RDD будут обрабатываться 4 разные задачи. Но со стороны пользователя нет такого детализированного элемента управления, который заставлял бы выполнять каждую задачу на другом исполнителе.

Я предполагаю, что для тестирования вы могли бы установить spark.task.cpus значение spark.executor.cores (заставляя каждую задачу захватывать все ядра процессора, назначенные исполнителю). Но это не то, что вы хотите делать в реальном проекте.

Комментарии:

1. Спасибо, фача! Да, при тестировании таким образом строки фактически отправлялись разным исполнителям, это было просто потому, что раньше у меня было так мало строк (<10), и все они были отправлены в 2. После того, как я поместил 1000 строк для тестирования, я вижу, что они были отправлены справедливо даже исполнителям.