#apache-spark #cassandra #cassandra-3.0 #spark-cassandra-connector
#apache-spark #cassandra #cassandra-3.0 #spark-cassandra-connector
Вопрос:
- В настоящее время я пытаюсь присоединить фрейм данных spark к таблице cassandra.
- К сожалению, мы не можем немедленно перейти на новый соединитель Datastax 2.5.0 и использовать прямые соединения
- Итак, я пробую подход Rdd, используя существующий joinWithCassandraTable
Вот мой пример кода
# Cassandra Table Definition
custId: text PRIMARY KEY
custName: text
custAddress: text
val testDF = Seq(("event-01", "cust-01"), ("event-02", "cust-02")).toDF(("eventId", "custId"))
val resultRdd = testDF
.rdd
.leftJoinWithCassandraTable(
keyspaceName = "my_key_space",
tableName = "cust_table",
selectedColumns = AllColumns,
joinColumns = SomeColumns("custId")
)
.map { case (sparkRow, cassandraRow) =>
val resultStruct = cassandraRow
.map(r => Row.fromSeq(r.columnValues))
.orNull
Row.fromSeq(sparkRow.toSeq : resultStruct)
}
- Это выдает java.lang.Исключение IllegalArgumentException: сбой требования: недопустимый размер строки: 2 вместо 1
- Если я ограничу
testDF
только наличие столбца CustID, то это будет работать нормально. - Я где-то допускаю ошибку. Как я могу выполнить соединение с полным Rdd вместо проекции только с ключевым столбцом
Ответ №1:
Вам нужно использовать .on(SomeColumns("custId"))
сразу после leftJoinWithCassandraTable …
У меня есть сообщение в блоге об эффективном соединении с Cadsandra, и в нем также описывается RDD API…
Комментарии:
1. Спасибо за информацию. Это работает и для меня. Это лучший подход, пока мы не перейдем на использование прямых соединений с фреймами данных