Запускает полное соединение Rdd с разрешаемым java.lang.Исключение IllegalArgumentException: сбой требования: недопустимый размер строки: вместо

#apache-spark #cassandra #cassandra-3.0 #spark-cassandra-connector

#apache-spark #cassandra #cassandra-3.0 #spark-cassandra-connector

Вопрос:

  • В настоящее время я пытаюсь присоединить фрейм данных spark к таблице cassandra.
  • К сожалению, мы не можем немедленно перейти на новый соединитель Datastax 2.5.0 и использовать прямые соединения
  • Итак, я пробую подход Rdd, используя существующий joinWithCassandraTable

Вот мой пример кода

 # Cassandra Table Definition 

custId: text PRIMARY KEY
custName: text
custAddress: text

val testDF = Seq(("event-01", "cust-01"), ("event-02", "cust-02")).toDF(("eventId", "custId"))

val resultRdd = testDF
    .rdd
    .leftJoinWithCassandraTable(
      keyspaceName = "my_key_space",
      tableName = "cust_table",
      selectedColumns = AllColumns,
      joinColumns = SomeColumns("custId")
    )
    .map { case (sparkRow, cassandraRow) =>
      val resultStruct = cassandraRow
        .map(r => Row.fromSeq(r.columnValues))
        .orNull
      Row.fromSeq(sparkRow.toSeq :  resultStruct)
    }
  
  • Это выдает java.lang.Исключение IllegalArgumentException: сбой требования: недопустимый размер строки: 2 вместо 1
  • Если я ограничу testDF только наличие столбца CustID, то это будет работать нормально.
  • Я где-то допускаю ошибку. Как я могу выполнить соединение с полным Rdd вместо проекции только с ключевым столбцом

Ответ №1:

Вам нужно использовать .on(SomeColumns("custId")) сразу после leftJoinWithCassandraTable …

У меня есть сообщение в блоге об эффективном соединении с Cadsandra, и в нем также описывается RDD API…

Комментарии:

1. Спасибо за информацию. Это работает и для меня. Это лучший подход, пока мы не перейдем на использование прямых соединений с фреймами данных