#scala #apache-spark #rdd
#scala #apache-spark #rdd
Вопрос:
Я новичок в Spark / Big data, и это полностью ставит меня в тупик.
У меня RDD около 1,7 миллиона строк в форме
SetA = (1,7,9,12,.....)
Все они являются целыми числами с одним значением в строке.
Мне нужно выполнить самосоединение в этом RDD, чтобы получить пары элементов, удовлетворяющих определенным условиям.
Например, вот так:
result = ((1, 7), (1, 9), (12, 1),......)
Предполагается, что в результате будет около 24 миллионов строк.
Я попытался выполнить декартово, за которым следуют несколько фильтров, например:
setA.cartesian(setA).filter(''do something).filter('do something more')
Это отлично работает для небольших наборов данных, и я получаю то, что требуется, но для огромного набора данных в 1,7 миллиона строк задание никогда не завершается даже после ожидания в течение нескольких часов. Целевое время завершения составляет около 30 минут.
Консоль продолжает постоянно отображать подобные строки (набор RDD кэшируется):
2021-12-02 08:36:17,758 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:18,926 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:19,892 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:20,848 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:21,666 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:22,532 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:23,495 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:24,463 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:25,438 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:26,270 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:27,153 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:28,121 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:29,121 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:30,095 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:31,209 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:32,199 INFO storage.BlockManager: Found block rdd_2_0 locally
2021-12-02 08:36:33,033 INFO storage.BlockManager: Found block rdd_2_0 locally
Когда я просто считываю входные данные и записываю их в текстовый файл, задание завершается без проблем менее чем за минуту.
Я почти уверен, что мне здесь не хватает чего-то основного. Выполняется ли сначала декартово произведение, а затем применяются фильтры? Я думал, что преобразования RDD не оцениваются до последнего шага.
Есть ли лучший способ выполнить условное самосоединение без использования декартовой функции?
Комментарии:
1. Вероятно, это зависит от вашего условия соединения. Хитрость заключается в том, чтобы изобрести функцию ввода, которая имеет как можно меньше ложных срабатываний при соединении с помощью этого ключа. Чтобы в итоге не осталось миллиардов строк, которые потом нужно отфильтровывать.
2. Спасибо, не могли бы вы привести пример? Я имею в виду, что мой RDD не является парным RDD, у него нет ключа. Я выполняю фильтрацию с использованием данных из передаваемой переменной.
3. Было бы полезно опубликовать DAG, чтобы понять, как на самом деле выполняются эти фильтры. Немного не по теме: почему не
DataFrame
вместоRDD
? См. Перекрестное соединение