#dataframe #apache-spark #cassandra #rdd #spark-cassandra-connector
Вопрос:
Создать таблицу —
CREATE TABLE test.word_groups (group text, word text, count int,PRIMARY KEY (group,word));
Вставка Данных —
INSERT INTO test.word_groups (group , word , count ) VALUES ( 'A-group', 'raj', 0) ;
INSERT INTO test.word_groups (group , word , count ) VALUES ( 'b-group', 'jaj', 0) ;
INSERT INTO test.word_groups (group , word , count ) VALUES ( 'A-group', 'raff', 3) ;
SELECT * FROM word_groups ;
group | word | count
--------- ------ -------
b-group | jaj | 0
A-group | raff | 3
A-group | raj | 0
Скрипт —
val cassandraUrl = "org.apache.spark.sql.cassandra"
val wordGroup: Map[String, String] = Map("table" ->"word_groups",
"keyspace" -> "test", "cluster" -> "test-cluster")
val groupData = {spark.read.format(cassandraUrl).options(wordGroup).load()
.where(col("group") === "b-group")}
groupData.rdd.deleteFromCassandra("sunbird_courses", "word_groups")
Исключение —
java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
at scala.Predef$.require(Predef.scala:224)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:102)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
at com.datastax.spark.connector.writer.TableWriter$anonfun$writeInternal$1.apply(TableWriter.scala:229)
at com.datastax.spark.connector.writer.TableWriter$anonfun$writeInternal$1.apply(TableWriter.scala:198)
at com.datastax.spark.connector.cql.CassandraConnector$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
at com.datastax.spark.connector.cql.CassandraConnector$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:198)
at com.datastax.spark.connector.writer.TableWriter.delete(TableWriter.scala:194)
at com.datastax.spark.connector.RDDFunctions$anonfun$deleteFromCassandra$1.apply(RDDFunctions.scala:119)
at com.datastax.spark.connector.RDDFunctions$anonfun$deleteFromCassandra$1.apply(RDDFunctions.scala:119)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
21/08/11 09:01:24 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 2953, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
Версия Spark — 2.4.4 и
версия разъема Spark Cassandra — 2.5.0
Искра Кассандра разъем док ссылка — https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#deleting-rows-and-columns
Я пытаюсь удалить все записи этих столбцов, включая первичные ключи.
Существует ли какой-либо обходной путь для этого ?
К вашему сведению — мне нужно удалить все записи группы «A-group» из таблицы word_groups, включая первичные ключи / ключи разделов
Ответ №1:
это интересное изменение в 2.5.x, о котором я не знал — теперь вам нужно иметь правильный размер строки, даже если keyColumns
он указан, раньше это работало без него — похоже на ошибку для меня.
Вам нужно оставить только первичный ключ при удалении всей строки — измените удаление на:
groupData.select("group", "word").rdd.deleteFromCassandra("test", "word_groups")
но в вашем случае еще лучше удалить на основе столбца ключа раздела — в этом случае у вас будет только одно надгробие (вам все равно нужно выбрать только необходимые столбцы):
import com.datastax.spark.connector._
{groupData.select("group").rdd
.deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))}
И вам даже не нужно читать входные данные из Cassandra — если вы знаете значения ключа раздела, вы можете просто создать RDD и удалить данные (аналогично показанному в документе).:
case class Key (group:String)
{ sc.parallelize(Seq(Key("b-group")))
.deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))}
Комментарии:
1. Разве это невозможно, не упомянув о некоторых из них? есть ли какой-либо другой способ удалить без упоминания первичных ключей?
2.
java.io.IOException: Failed to prepare statement DELETE "group", "word" FROM "test"."word_groups" WHERE "group" = :"group" AND "word" = :"word": Invalid identifier group for deletion (should not be a PRIMARY KEY part) at com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:142)
получение этого исключения при выполнении приведенного выше запроса (SomeColumns("group", "word")
)3.
groupData.rdd.deleteFromCassandra("sunbird_courses", "word_groups",keyColumns = SomeColumns("word", "group"))
Та же ошибка исключения, которую я получаюjava.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
@Alex Ott4. извините, похоже, что в 2.5.x произошло изменение поведения, я обновил ответ
5. Особенно нам нужно выбрать первичные ключи и правильно вызвать задачу удаления