#tensorflow #pyspark
Вопрос:
У меня возникла проблема при преобразовании фрейма данных pyspark в файл .tfrecord из-за разреженных столбцов через VectorAssembler в PySpark. Я думал о повторном преобразовании столбцов VectorUDT в строку или около того, но я хочу, чтобы он читался как ввод TF более стабильным способом. VectorType на самом деле поддерживает, но VectorUDT-нет. Вот вывод ошибки после пробного преобразования:
Py4JJavaError: Произошла ошибка при вызове o137.сохранить. : org.apache.spark.Исключение SparkException: Задание прервано. в org.apache.spark.sql.выполнение.источники данных.FileFormatWriter$.запись(FileFormatWriter.scala:231) в org.apache.spark.sql.выполнение.источники данных.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:188) в org.apache.spark.sql.выполнение.команда.Запись данных commandexec.sideffectresult$lzycompute(команды.scala:108) в org.apache.spark.sql.выполнение.команда.Запись данных commandexec.sideffectresult(commands.scala:106) в организации.apache.spark.sql.выполнение.команды.Запись данных commandexec.doExecute(команды.scala:131) в org.apache.spark.sql.выполнение.SparkPlan.$anonfun$выполнить$1(SparkPlan.scala:180) в org.apache.spark.sql.выполнение.SparkPlan.$anonfun$ExecuteQuery$1(SparkPlan.scala:218) в org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) в org.apache.spark.sql.выполнение.SparkPlan.ExecuteQuery(SparkPlan.scala:215) в org.apache.spark.sql.выполнение.SparkPlan.выполнить(SparkPlan.scala:176) в org.apache.spark.sql.выполнение.Выполнение запроса.toRdd$lzycompute(QueryExecution.scala:132) в организации.apache.spark.sql.выполнение.Выполнение запроса.toRdd(QueryExecution.scala:131) в org.apache.spark.sql.DataFrameWriter.$anonfun$runcomand$1(DataFrameWriter.scala:989) в org.apache.spark.sql.выполнение.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) в org.apache.spark.sql.выполнение.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) в org.apache.spark.sql.выполнение.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) в сеансе org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) в организации.apache.spark.sql.выполнение.SQLExecution$.withNewExecutionId(SQLExecution.в Scala:64) в орг.Апачи.Искра.язык SQL.DataFrameWriter.выполнитькоманду(DataFrameWriter.в Scala:989) в орг.Апачи.Искра.язык SQL.DataFrameWriter.saveToV1Source(DataFrameWriter.в Scala:438) в орг.Апачи.Искра.язык SQL.DataFrameWriter.saveInternal(DataFrameWriter.в Scala:415) в орг.Апачи.Искра.язык SQL.DataFrameWriter.метод save(DataFrameWriter.в Scala:293) в Java.база с JDK.внутренние.отражения.NativeMethodAccessorImpl.invoke0(родной метод) на языке Java.база с JDK.внутренние.отражения.NativeMethodAccessorImpl.командлет Invoke(NativeMethodAccessorImpl.Ява:62) на языке Java.база/jdk.внутренний.отражение.Делегирование methodaccessorimpl.invoke(делегирование methodaccessorimpl.java:43) в java.base/java.lang.reflect.Метод.вызов(Метод.java:566) в py4j.отражение.Методинвокатор.вызов(методинвокатор.java:244) в py4j.отражение.ReflectionEngine.invoke(ReflectionEngine.java:357) в py4j.Шлюз.вызов(шлюз.java:282) в py4j.команды.AbstractCommand.Метод вызова(AbstractCommand.java:132) в py4j.команды.Команда вызова.выполнить(команда вызова.java:79) в py4j.GatewayConnection.запустите(GatewayConnection.java:238) в java.база/java.lang.Thread.run(Thread.java:829) Вызвано: исключение org.apache.spark.SparkException: Задание прервано из-за сбоя на этапе: Задание 0 на этапе 13.0 выполнено 1 раз, последний сбой: Потеряно задание 0.0 на этапе 13.0 (TID 18) (драйвер исполнителя f2a1abf16740): исключение java.lang.RuntimeException: Не удается преобразовать поле в неподдерживаемый тип данных org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 по адресу com.linkedin.spark.источники данных.tfrecord.Преобразователь TFRecordSerializer.newFeatureConverter(TFRecordSerializer.scala:147) по адресу com.linkedin.spark.источники данных.tfrecord.TFRecordSerializer.$anonfun$Преобразователи функций$2(TFRecordSerializer.scala:14) в scala.collection.Проходимость.$anonfun$карта$1(проходимость.scala:238) в scala.collection.неизменяемый.Список.для каждого(Список.scala:392) в scala.коллекция.Карта, похожая на траверсируемую(TraversableLike.scala:238) в scala.collection.Доступная.карта$(доступная.scala:231) в scala.collection.неизменяемая.Список.карта(Список.scala:298) в com.linkedin.spark.источники данных.tfrecord.TFRecordSerializer.(TFRecordSerializer.scala:14) по адресу com.linkedin.spark.источники данных.tfrecord.TFRecordOutputWriter.(TFRecordOutputWriter.scala:24) в com.linkedin.spark.источники данных.tfrecord.defaultSource$$anon$1.Новый экземпляр(defaultSource.scala:79) в org.apache.spark.sql.execution.источники данных.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126) в org.apache.spark.sql.выполнение.источники данных.Однократный редактор.(FileFormatDataWriter.scala:111) в org.apache.spark.sql.выполнение.источники данных.FileFormatWriter$.Задача выполнения(FileFormatWriter.scala:269) в организации.apache.spark.sql.выполнение.источники данных.Файлоформатрайтер$.$anonfun$запись$15(файлоформатрайтер.scala:210) в org.apache.spark.планировщик.ResultTask.RunTask(ResultTask.scala:90) в org.apache.spark.планировщик.Задача.выполнение(Задача.scala:131) в org.apache.spark.исполнитель.Исполнитель$TaskRunner.$anonfun$выполнить$3(Исполнитель.scala:497) в org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) в org.apache.spark.executor.Исполнитель$TaskRunner.run(Исполнитель.scala:500) в java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) на java.база/java.util.параллельный.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) в java.base/java.lang.Thread.run(поток.java:829)
Трассировка стека драйверов: в org.apache.spark.планировщик.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2258) at org.apache.spark.scheduler.DAGScheduler.$anonfun$Этап прерывания$2(DAGScheduler.scala:2207) в org.apache.spark.планировщик.DAGScheduler.$anonfun$Этап прерывания$2$адаптирован(DAGScheduler.scala:2206) в scala.collection.изменяемый.Изменяемый размер.foreach(изменяемый размер.scala:62) в scala.collection.изменяемый.Изменяемый размер.foreach$(изменяемый размер.scala:55) в scala.collection.изменяемый.ArrayBuffer.foreach(ArrayBuffer.scala:49) в организации.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2206) в org.apache.spark.планировщик.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1079) в org.apache.spark.планировщик.DAGScheduler.$анонсировал$handletaskset отправил$1$адаптированный(DAGScheduler.scala:1079) в scala.Вариант.для каждого(Опция.scala:407) в org.apache.spark.планировщик.DAGScheduler.handletaskset отправил сообщение(DAGScheduler.scala:1079) в org.apache.spark.планировщик.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2445) в орг.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2387) в org.apache.spark.планировщик.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2376) в org.apache.spark.util.Цикл событий$$анон$1.запустите(цикл событий.scala:49) в org.apache.spark.планировщик.DAGScheduler.runJob(DAGScheduler.scala:868) в org.apache.spark.SparkContext.runJob(SparkContext.scala:2196) в org.apache.spark.sql.выполнение.источники данных.Файл formatwriter$.запись(файл formatwriter.scala:200) … еще 32 Вызвано: исключение java.lang.RuntimeException: Не удается преобразовать поле в неподдерживаемый тип данных org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7 по адресу com.linkedin.spark.источники данных.tfrecord.Преобразователь TFRecordSerializer.newFeatureConverter(TFRecordSerializer.scala:147) в com.linkedin.spark.источники данных.tfrecord.TFRecordSerializer.$anonfun$Преобразователи функций$2(TFRecordSerializer.scala:14) в scala.collection.Проходимость.$anonfun$карта$1(проходимость.scala:238) в scala.collection.неизменяемый.Список.для каждого(Список.scala:392) в scala.коллекция.Карта, похожая на траверсируемую(TraversableLike.scala:238) в scala.collection.Траверсируемая.карта$(траверсируемая.scala:231) в scala.коллекция.неизменяемый.Список.карта(Список.scala:298) в com.linkedin.spark.источники данных.tfrecord.TFRecordSerializer.(TFRecordSerializer.scala:14) на com.linkedin.spark.источники данных.tfrecord.TFRecordOutputWriter.(TFRecordOutputWriter.scala:24) в com.linkedin.spark.источники данных.tfrecord.defaultSource$$anon$1.Новый экземпляр(defaultSource.scala:79) в org.apache.spark.sql.execution.источники данных.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:126) в org.apache.spark.sql.выполнение.источники данных.Однократный редактор.(файлоформатдатавритер.scala:111) в организации.apache.spark.sql.выполнение.источники данных.FileFormatWriter$.Задача выполнения(FileFormatWriter.scala:269) в org.apache.spark.sql.выполнение.источники данных.Файлоформатрайтер$.$anonfun$запись$15(файлоформатрайтер.scala:210) в org.apache.spark.планировщик.ResultTask.RunTask(ResultTask.scala:90) в org.apache.spark.планировщик.Задача.выполнение(Задача.scala:131) в org.apache.spark.исполнитель.Исполнитель$TaskRunner.$anonfun$выполнить$3(Исполнитель.scala:497) в org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) в org.apache.spark.executor.Исполнитель$TaskRunner.run(Исполнитель.scala:500) на java.база/java.util.параллельный.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) в java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) … еще 1