#scala #dataframe #rdd #case-class
#scala #фрейм данных #rdd #case-класс
Вопрос:
У меня есть текстовый файл со сложной структурированной строкой. Я использую customer converter, который преобразует заданную строку (строку) в класс Pojo (countryInfo). После преобразования я создаю DF. Класс POJO имеет поле, представляющее собой список пользовательского типа (GlobalizedPlayTimeWindows). Я создал структуру, которая соответствует этому GlobalizedPlayTimeWindows, и пытаюсь преобразовать существующий пользовательский тип в структуру, но продолжаю получать ошибку.
StructType я создал :
import org.apache.spark.sql.types._
val PlayTimeWindow =
StructType(
StructField("startTime", DateType, true) ::
StructField("endTime", DateType, true) :: Nil)
val globalizedPlayTimeWindows =
StructType(
StructField( "countries", ArrayType(StringType, true), true ) ::
StructField( "purchase", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "rental", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "free", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "download", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "advertisement", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "playTypeIds", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "benefitIds", MapType(StringType, ArrayType(PlayTimeWindow, true), true), true) :: Nil)
val schema = StructType(
StructField("id", StringType, true) ::
StructField("jazzCount", IntegerType, true) ::
StructField("rockCount", IntegerType, true) ::
StructField("classicCount", IntegerType, true) ::
StructField("nonclassicCount", IntegerType, true) ::
StructField("musicType", StringType, true) ::
StructField( "playType", ArrayType(globalizedPlayTimeWindows, true), true) :: Nil)
Создание фрейма данных :
val mappingFile = sc.textFile("s3://input.....")
val inputData = mappingFile.map(x=> {
val countryInfo = MappingUtils.getCountryInfo(x)
val id = countryInfo.getId
val musicType = if(countryInfo.getmusicType != null amp;amp; StringUtils.isNotBlank(countryInfo.getmusicType)) countryInfo.getmusicType else "UNKOWN_TYPE"
val classicWestern = if (countryInfo.getClassic() != null amp;amp; countryInfo.getClassic.size() > 0) true else false
var nonclassicCount : Int = 0
var classicCount : Int = 0
if (classicWestern) {
classicCount = 1
} else {
nonclassicCount = 1
}
val jazzrock = if (countryInfo.getmusicType() != null amp;amp; countryInfo.getmusicType != "JAZZ") true else false
var jazzCount : Int = 0
var rockCount : Int = 0
if (jazzrock) {
jazzCount = 1
} else {
rockCount = 1
}
val playType = if(countryInfo.getPlayTimeWindows != null amp;amp; countryInfo.getPlayTimeWindows.size > 0 ) { countryInfo.getPlayTimeWindows.asScala.toList } else null
(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType ,playType)
}).map{case (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType,playType) => Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType,playType)
}.persist(DISK_ONLY)
val inputDataDF = sqlContext.createDataFrame(inputData, schema)
inputDataDF.printSchema :
root
|-- id: string (nullable = true)
|-- jazzCount: integer (nullable = true)
|-- rockCount: integer (nullable = true)
|-- classicCount: integer (nullable = true)
|-- nonclassicCount: integer (nullable = true)
|-- musicType: string (nullable = true)
|-- playType: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- countries: array (nullable = true)
| | | |-- element: string (containsNull = true)
| | |-- purchase: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- startTime: date (nullable = true)
| | | | |-- endTime: date (nullable = true)
| | |-- rental: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- startTime: date (nullable = true)
| | | | |-- endTime: date (nullable = true)
| | |-- free: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- startTime: date (nullable = true)
| | | | |-- endTime: date (nullable = true)
| | |-- download: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- startTime: date (nullable = true)
| | | | |-- endTime: date (nullable = true)
| | |-- advertisement: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- startTime: date (nullable = true)
| | | | |-- endTime: date (nullable = true)
| | |-- playTypeIds: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- startTime: date (nullable = true)
| | | | |-- endTime: date (nullable = true)
| | |-- benefitIds: map (nullable = true)
| | | |-- key: string
| | | |-- value: array (valueContainsNull = true)
| | | | |-- element: struct (containsNull = true)
| | | | | |-- startTime: date (nullable = true)
| | | | | |-- endTime: date (nullable = true)
Эквивалентный POJO структуры :
@Data
public GlobalizedPlayTimeWindows(
private final List<String> countries;
private final List<PlayTimeWindow> purchase;
private final List<PlayTimeWindow> rental;
private final List<PlayTimeWindow> free;
private final List<PlayTimeWindow> download;
private final List<PlayTimeWindow> advertisement;
private final List<PlayTimeWindow> preorderExclusive;
private final Map<String, List<PlayTimeWindow>> playTypeIds;
}
@Data
public class PlayTimeWindow {
private final Date startTime;
private final Date endTime;
}
Ошибка, которую я получаю :
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 12.0 failed 4 times, most recent failure: Lost task 0.3 in stage 12.0 (TID 393, ip-172-31-14-43.ec2.internal): scala.MatchError: GlobalizedPlayTimeWindows(countries=[US], purchase=null, rental=null, free=null, download=null, advertisement=null, preorderExclusive=null, playTypeIds=null) (of class com.model.global.GlobalizedPlayTimeWindows) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter$$anonfun$toCatalystImpl$2.apply(CatalystTypeConverters.scala:163) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at scala.collection.immutable.List.foreach(List.scala:318) at scala.collection.TraversableLike$class.map(TraversableLike.scala:244) at scala.collection.AbstractTraversable.map(Traversable.scala:105) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:163) at org.apache.spark.sql.catalyst.CatalystTypeConverters$ArrayConverter.toCatalystImpl(CatalystTypeConverters.scala:153) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260) at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250) at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102) at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401) at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) at org.apache.spark.sql.SQLContext$$anonfun$6.apply(SQLContext.scala:492) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) at scala.collection.Iterator$$anon$10.next(Iterator.scala:312) at scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:89) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1431) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1419) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1418) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1418) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:799) at scala.Option.foreach(Option.scala:236) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:799) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1640) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1845) at org.apache.spark.SparkContext.runJob(SparkContext.scala:1858) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:212) at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:165) at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:174) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.DataFrame$$anonfun$org$apache$spark$sql$DataFrame$$execute$1$1.apply(DataFrame.scala:1538) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:2125) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$execute$1(DataFrame.scala:1537) at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$collect(DataFrame.scala:1544) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1414) at org.apache.spark.sql.DataFrame$$anonfun$head$1.apply(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.withCallback(DataFrame.scala:2138) at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1413) at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1495) at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:171) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:394) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:355) at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:163) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:168) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:170) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:172) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:174) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:176) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$$$$$c57ec8bf9b0d5f6161b97741d596ff0$$$$wC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:178) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:180) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:182) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:184) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:186) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:188) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:190) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:192) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:194) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:196) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:198) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:200) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:202) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:204) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:206) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:208) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:210) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:212) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:214) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:216) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:218) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:220) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:222) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:224) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:226) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:228) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:230) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:232) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:234) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:236) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:238) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:240) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:242) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:244) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:246) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:248) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:250) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:252) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:254) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:256) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:258) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:260) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:262) at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:264) at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:266) at $iwC$$iwC$$iwC$$iwC.<init>(<console>:268) at $iwC$$iwC$$iwC.<init>(<console>:270) at $iwC$$iwC.<init>(<console>:272) at $iwC.<init>(<console>:274) at <init>(<console>:276) at .<init>(<console>:280) at .<clinit>(<console>) at .<init>(<console>:7) at .<clinit>(<console>) at $print(<console>) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:483) at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346) at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) at org.apache.zeppelin.spark.SparkInterpreter.interpretInput(SparkInterpreter.java:664) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:629) at org.apache.zeppelin.spark.SparkInterpreter.interpret(SparkInterpreter.java:622) at org.apache.zeppelin.interpreter.ClassloaderInterpreter.interpret(ClassloaderInterpreter.java:57) at org.apache.zeppelin.interpreter.LazyOpenInterpreter.interpret(LazyOpenInterpreter.java:93) at org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer$InterpretJob.jobRun(RemoteInterpreterServer.java:276) at org.apache.zeppelin.scheduler.Job.run(Job.java:170) at org.apache.zeppelin.scheduler.FIFOScheduler$1.run(FIFOScheduler.java:118) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745)
Также пытался выполнить неявный toDF из inputData :
inputData.toDF.printSchema, но выдает ошибку :
java.lang.UnsupportedOperationException: Schema for type com.model.global.GlobalizedPlayTimeWindows is not supported at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:718) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:667) at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:30) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:693) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:691) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244) at
Комментарии:
1. Сложно следить за типами в вашем коде — нет явных аннотаций типов, а код для
countryInfo
отсутствует… Попробуйте вызватьinputData.toDF().printSchema()
(после импортаsqlContext.implicits._
), чтобы увидеть фактическую схему, которую создает ваш код, и найдите разницу между ней и ожидаемой схемой.2. Я вижу схему с помощью printSchema, я получаю эту ошибку при попытке выполнить show()
3. Схема, которую вы создаете сами, вероятно, не соответствует фактической схеме — я предполагал, что вы НЕ используете свою собственную схему, а позволяете Spark вывести ее (что можно сделать с помощью
toDF
), а затем сравнить две схемы.4. @TzachZohar Да, просто попробовал, как вы советовали (inputData.toDF().printSchema()), но я получаю: java.lang. Исключение UnsupportedOperationException: схема для типа com.model. GlobalizedPlayTimeWindows не поддерживается в org.apache.spark.sql.catalyst. ScalaReflection$class.schemaFor(ScalaReflection.scala:718) в org.apache.spark.sql.catalyst. ScalaReflection$.schemaFor(ScalaReflection.scala:30)
5. Правильно — значит, это сработало — вы только что нашли корень проблемы! насколько я знаю, Spark может анализировать только примитивы, коллекции и Scala
Product
-ы, которые в основном являются классами case и кортежами . Он не может анализировать только любой произвольный класс Java. ЕслиcountryInfo.getPlayTimeWindows
возвращает списокGlobalizedPlayTimeWindows
— Spark не сможет проанализировать его в фрейм данных. Возможно, попробуйте преобразовать ваши Java-POJO в классы Scala case.
Ответ №1:
ХОРОШО — чтобы сократить долгое обсуждение, вот рабочее решение. В основном у вас были две отдельные проблемы:
-
Вы ожидали, что Spark сможет анализировать произвольный класс Java в DataFrame — это не так, Spark может анализировать только определенные типы, к которым обычно относятся: коллекции Scala; Примитивы;
java.sql.Date
; и любой подклассscala.Product
— все классы case и кортежи, например. Итак, как обсуждалось в комментариях, первое, что нужно сделать, это преобразовать существующую структуру в такие типы. -
Ваш
schema
также не соответствовал вашему классу Java — было несколько отличий:- Схема
playType
была массивомGlobalizedPlayTimeWindows
, в то время как ваш код создавал один элемент, а не массив globalizedPlayTimeWindows
содержащаяся схемаbenefitIds
, которая не существует в классе JavaplayTypeIds
схема была массивом, в то время как поле с тем же именем в классе Java былоMap
- Схема
Итак, я исправил все это (изменил схему в соответствии с данными, вы можете исправить их по-разному, если они совпадают) и завершил преобразование классов Java в классы case:
// corrected schemas:
val PlayTimeWindow =
StructType(
StructField("startTime", DateType, true) ::
StructField("endTime", DateType, true) :: Nil)
val globalizedPlayTimeWindows =
StructType(
StructField( "countries", ArrayType(StringType, true), true ) ::
StructField( "purchase", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "rental", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "free", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "download", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "advertisement", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "preorderExclusive", ArrayType(PlayTimeWindow, true), true ) ::
StructField( "playTypeIds", MapType(StringType, ArrayType(PlayTimeWindow, true), true), true ) ::
Nil)
val schema = StructType(
StructField("id", StringType, true) ::
StructField("jazzCount", IntegerType, true) ::
StructField("rockCount", IntegerType, true) ::
StructField("classicCount", IntegerType, true) ::
StructField("nonclassicCount", IntegerType, true) ::
StructField("musicType", StringType, true) ::
StructField( "playType", globalizedPlayTimeWindows, true) :: Nil)
// note the use of java.sql.Date, java.util.Date not supported
case class PlayTimeWindowScala(startTime: java.sql.Date, endTime: java.sql.Date)
case class GlobalizedPlayTimeWindowsScala (countries: List[String],
purchase: List[PlayTimeWindowScala],
rental: List[PlayTimeWindowScala],
free: List[PlayTimeWindowScala],
download: List[PlayTimeWindowScala],
advertisement: List[PlayTimeWindowScala],
preorderExclusive: List[PlayTimeWindowScala],
playTypeIds: Map[String, List[PlayTimeWindowScala]])
// some conversion methods:
def toSqlDate(jDate: java.util.Date): java.sql.Date = new java.sql.Date(jDate.getTime)
import scala.collection.JavaConverters._
def toScalaWindowList(l: java.util.List[PlayTimeWindow]): List[PlayTimeWindowScala] = {
l.asScala.map(javaWindow => PlayTimeWindowScala(toSqlDate(javaWindow.startTime), toSqlDate(javaWindow.endTime))).toList
}
def toScalaGlobalizedWindows(javaObj: GlobalizedPlayTimeWindows): GlobalizedPlayTimeWindowsScala = {
GlobalizedPlayTimeWindowsScala(
javaObj.countries.asScala.toList,
toScalaWindowList(javaObj.purchase),
toScalaWindowList(javaObj.rental),
toScalaWindowList(javaObj.free),
toScalaWindowList(javaObj.download),
toScalaWindowList(javaObj.advertisement),
toScalaWindowList(javaObj.preorderExclusive),
javaObj.playTypeIds.asScala.mapValues(toScalaWindowList).toMap
)
}
val parsedJavaData: RDD[(String, Int, Int, Int, Int, String, GlobalizedPlayTimeWindows)] = mappingFile.map(x => {
// your code producing the tuple
})
// convert to Scala objects and into a Row:
val inputData = parsedJavaData.map{
case (id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, javaPlayType) =>
val scalaPlayType = toScalaGlobalizedWindows(javaPlayType)
Row(id, jazzCount, rockCount, classicCount, nonclassicCount, musicType, scalaPlayType)
}
// now - this works
val inputDataDF = sqlContext.createDataFrame(inputData, schema)
Комментарии:
1. Большое спасибо за краткое объяснение.