#python-3.x #apache-spark #pyspark #rdd
Вопрос:
Я не могу показать фрейм данных, в котором при создании DF из существующего RDD путем предоставления требуемой схемы при выполнении кода PySpark возникло следующее исключение: этот код выполняется на платформе сообщества Databricks. пожалуйста, помогите с решением, чтобы отобразить фрейм данных в качестве вывода.
код:
from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName('RDD2DF') sc = SparkContext.getOrCreate(conf=conf) rdd=sc.textFile('/FileStore/tables/StudentData.csv') header = rdd.first() rdd2df= rdd.filter(lambda x: x!=header).map(lambda x: x.split(',')) from pyspark.sql.types import StructType, StructField, StringType, IntegerType scehma = StructType([ StructField('age', StringType(),True), StructField('gender',StringType(),True), StructField('name',StringType(),True), StructField('course',StringType(),True), StructField('roll',IntegerType(),True), StructField('marks',IntegerType(),True), StructField('email',StringType(),True) ]) columns = header.split(',') rdf = spark.createDataFrame(rdd2df, schema) rdf.printSchema() rdf.show()
Ошибка:
--------------------------------------------------------------------------- Py4JJavaError Traceback (most recent call last) lt;command-1654966391766461gt; in lt;modulegt; 22 rdf = spark.createDataFrame(rdd2df, schema) 23 rdf.printSchema() ---gt; 24 rdf.show() /databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical) 488 """ 489 if isinstance(truncate, bool) and truncate: --gt; 490 print(self._jdf.showString(n, 20, vertical)) 491 else: 492 print(self._jdf.showString(n, int(truncate), vertical)) /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args) 1302 1303 answer = self.gateway_client.send_command(command) -gt; 1304 return_value = get_return_value( 1305 answer, self.gateway_client, self.target_id, self.name) 1306 /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw) 115 def deco(*a, **kw): 116 try: --gt; 117 return f(*a, **kw) 118 except py4j.protocol.Py4JJavaError as e: 119 converted = convert_exception(e.java_exception) /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client) 325 if answer[1] == REFERENCE_TYPE: --gt; 326 raise Py4JJavaError( 327 "An error occurred while calling {0}{1}{2}.n". 328 format(target_id, ".", name), value) Py4JJavaError: An error occurred while calling o5174.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times, most recent failure: Lost task 0.0 in stage 50.0 (TID 53) (ip-10-172-163-248.us-west-2.compute.internal executor driver): org.apache.spark.api.python.PythonException: 'TypeError: field age: IntegerType can not accept object '28' in type lt;class 'str'gt;'. Full traceback below: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 713, in main process() File "/databricks/spark/python/pyspark/worker.py", line 705, in process serializer.dump_stream(out_iter, outfile) File "/databricks/spark/python/pyspark/serializers.py", line 267, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/databricks/spark/python/pyspark/util.py", line 72, in wrapper return f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 729, in prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1409, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1390, in verify_struct verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1409, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1329, in verify_integer verify_acceptable_types(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1291, in verify_acceptable_types raise TypeError(new_msg("%s can not accept object %r in type %s" TypeError: field age: IntegerType can not accept object '28' in type lt;class 'str'gt; at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:661) at org.apache.spark.api.python.PythonRunner$anon$3.read(PythonRunner.scala:813) at org.apache.spark.api.python.PythonRunner$anon$3.read(PythonRunner.scala:795) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:614) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:757) at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80) at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:91) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2712) at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2706) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1255) at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1255) at scala.Option.foreach(Option.scala:407) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1255) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2973) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2914) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2902) at org.apache.spark.util.EventLoop$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:1028) at org.apache.spark.SparkContext.runJobInternal(SparkContext.scala:2446) at org.apache.spark.sql.execution.collect.Collector.runSparkJobs(Collector.scala:289) at org.apache.spark.sql.execution.collect.Collector.collect(Collector.scala:299) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:82) at org.apache.spark.sql.execution.collect.Collector$.collect(Collector.scala:88) at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:75) at org.apache.spark.sql.execution.collect.InternalRowFormat$.collect(cachedSparkResults.scala:62) at org.apache.spark.sql.execution.ResultCacheManager.$anonfun$getOrComputeResultInternal$1(ResultCacheManager.scala:512) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResultInternal(ResultCacheManager.scala:511) at org.apache.spark.sql.execution.ResultCacheManager.getOrComputeResult(ResultCacheManager.scala:399) at org.apache.spark.sql.execution.CollectLimitExec.executeCollectResult(limit.scala:59) at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3018) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3810) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2742) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3802) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:126) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:267) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:852) at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:217) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3800) at org.apache.spark.sql.Dataset.head(Dataset.scala:2742) at org.apache.spark.sql.Dataset.take(Dataset.scala:2949) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:306) at org.apache.spark.sql.Dataset.showString(Dataset.scala:343) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380) at py4j.Gateway.invoke(Gateway.java:295) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:251) at java.lang.Thread.run(Thread.java:748) Caused by: org.apache.spark.api.python.PythonException: 'TypeError: field age: IntegerType can not accept object '28' in type lt;class 'str'gt;'. Full traceback below: Traceback (most recent call last): File "/databricks/spark/python/pyspark/worker.py", line 713, in main process() File "/databricks/spark/python/pyspark/worker.py", line 705, in process serializer.dump_stream(out_iter, outfile) File "/databricks/spark/python/pyspark/serializers.py", line 267, in dump_stream vs = list(itertools.islice(iterator, batch)) File "/databricks/spark/python/pyspark/util.py", line 72, in wrapper return f(*args, **kwargs) File "/databricks/spark/python/pyspark/sql/session.py", line 729, in prepare verify_func(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1409, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1390, in verify_struct verifier(v) File "/databricks/spark/python/pyspark/sql/types.py", line 1409, in verify verify_value(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1329, in verify_integer verify_acceptable_types(obj) File "/databricks/spark/python/pyspark/sql/types.py", line 1291, in verify_acceptable_types raise TypeError(new_msg("%s can not accept object %r in type %s" TypeError: field age: IntegerType can not accept object '28' in type lt;class 'str'gt; at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:661) at org.apache.spark.api.python.PythonRunner$anon$3.read(PythonRunner.scala:813) at org.apache.spark.api.python.PythonRunner$anon$3.read(PythonRunner.scala:795) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:614) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:489) at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458) at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:757) at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80) at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:178) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:91) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:812) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1643) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:815) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:671) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
Комментарии:
1. Смотрите, что вы определяете
scehma
переменную, но позже вы используетеschema
здесь:rdf = spark.createDataFrame(rdd2df, schema)
. ОшибкаTypeError: field age: IntegerType can not accept object '28' in type lt;class 'str'gt;
означает, что вы пытаетесь использовать строковую переменную в целочисленном поле. Таким образом, похоже, что вы определили ранее схему с возрастом как тип целого в схеме , которую вы используете вместо неправильно написанной scehma , которую вы определили с помощью правильного типа строки.2. @Pav3k Я попытался исправить опечатку «scehma» на «схема», но ошибка все равно остается такой же, как показано ниже » Ошибка типа: метки полей: тип целого числа не может принять объект «59» в типе lt;класс ‘str’gt;»
3. Почему вы должны использовать RDD в первую очередь? Вы можете просто прочитать CSV напрямую, пропустить заголовок и передать схему
4. @pltc Я пытаюсь преобразовать RDD в DF, где я знаю о методе без создания RDD, но изучаю альтернативные методы использования, и я новичок в среде сбора данных, поэтому попробовал и получил результат, в котором более понятно понимать среду сбора данных для кластера PySpark.
Ответ №1:
@Pav3k Спасибо за ответ, и вы уведомили об опечатке в моем коде, что является одной из причин исключения, кстати, я нашел решение для **IntegerType can not accept object '28' in type lt;class 'str'gt;**
того, где я пытаюсь изменить схему существующего RDD после создания фрейма данных при использовании существующего RDD мы должны привести необходимые типы данных, такие как код:
from pyspark import SparkConf, SparkContext conf = SparkConf().setAppName('RDD2DF') sc = SparkContext.getOrCreate(conf=conf) rdd=sc.textFile('/FileStore/tables/StudentData.csv') header = rdd.first() rdd2df= rdd.filter(lambda x: x!=header).map(lambda x: x.split(',')) rddc = rdd2df.map(lambda x: [int(x[0]),x[1],x[2],x[3],x[4],int(x[5]),x[6]]) rddc.collect()`
после приведения RDD мы должны предоставить схему для фрейма данных, чтобы изменить требуемый тип данных, как указано в приведенном ниже коде.
from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([ StructField('age', IntegerType(),True), StructField('gender',StringType(),True), StructField('name',StringType(),True), StructField('course',StringType(),True), StructField('roll',StringType(),True), StructField('marks',IntegerType(),True), StructField('email',StringType(),True) ]) columns = header.split(',') rdf = spark.createDataFrame(rddc, schema) rdf.printSchema() rdf.show()
Наконец, я получил результат после обнаружения опечатки в моем коде и требования к кластерной среде Databricks PySpark, потому что, когда мы выполняем код PySpark в Databricks, некоторые кластеры автоматически учитывают тип данных RDD.
Выход:
rdf:pyspark.sql.dataframe.DataFrame age:integer gender:string name:string course:string roll:string marks:integer email:string