Creating a DataFrame from an RDD — PySpark

#python-3.x #apache-spark #pyspark #rdd

Question:

I can't display a DataFrame that I create from an existing RDD by supplying the required schema: running the PySpark code raises the exception below. The code runs on the Databricks Community Edition platform. Please help me find a way to display the DataFrame as output.

Code:

    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setAppName('RDD2DF')
    sc = SparkContext.getOrCreate(conf=conf)

    rdd = sc.textFile('/FileStore/tables/StudentData.csv')
    header = rdd.first()
    rdd2df = rdd.filter(lambda x: x != header).map(lambda x: x.split(','))

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    scehma = StructType([
        StructField('age', StringType(), True),
        StructField('gender', StringType(), True),
        StructField('name', StringType(), True),
        StructField('course', StringType(), True),
        StructField('roll', IntegerType(), True),
        StructField('marks', IntegerType(), True),
        StructField('email', StringType(), True)
    ])

    columns = header.split(',')
    rdf = spark.createDataFrame(rdd2df, schema)
    rdf.printSchema()
    rdf.show()

Error:

    ---------------------------------------------------------------------------
    Py4JJavaError                             Traceback (most recent call last)
    <command-1654966391766461> in <module>
         22 rdf = spark.createDataFrame(rdd2df, schema)
         23 rdf.printSchema()
    ---> 24 rdf.show()

    /databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
        488         """
        489         if isinstance(truncate, bool) and truncate:
    --> 490             print(self._jdf.showString(n, 20, vertical))
        491         else:
        492             print(self._jdf.showString(n, int(truncate), vertical))

    /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
       1302
       1303         answer = self.gateway_client.send_command(command)
    -> 1304         return_value = get_return_value(
       1305             answer, self.gateway_client, self.target_id, self.name)
       1306

    /databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
        115     def deco(*a, **kw):
        116         try:
    --> 117             return f(*a, **kw)
        118         except py4j.protocol.Py4JJavaError as e:
        119             converted = convert_exception(e.java_exception)

    /databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
        324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
        325             if answer[1] == REFERENCE_TYPE:
    --> 326                 raise Py4JJavaError(
        327                     "An error occurred while calling {0}{1}{2}.\n".
        328                     format(target_id, ".", name), value)

    Py4JJavaError: An error occurred while calling o5174.showString.
    : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 50.0 failed 1 times,
      most recent failure: Lost task 0.0 in stage 50.0 (TID 53)
      (ip-10-172-163-248.us-west-2.compute.internal executor driver):
      org.apache.spark.api.python.PythonException:
      'TypeError: field age: IntegerType can not accept object '28' in type <class 'str'>'.
    Full traceback below:
    Traceback (most recent call last):
      File "/databricks/spark/python/pyspark/worker.py", line 713, in main
        process()
      File "/databricks/spark/python/pyspark/worker.py", line 705, in process
        serializer.dump_stream(out_iter, outfile)
      File "/databricks/spark/python/pyspark/serializers.py", line 267, in dump_stream
        vs = list(itertools.islice(iterator, batch))
      File "/databricks/spark/python/pyspark/util.py", line 72, in wrapper
        return f(*args, **kwargs)
      File "/databricks/spark/python/pyspark/sql/session.py", line 729, in prepare
        verify_func(obj)
      File "/databricks/spark/python/pyspark/sql/types.py", line 1409, in verify
        verify_value(obj)
      File "/databricks/spark/python/pyspark/sql/types.py", line 1390, in verify_struct
        verifier(v)
      File "/databricks/spark/python/pyspark/sql/types.py", line 1409, in verify
        verify_value(obj)
      File "/databricks/spark/python/pyspark/sql/types.py", line 1329, in verify_integer
        verify_acceptable_types(obj)
      File "/databricks/spark/python/pyspark/sql/types.py", line 1291, in verify_acceptable_types
        raise TypeError(new_msg("%s can not accept object %r in type %s"
    TypeError: field age: IntegerType can not accept object '28' in type <class 'str'>

        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:661)
        at org.apache.spark.api.python.PythonRunner$anon$3.read(PythonRunner.scala:813)
        at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:614)
        ... (executor-side Spark/JVM frames)

    Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2765)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2706)
        at org.apache.spark.sql.Dataset.showString(Dataset.scala:343)
        ... (driver-side Spark/JVM frames)
    Caused by: org.apache.spark.api.python.PythonException:
      'TypeError: field age: IntegerType can not accept object '28' in type <class 'str'>'.
      (same Python worker traceback as above)
        ... 1 more

Comments:

1. Note that you define a variable named scehma, but later you use schema here: rdf = spark.createDataFrame(rdd2df, schema). The error TypeError: field age: IntegerType can not accept object '28' in type <class 'str'> means that a string value is being put into an integer field. So it looks like you had previously defined a schema with age as an integer type, and that object is being used instead of the misspelled scehma that you defined with the correct string type (a minimal repro is sketched after these comments).

2. @Pav3k I tried fixing the typo "scehma" to "schema", but the error stays essentially the same, as shown below: "TypeError: field marks: IntegerType can not accept object '59' in type <class 'str'>".

3. Why do you need an RDD in the first place? You can simply read the CSV directly, skip the header, and pass the schema (see the read sketch after these comments).

4. @pltc I am trying to convert an RDD into a DF. I know the approach that skips creating an RDD, but I am exploring alternative methods; I am new to Databricks, so I tried this and the result helps me understand the Databricks environment for a PySpark cluster better.
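
A minimal sketch reproducing the TypeError from comment 1 (an illustration, assuming only an active Databricks/PySpark session that exposes spark):

    # Hypothetical one-row repro: a schema that declares age as IntegerType
    # rejects the Python string '28', producing the same message as in the question.
    from pyspark.sql.types import StructType, StructField, IntegerType

    bad_schema = StructType([StructField('age', IntegerType(), True)])
    spark.createDataFrame([('28',)], bad_schema)
    # TypeError: field age: IntegerType can not accept object '28' in type <class 'str'>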

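And a sketch of the direct read suggested in comment 3 (assuming the file path and column types from the question; the CSV reader consumes the header and applies the casts itself):

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    csv_schema = StructType([
        StructField('age', IntegerType(), True),
        StructField('gender', StringType(), True),
        StructField('name', StringType(), True),
        StructField('course', StringType(), True),
        StructField('roll', StringType(), True),
        StructField('marks', IntegerType(), True),
        StructField('email', StringType(), True)
    ])

    # header=True skips the header row; the explicit schema drives the column types
    df = (spark.read
          .option('header', True)
          .schema(csv_schema)
          .csv('/FileStore/tables/StudentData.csv'))
    df.printSchema()
    df.show()
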
Answer №1:

@Pav3k Thanks for the reply and for pointing out the typo in my code, which is one of the causes of the exception. Apart from that, I found a solution for **IntegerType can not accept object '28' in type <class 'str'>**: when applying a schema to an existing RDD, the values have to be cast to the required data types before the DataFrame is created, as in the code below:

    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setAppName('RDD2DF')
    sc = SparkContext.getOrCreate(conf=conf)

    rdd = sc.textFile('/FileStore/tables/StudentData.csv')
    header = rdd.first()
    rdd2df = rdd.filter(lambda x: x != header).map(lambda x: x.split(','))
    # cast the columns that the schema declares as integers (age and marks)
    rddc = rdd2df.map(lambda x: [int(x[0]), x[1], x[2], x[3], x[4], int(x[5]), x[6]])
    rddc.collect()

After casting the RDD values, we supply a schema whose field types match, so the DataFrame ends up with the required data types, as in the code below.

    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField('age', IntegerType(), True),
        StructField('gender', StringType(), True),
        StructField('name', StringType(), True),
        StructField('course', StringType(), True),
        StructField('roll', StringType(), True),
        StructField('marks', IntegerType(), True),
        StructField('email', StringType(), True)
    ])

    columns = header.split(',')
    rdf = spark.createDataFrame(rddc, schema)
    rdf.printSchema()
    rdf.show()

In the end I got the expected output after fixing the typo in my code and meeting the requirement of the Databricks PySpark cluster environment: when a DataFrame is built from an RDD of parsed strings, the values must already match the data types declared in the schema, because createDataFrame does not convert them automatically.

Output:

    rdf: pyspark.sql.dataframe.DataFrame
      age: integer
      gender: string
      name: string
      course: string
      roll: string
      marks: integer
      email: string
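
For completeness, a sketch of an alternative to casting inside the map lambda (my own suggestion, not part of the answer above): create the DataFrame with all-string fields and cast the numeric columns afterwards with the DataFrame API.

    # Assumption: rdd2df is the split-but-uncast RDD from the question.
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType, StructField, StringType

    str_schema = StructType([StructField(c, StringType(), True)
                             for c in ['age', 'gender', 'name', 'course', 'roll', 'marks', 'email']])

    rdf_str = spark.createDataFrame(rdd2df, str_schema)   # no casting needed in the RDD
    rdf_cast = (rdf_str
                .withColumn('age', col('age').cast('int'))
                .withColumn('marks', col('marks').cast('int')))
    rdf_cast.printSchema()
    rdf_cast.show()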