Spark Accumulator генерирует исключение приведения класса при попытке подсчитать количество записей в наборе данных

#scala #apache-spark #dataset #rdd

#scala #apache-spark #набор данных #rdd

Вопрос:

Я пытаюсь подсчитать количество записей в моем наборе данных. Я пытаюсь использовать приведенную ниже логику с использованием аккумуляторов.

     val accum = sc.longAccumulator("My_Accum")
    val fRDD = tempDS.rdd.persist(StorageLevel.MEMORY_AND_DISK).foreach(x=>{
      accum.add(1)
      x
    })

    val recordCount = accum.value

    println("record Count is : " recordCount)
  

Я получаю исключение приведения класса в строке accum.add(1) как
java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema не может быть преобразован в packagename .CaseClassName .

С той же логикой я могу получить значение аккумулятора на моем предыдущем шаге

Может кто-нибудь, пожалуйста, помогите мне, как это решить. Также есть ли какой-либо другой способ подсчета, кроме count() и аккумуляторов.

Комментарии:

1. посмотрите, помогает ли изменение 1L с 1

2. этот код отлично работает для меня, spark 2.4 с scala 2.11

3. для меня это сработало в одном сценарии, но когда я пытаюсь использовать ту же логику, что и сейчас, я получаю эту ошибку. Я думаю, что когда я пытаюсь выполнить dataset.foreach или dataset.map, он не может распознать схему и обрабатывает ее как GenericRow вместо моего класса case

Ответ №1:

Я не думаю, что проблема в аккумуляторе, кажется, что Spark не может преобразовать ваш DataFrame , т. Е. Dataset[Row] В a Dataset[packagename.CaseClassName] (который вы не показываете в своем коде).

Кроме того, это действительно нетрадиционный способ подсчета количества строк, я бы НЕ рекомендовал это. Самый быстрый способ — .count ваш Dataset , RDD в большинстве случаев использование медленнее

Комментарии:

1. Я использую Dataset[MyClass], а не dataframe. Кроме того, подсчет является дорогостоящей операцией, поэтому я хочу использовать аккумуляторы