Почему функции года и месяца приводят к длительному переполнению в Spark?

#scala #apache-spark #cassandra #spark-cassandra-connector

Вопрос:

Я пытаюсь создать столбцы year и month из столбца с именем logtimestamp (типа TimestampType) в spark. Источником данных является cassandra. Я использую sparkshell для выполнения этих шагов, вот код, который я написал —

 import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types._
var logsDF = spark.read.cassandraFormat("tableName", "cw").load()
var newlogs = logsDF.withColumn("year", year(col("logtimestamp")))
 .withColumn("month", month(col("logtimestamp")))
newlogs.write.cassandraFormat("tableName_v2", "cw")
 .mode("Append").save()
 

Но эти шаги не увенчались успехом, я получаю следующую ошибку

 java.lang.ArithmeticException: long overflow
    at java.lang.Math.multiplyExact(Math.java:892)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:205)
    at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:166)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:327)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$TimestampConverter$.toCatalystImpl(CatalystTypeConverters.scala:325)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:252)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:426)
    at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:34)
    at com.datastax.spark.connector.datasource.UnsafeRowReader.read(UnsafeRowReaderFactory.scala:21)
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.$anonfun$getIterator$2(CassandraScanPartitionReaderFactory.scala:110)
    at scala.collection.Iterator$anon$10.next(Iterator.scala:459)
    at scala.collection.Iterator$anon$11.next(Iterator.scala:494)
    at com.datastax.spark.connector.datasource.CassandraPartitionReaderBase.next(CassandraScanPartitionReaderFactory.scala:66)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$anon$10.hasNext(Iterator.scala:458)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anon$1.hasNext(WholeStageCodegenExec.scala:755)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:413)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1473)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:452)
    at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
 

Я подумал, что это как-то связано с нулевыми значениями в таблице, поэтому я выполнил следующее

 scala> logsDF.filter("logtimestamp is null").show()
 

Но это тоже дало ту же ошибку длительного переполнения.

Почему происходит переполнение в spark, но не в cassandra, когда оба имеют временные метки по 8 байт? В чем здесь может быть проблема и как мне правильно извлечь год и месяц из метки времени?

Комментарии:

1. Не могли бы вы поделиться примером значений из logtimestamp столбца?

2. @Gabip Да, вот несколько примеров logtimestamp (как они показаны в spark) — 2021-03-04 10:29:59.311, 2021-03-04 10:29:59.014, 2021-05-03 21:29:56.699. Их тип — TimestampType в соответствии с logsDF.dtypes

3. Может issues.apache.org/jira/browse/SPARK-35679 быть родственником?

4. @mazaneicha О, может ли это быть проблемой? Но я использую spark 3.1.2 с scala 2.12.10. В ссылке ничего не упоминается о 3.1.2

5. В нем упоминается, что проблема исправлена в 3.1.3

Ответ №1:

Оказывается, одна из таблиц cassandra имела значение метки времени, которое было больше максимального значения, разрешенного spark, но недостаточно велико для переполнения в cassandra. Временная метка была отредактирована вручную, чтобы обойти апсертинг, который выполняется по умолчанию в cassandra, но это привело к формированию некоторых больших значений во время разработки. Запустил скрипт на python, чтобы выяснить это.

Комментарии:

1. Хорошая находка! Можете ли вы добавить примерное значение, которое вызывало это?

2. @mazaneicha К сожалению, я удалил эти строки. Это было просто очень большое число. В то время как Cassandra показывала другие правильные временные метки в формате гггг-ММ-чч …, он показывал число как есть.

3. Ааа, тонкости схемы при чтении?

4. @mazaneicha Да, я думаю, что так оно и было. Много раз пытался отключить его для соединителя cassandra и использовать BigInt для метки времени во время отладки, но безуспешно.