Pyspark — Bigquery сохраняет df в hdfs

# #python #apache-spark #google-bigquery

Вопрос:

Я использую spark-bigquery ( spark-bigquery-with-dependencies_2.11-0.21.1.jar ) соединитель для чтения из bigquery, я могу считывать данные из bigquery и хранить как df.

Я сталкиваюсь с приведенной ниже ошибкой при попытке выполнить какое-либо действие( запись в hdfs ) или любое преобразование. Даже df.show() иногда выдает ошибку.

используя python 3.6 и spark 2.3.2. Я проверил данные в bigquery, и там нет пустых столбцов

 df.printSchema()
root
 |-- col1: long (nullable = true)
 |-- col2: long (nullable = true)
 |-- col3: long (nullable = true)
 |-- col4: long (nullable = true)
 |-- col5: long (nullable = true)
 |-- col6: decimal(38,9) (nullable = true)
 |-- col7: string (nullable = true)
 |-- col8: decimal(38,9) (nullable = true)
 |-- col9: string (nullable = true)
 |-- col10: long (nullable = true)
 

df.write.format('orc').path('/target/hdfs/path')

Ошибка

 Caused by: java.lang.IllegalStateException: Value at index is null
    at com.google.cloud.spark.bigquery.repackaged.org.apache.arrow.vector.BigIntVector.get(BigIntVector.java:112)
    at com.google.cloud.spark.bigquery.ArrowSchemaConverter$LongAccessor.getLong(ArrowSchemaConverter.java:364)
    at com.google.cloud.spark.bigquery.ArrowSchemaConverter.getLong(ArrowSchemaConverter.java:98)
    at org.apache.spark.sql.execution.vectorized.MutableColumnarRow.getLong(MutableColumnarRow.java:120)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$10$anon$1.hasNext(WholeStageCodegenExec.scala:614)
    at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:253)
    at org.apache.spark.sql.execution.SparkPlan$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitionsInternal$1$anonfun$apply$25.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
 

Комментарии:

1. Вы проверили нулевые значения для всех столбцов типа long? В соответствии с определением схемы в вопросе все столбцы могут иметь нулевое значение. Вы также можете попробовать выполнить запрос, чтобы получить отдельное значение каждого столбца отдельно, и посмотреть, получите ли вы значение null для любого столбца в самом большом запросе.

2. да, для всех столбцов в моем источнике нет нулевых значений.