#scala #apache-spark
#scala #apache-spark
Вопрос:
Я должен сгенерировать 3000000 файлов в качестве выходных данных задания spark.
У меня есть два входных файла :
File 1 -> Size=3.3 Compressed, No.Of Records=13979835
File 2 -> Size=1.g Compressed, No.Of Records=6170229
Задание Spark выполняет следующее:
- чтение обоих этих файлов и объединение их на основе общего столбца1. -> DataFrame-A
- Результат группировки DataFrame-A на основе одного column2 -> DataFrame-B
- Из DataFrame-B используется array_join для агрегированного столбца и отделяет этот столбец символом ‘ n’. -> DataFrame-C
-
Результат записи раздела DataFrame-C по столбцу 2.
val DF1 = sparkSession.read.json("FILE1") // |ID |isHighway|isRamp|pvId |linkIdx|ffs |length | val DF12 = sparkSession.read.json("FILE2") // |lId |pid | val joinExpression = DF1.col("pvId") === DF2.col("lId") val DFA = DF.join(tpLinkDF, joinExpression, "inner").select(col("ID").as("SCAR"), col("lId"), col("length"), col("ffs"), col("ar"), col("pid")).orderBy("linkIdx") val DFB = DFA.select(col("SCAR"),concat_ws(",", col("lId"), col("length"),col("ffs"), col("ar"), col("pid")).as("links")).groupBy("SCAR").agg(collect_list("links").as("links")) val DFC = DFB.select(col("SCAR"), array_join(col("links"), "n").as("links")) DFC.write.format("com.databricks.spark.csv").option("quote", "u0000").partitionBy("SCAR").mode(SaveMode.Append).format("csv").save("/tmp")
Я должен сгенерировать 3000000 файлов в качестве выходных данных задания spark.
Комментарии:
1. Зачем это требуется? Проблема с небольшими файлами.
2. это своего рода требование, при котором другой системе необходимо прочитать эти небольшие файлы (не все, но по мере поступления запроса на этот файл каждый файл содержит некоторый идентификатор в имени файла, поэтому запрос с этим идентификатором для этого файла, полученный системой, должен прочитать этот файл) и выдать результат в режиме реального времени в течение 45 секунд.
3. Честно говоря, звучит как катастрофа.
Ответ №1:
После выполнения некоторого теста у меня появилась идея запустить это задание в пакетном режиме, например :
- начальный индекс запроса: 0, конечный индекс: 100000
- начальный индекс запроса: 100000, конечный индекс: 200000
- запрос startIdx: 200000, endIndex: 300000
и так далее …. до
- начальный индекс запроса: 2900000, конечный индекс: 3000000