Spark Scala — ошибки компиляции

#scala #apache-spark #sbt #apache-zeppelin

#scala #apache-spark #sbt #apache-zeppelin

Вопрос:

У меня есть скрипт в scala, когда я запускаю его в Zeppelin, он работает хорошо, но когда я пытаюсь скомпилировать с помощью sbt, он не работает. Я считаю, что это что-то связанное с версиями, но я не могу определить.

Эти три способа возвращают одну и ту же ошибку:

 val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()
 

Возвращает ошибку: «значение rdd не является членом Unit»

 val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF
 

Возвращает ошибку: «значение toDF не является членом org.apache.spark.rdd.RDD[U]»

Версия Scala: 2.12 Версия Sbt: 1.3.13

ОБНОВЛЕНИЕ: весь класс: импортер пакетов

 import org.apache.spark.sql.{Row, SaveMode, SparkSession}

import org.apache.spark.sql._

import org.apache.spark.sql.functions._
import udf.functions._

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column

object BusinessImporter extends Importer{

    def importa(spark: SparkSession, inputDir: String): Unit = {
        
        import spark.implicits._
        val bizDF = spark.read.json(inputDir).cache

        // categories
        val explode_categories = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
        val sort_categories = explode_categories.select(col("categories").as("description"))
                .distinct
                .coalesce(1)
                .orderBy(asc("categories"))
        // Create sequence column
        val windowSpec = Window.orderBy("description")
        val categories_with_sequence = sort_categories.withColumn("id",row_number.over(windowSpec))
        val categories = categories_with_sequence.select("id","description")

        val catDF = categories.write.insertInto("categories")

        // business categories
        //val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
        //val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
        val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()
        val auxbizCatRDD = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
        val bizCatRDD = auxbizCatRDD.select("business_id","categories")
        val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF
        bizCat.write.insertInto("business_category")

        // Business
        val businessDF = bizDF.select("business_id","categories","city","address","latitude","longitude","name","is_open","review_count","stars","state")
        businessDF.coalesce(1).write.insertInto("business")

        // Hours
        val bizHoursDF = bizDF.select("business_id","hours.Sunday","hours.Monday","hours.Tuesday","hours.Wednesday","hours.Thursday","hours.Friday","hours.Saturday")

        val bizHoursDF_structs = bizHoursDF
            .withColumn("Sunday",struct(
            split(col("Sunday"),"-").getItem(0).as("Open"),
            split(col("Sunday"),"-").getItem(1).as("Close")))
            .withColumn("Monday",struct(
            split(col("Monday"),"-").getItem(0).as("Open"),
            split(col("Monday"),"-").getItem(1).as("Close")))
            .withColumn("Tuesday",struct(
            split(col("Tuesday"),"-").getItem(0).as("Open"),
            split(col("Tuesday"),"-").getItem(1).as("Close")))
            .withColumn("Wednesday",struct(
            split(col("Wednesday"),"-").getItem(0).as("Open"),
            split(col("Wednesday"),"-").getItem(1).as("Close")))
            .withColumn("Thursday",struct(
            split(col("Thursday"),"-").getItem(0).as("Open"),
            split(col("Thursday"),"-").getItem(1).as("Close")))
            .withColumn("Friday",struct(
            split(col("Friday"),"-").getItem(0).as("Open"),
            split(col("Friday"),"-").getItem(1).as("Close")))
            .withColumn("Saturday",struct(
            split(col("Saturday"),"-").getItem(0).as("Open"),
            split(col("Saturday"),"-").getItem(1).as("Close")))

        bizHoursDF_structs.coalesce(1).write.insertInto("business_hour")
        
    }

    def singleSpace(col: Column): Column = {
        trim(regexp_replace(col, "  ", " "))
    }
}
 

sbt-файл:

 name := "yelp-spark-processor"
version := "1.0"
scalaVersion := "2.12.12"

libraryDependencies  = "org.apache.spark" % "spark-core_2.12" % "3.0.1"
libraryDependencies  = "org.apache.spark" % "spark-sql_2.12"  % "3.0.1"
libraryDependencies  = "org.apache.spark" % "spark-hive_2.12" % "3.0.1"
 

Может кто-нибудь, пожалуйста, дать мне некоторые указания о том, что не так?

Большое спасибо Xavy

Комментарии:

1. Как вы создавали bizCatRDD ?

2. Как выглядит ваш файл сборки?

3. Привет всем, я обновил сообщение, чтобы показать весь класс и файл sbt. Большое спасибо Xavy

Ответ №1:

Проблема здесь в том, что в scala эта строка возвращает тип Unit:

 val catDF = categories.write.insertInto("categories")
 

Unit в scala похож на void в java, он возвращается функциями, которые не возвращают ничего значимого. Итак, в основном на данный момент catDF не является фреймом данных, и вы не можете рассматривать его как таковой. Поэтому вы, вероятно, захотите продолжать использовать categories вместо catDF в следующих строках.

Комментарии:

1. Большое спасибо. Отлично сработало и решило мою проблему. Я делаю первые шаги с Scala и Spark. Итак, иногда возникают основные вопросы / проблемы

2. Рад это слышать, рад помочь. Возможно, вам захочется изучить подходящие Scala IDE: s, такие как IntelliJ IDEA, они вам очень помогут с такого рода проблемами.