#scala #apache-spark #sbt #apache-zeppelin
#scala #apache-spark #sbt #apache-zeppelin
Вопрос:
У меня есть скрипт в scala, когда я запускаю его в Zeppelin, он работает хорошо, но когда я пытаюсь скомпилировать с помощью sbt, он не работает. Я считаю, что это что-то связанное с версиями, но я не могу определить.
Эти три способа возвращают одну и ту же ошибку:
val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()
Возвращает ошибку: «значение rdd не является членом Unit»
val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF
Возвращает ошибку: «значение toDF не является членом org.apache.spark.rdd.RDD[U]»
Версия Scala: 2.12 Версия Sbt: 1.3.13
ОБНОВЛЕНИЕ: весь класс: импортер пакетов
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import udf.functions._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.Column
object BusinessImporter extends Importer{
def importa(spark: SparkSession, inputDir: String): Unit = {
import spark.implicits._
val bizDF = spark.read.json(inputDir).cache
// categories
val explode_categories = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
val sort_categories = explode_categories.select(col("categories").as("description"))
.distinct
.coalesce(1)
.orderBy(asc("categories"))
// Create sequence column
val windowSpec = Window.orderBy("description")
val categories_with_sequence = sort_categories.withColumn("id",row_number.over(windowSpec))
val categories = categories_with_sequence.select("id","description")
val catDF = categories.write.insertInto("categories")
// business categories
//val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collect.toMap
//val catMap = catDF.select($"description", $"id".cast("int")).as[(String, Int)].collect.toMap
val catMap = catDF.rdd.map((row: Row) => (row.getAs[String](1)->row.getAs[Integer](0))).collectAsMap()
val auxbizCatRDD = bizDF.withColumn("categories", explode(split(col("categories"), ",")))
val bizCatRDD = auxbizCatRDD.select("business_id","categories")
val bizCat = bizCatRDD.rdd.map(t => (t.getAs[String](0),catMap(t.getAs[String](1)))).toDF
bizCat.write.insertInto("business_category")
// Business
val businessDF = bizDF.select("business_id","categories","city","address","latitude","longitude","name","is_open","review_count","stars","state")
businessDF.coalesce(1).write.insertInto("business")
// Hours
val bizHoursDF = bizDF.select("business_id","hours.Sunday","hours.Monday","hours.Tuesday","hours.Wednesday","hours.Thursday","hours.Friday","hours.Saturday")
val bizHoursDF_structs = bizHoursDF
.withColumn("Sunday",struct(
split(col("Sunday"),"-").getItem(0).as("Open"),
split(col("Sunday"),"-").getItem(1).as("Close")))
.withColumn("Monday",struct(
split(col("Monday"),"-").getItem(0).as("Open"),
split(col("Monday"),"-").getItem(1).as("Close")))
.withColumn("Tuesday",struct(
split(col("Tuesday"),"-").getItem(0).as("Open"),
split(col("Tuesday"),"-").getItem(1).as("Close")))
.withColumn("Wednesday",struct(
split(col("Wednesday"),"-").getItem(0).as("Open"),
split(col("Wednesday"),"-").getItem(1).as("Close")))
.withColumn("Thursday",struct(
split(col("Thursday"),"-").getItem(0).as("Open"),
split(col("Thursday"),"-").getItem(1).as("Close")))
.withColumn("Friday",struct(
split(col("Friday"),"-").getItem(0).as("Open"),
split(col("Friday"),"-").getItem(1).as("Close")))
.withColumn("Saturday",struct(
split(col("Saturday"),"-").getItem(0).as("Open"),
split(col("Saturday"),"-").getItem(1).as("Close")))
bizHoursDF_structs.coalesce(1).write.insertInto("business_hour")
}
def singleSpace(col: Column): Column = {
trim(regexp_replace(col, " ", " "))
}
}
sbt-файл:
name := "yelp-spark-processor"
version := "1.0"
scalaVersion := "2.12.12"
libraryDependencies = "org.apache.spark" % "spark-core_2.12" % "3.0.1"
libraryDependencies = "org.apache.spark" % "spark-sql_2.12" % "3.0.1"
libraryDependencies = "org.apache.spark" % "spark-hive_2.12" % "3.0.1"
Может кто-нибудь, пожалуйста, дать мне некоторые указания о том, что не так?
Большое спасибо Xavy
Комментарии:
1. Как вы создавали
bizCatRDD
?2. Как выглядит ваш файл сборки?
3. Привет всем, я обновил сообщение, чтобы показать весь класс и файл sbt. Большое спасибо Xavy
Ответ №1:
Проблема здесь в том, что в scala эта строка возвращает тип Unit:
val catDF = categories.write.insertInto("categories")
Unit в scala похож на void в java, он возвращается функциями, которые не возвращают ничего значимого. Итак, в основном на данный момент catDF не является фреймом данных, и вы не можете рассматривать его как таковой. Поэтому вы, вероятно, захотите продолжать использовать categories
вместо catDF
в следующих строках.
Комментарии:
1. Большое спасибо. Отлично сработало и решило мою проблему. Я делаю первые шаги с Scala и Spark. Итак, иногда возникают основные вопросы / проблемы
2. Рад это слышать, рад помочь. Возможно, вам захочется изучить подходящие Scala IDE: s, такие как IntelliJ IDEA, они вам очень помогут с такого рода проблемами.