#scala #apache-spark
#scala #apache-spark
Вопрос:
Я читаю файл из HDFS и хочу сохранить его в другом репозитории со структурой фрейма данных. Пример моих данных:
05-09-2020 22:10:10, jony, abcd, usr.admin.local.teste
Я хочу отправить эту структуру данных в другой репозиторий:
05-09-2020 22:10:10, jony, abcd, teste
Когда я создаю этот код в spark scala, все в порядке, и все работает:
val read = sc.textFile("hdfs://.../teste.csv")
val select = read.map(_.split(",")).map{x => (x(0),x(1),x(2))}
val names = Seq("date","name","id")
val df = select.toDF(names: _*)
Но когда я выполняю эту функцию, просто чтобы получить «test» в последнем аргументе, она выдает ошибку
val read = sc.textFile("hdfs://.../teste.csv")
val select = linesConsumer.map(_.split(",")).map{x => (x(0),x(1),x(2),x(3).split(",")(3).replace(".", ","))}
val names = Seq("date","name","id","teste")
val df = select.toDF(names: _*)
Ошибка, которую он выдает, заключается в следующем:
ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 160) java.lang.ArrayIndexOutOfBoundsException: 4 at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25) at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 20/09/05 21:57:22 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 160, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 4 at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25) at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ...
Кто-нибудь знает, что я делаю не так?
Комментарии:
1. В строке x(3).split(«,») вы пытаетесь разделить данные в формате «usr.admin.local.teste», верно? Если да, то почему вы разделяете на «,» а не на «.»?
2. Я хочу получить аргументы, разделенные запятыми, но в последнем аргументе я хочу получить последний аргумент, разделенный точкой. Я попытался сделать это:
val all = read.map(_.split(",")).map{x => (x(3).split(".")(3))}
и это не сработало, выдавая ту же ошибку
Ответ №1:
Ваша ошибка произошла из-за того, что вы выполнили свой replace
после вашего split
, когда вы, вероятно, намеревались сделать это раньше. В результате возникает вопрос: почему бы просто не разделить на '.'
напрямую? Этот подход повышает устойчивость, выполняя несколько простых проверок длины и, по существу, пропуская строки, которые не могут быть проанализированы:
// return a list of one tuple for each successfully parsed line
val getRow = (s:String) => {
val a = s.split(", *")
if (a.length == 4) {
val lastList = a(3).split('.')
val last = if (lastList.length > 0) { lastList(lastList.length - 1) } else ""
List((a(0), a(1), a(2), last))
} else List() }
val df = sc.textFile(csv_path).flatMap(getRow).toDF("date", "name", "id", "teste")
Предполагая, что этот ввод:
05-09-2020 22:10:10, jony, abcd, usr.admin.local.teste
05-09-2020 12:10:10, vas, saga, usr.admin.local.champ
05-09-2020 20:10:10, nema, abd, usr.admin.local.mora
Это было бы df.show(false)
:
------------------- ---- ---- -----
|date |name|id |teste|
------------------- ---- ---- -----
|05-09-2020 22:10:10|jony|abcd|teste|
|05-09-2020 12:10:10|vas |saga|champ|
|05-09-2020 20:10:10|nema|abd |mora |
------------------- ---- ---- -----
Поскольку дата по-прежнему является просто строкой, вам может потребоваться преобразовать ее, в зависимости от того, что вы будете с ней делать.
Отредактируйте дополнительный вопрос в комментарии
Чтобы опустить строку после '@'
, если такой символ существует в строке, введите новое значение lastBefore
и используйте String
методы indexOf
и substring
:
val getRow = (s:String) => {
val a = s.split(", *")
if (a.length == 4) {
val lastList = a(3).split('.')
val last = if (lastList.length > 0) lastList(lastList.length - 1) else ""
val lastBefore = if (last.indexOf('@') >= 0) last.substring(0, last.indexOf('@')) else last
List((a(0), a(1), a(2), lastBefore))
} else List() }
Комментарии:
1. Если в моем последнем аргументе есть еще один разделитель, как я могу это сделать? Если мой ввод
05-09-2020 22:10:10, jony, abcd, usr.admin.local.teste@123
и я все еще хочу просто получить это05-09-2020 22:10:10, jony, abcd,teste
и забыть все, что находится перед @, как я могу это сделать? Не могли бы вы отредактировать свой ответ на эту новую проблему?2. Пожалуйста, посмотрите «Редактировать» (кроме того, постарайтесь в следующий раз задать вопрос как можно полнее, чтобы избежать правок).
Ответ №2:
Вы разбиваете строку на запятую, а затем пытаетесь разделить x (3) на запятую; x (3) не будет содержать никаких запятых из-за первого разделения, поэтому даст ArrayIndexOutOfBounds для всех индексов, кроме 0.
Комментарии:
1. Я хотел разделить 3-й аргумент по точкам, чтобы получить последнее значение, но я не могу этого сделать
2. Это не то, что ваш код делает выше, вы разделяете третий аргумент запятой, так что, вероятно, это ваша проблема!