Достижение жестко заданного предела в коде Spark для огромного количества столбцов при обработке corr(корреляции)

#apache-spark #apache-spark-sql #apache-spark-mllib

Вопрос:

Столкнувшись здесь с проблемой, столкнувшись с жестким ограничением в коде spark из-за огромного количества столбцов(20 тыс.) при обработке corr (корреляции). Сведения о функциях, org.apache.spark.ml.статистика.Корреляция.корр.

По деловым соображениям мы не можем дополнительно уменьшить количество столбцов, не допустив ошибки.

Сталкивался ли кто-нибудь с такой ситуацией раньше и нашел ли stratergy способ обойти это?

Подробная информация о коде Spark и стеке ниже. https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/ml/stat/Correlation.scala

Корреляционная функция здесь,

 line 67:   def corr(dataset: Dataset[_], column: String, method: String): DataFrame = {}
Upon the action trigger at line 74    dataset.sparkSession.createDataFrame(Seq(Row(oldM.asML)).asJava, schema)
 

Обработка достигла жесткого предела из-за функции размера массива.

 public static UnsafeArrayData fromPrimitiveArray(
   Object arr, int offset, int length, int elementSize) {
final long headerInBytes = calculateHeaderPortionInBytes(length);
final long valueRegionInBytes = (long)elementSize * length;
final long totalSizeInLongs = (headerInBytes   valueRegionInBytes   7) / 8;
if (totalSizeInLongs > Integer.MAX_VALUE / 8) {
  throw new UnsupportedOperationException("Cannot convert this array to unsafe format as "  
    "it's too big.");
} 
 

Сложите, как показано ниже.

 21/08/06 07:07:33 ERROR Uncaught throwable from user code: java.lang.UnsupportedOperationException: Cannot convert this array to unsafe format as it's too big.
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:431)
at org.apache.spark.sql.catalyst.expressions.UnsafeArrayData.fromPrimitiveArray(UnsafeArrayData.java:498)
at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:66)
at org.apache.spark.ml.linalg.MatrixUDT.serialize(MatrixUDT.scala:28)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$UDTConverter.toCatalystImpl(CatalystTypeConverters.scala:147)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:252)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:242)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:107)
at org.apache.spark.sql.catalyst.CatalystTypeConverters$.$anonfun$createToCatalystConverter$2(CatalystTypeConverters.scala:425)
at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$.$anonfun$fromExternalRows$1(LocalRelation.scala:37)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.catalyst.plans.logical.LocalRelation$.fromExternalRows(LocalRelation.scala:37)
at org.apache.spark.sql.SparkSession.$anonfun$createDataFrame$4(SparkSession.scala:432)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:845)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:432)