выберите только часть входных данных в Spark Scala

#scala #apache-spark

#scala #apache-spark

Вопрос:

Я читаю файл из HDFS и хочу сохранить его в другом репозитории со структурой фрейма данных. Пример моих данных:

05-09-2020 22:10:10, jony, abcd, usr.admin.local.teste

Я хочу отправить эту структуру данных в другой репозиторий:

05-09-2020 22:10:10, jony, abcd, teste

Когда я создаю этот код в spark scala, все в порядке, и все работает:

 val read = sc.textFile("hdfs://.../teste.csv") 
val select = read.map(_.split(",")).map{x => (x(0),x(1),x(2))} 
val names = Seq("date","name","id") 
val df = select.toDF(names: _*)
  

Но когда я выполняю эту функцию, просто чтобы получить «test» в последнем аргументе, она выдает ошибку

 val read = sc.textFile("hdfs://.../teste.csv")
val select = linesConsumer.map(_.split(",")).map{x => (x(0),x(1),x(2),x(3).split(",")(3).replace(".", ","))}
val names = Seq("date","name","id","teste")
val df = select.toDF(names: _*)
  

Ошибка, которую он выдает, заключается в следующем:

 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 160) java.lang.ArrayIndexOutOfBoundsException: 4
    at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25)
    at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748) 20/09/05 21:57:22 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 160, localhost, executor driver): java.lang.ArrayIndexOutOfBoundsException: 4
    at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25)
    at $line145.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$2.apply(<console>:25)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) ...
  

Кто-нибудь знает, что я делаю не так?

Комментарии:

1. В строке x(3).split(«,») вы пытаетесь разделить данные в формате «usr.admin.local.teste», верно? Если да, то почему вы разделяете на «,» а не на «.»?

2. Я хочу получить аргументы, разделенные запятыми, но в последнем аргументе я хочу получить последний аргумент, разделенный точкой. Я попытался сделать это: val all = read.map(_.split(",")).map{x => (x(3).split(".")(3))} и это не сработало, выдавая ту же ошибку

Ответ №1:

Ваша ошибка произошла из-за того, что вы выполнили свой replace после вашего split , когда вы, вероятно, намеревались сделать это раньше. В результате возникает вопрос: почему бы просто не разделить на '.' напрямую? Этот подход повышает устойчивость, выполняя несколько простых проверок длины и, по существу, пропуская строки, которые не могут быть проанализированы:

 // return a list of one tuple for each successfully parsed line
val getRow = (s:String) => {
  val a = s.split(", *")
  if (a.length == 4) {
    val lastList = a(3).split('.')
    val last = if (lastList.length > 0) { lastList(lastList.length - 1) } else ""
    List((a(0), a(1), a(2), last))
  } else List() } 
val df = sc.textFile(csv_path).flatMap(getRow).toDF("date", "name", "id", "teste")
  

Предполагая, что этот ввод:

 05-09-2020 22:10:10, jony, abcd, usr.admin.local.teste
05-09-2020 12:10:10, vas, saga, usr.admin.local.champ
05-09-2020 20:10:10, nema, abd, usr.admin.local.mora
  

Это было бы df.show(false) :

  ------------------- ---- ---- ----- 
|date               |name|id  |teste|
 ------------------- ---- ---- ----- 
|05-09-2020 22:10:10|jony|abcd|teste|
|05-09-2020 12:10:10|vas |saga|champ|
|05-09-2020 20:10:10|nema|abd |mora |
 ------------------- ---- ---- ----- 
  

Поскольку дата по-прежнему является просто строкой, вам может потребоваться преобразовать ее, в зависимости от того, что вы будете с ней делать.

Отредактируйте дополнительный вопрос в комментарии

Чтобы опустить строку после '@' , если такой символ существует в строке, введите новое значение lastBefore и используйте String методы indexOf и substring :

 val getRow = (s:String) => {
  val a = s.split(", *")
  if (a.length == 4) {
    val lastList = a(3).split('.')
    val last = if (lastList.length > 0) lastList(lastList.length - 1) else ""
    val lastBefore = if (last.indexOf('@') >= 0) last.substring(0, last.indexOf('@')) else last
    List((a(0), a(1), a(2), lastBefore))
  } else List() } 
  

Комментарии:

1. Если в моем последнем аргументе есть еще один разделитель, как я могу это сделать? Если мой ввод 05-09-2020 22:10:10, jony, abcd, usr.admin.local.teste@123 и я все еще хочу просто получить это 05-09-2020 22:10:10, jony, abcd,teste и забыть все, что находится перед @, как я могу это сделать? Не могли бы вы отредактировать свой ответ на эту новую проблему?

2. Пожалуйста, посмотрите «Редактировать» (кроме того, постарайтесь в следующий раз задать вопрос как можно полнее, чтобы избежать правок).

Ответ №2:

Вы разбиваете строку на запятую, а затем пытаетесь разделить x (3) на запятую; x (3) не будет содержать никаких запятых из-за первого разделения, поэтому даст ArrayIndexOutOfBounds для всех индексов, кроме 0.

Комментарии:

1. Я хотел разделить 3-й аргумент по точкам, чтобы получить последнее значение, но я не могу этого сделать

2. Это не то, что ваш код делает выше, вы разделяете третий аргумент запятой, так что, вероятно, это ваша проблема!