Соединитель Spark Scala Cassandra удалить все все строки не удается с требованием исключения IllegalArgumentException не удалось Исключение

#dataframe #apache-spark #cassandra #rdd #spark-cassandra-connector

Вопрос:

Создать таблицу —

 CREATE TABLE test.word_groups (group text, word text, count int,PRIMARY KEY (group,word));
 

Вставка Данных —

 INSERT INTO test.word_groups (group , word , count ) VALUES ( 'A-group', 'raj', 0) ;
INSERT INTO test.word_groups (group , word , count ) VALUES ( 'b-group', 'jaj', 0) ;
INSERT INTO test.word_groups (group , word , count ) VALUES ( 'A-group', 'raff', 3) ;

 SELECT * FROM word_groups ;

 group   | word | count
--------- ------ -------
 b-group |  jaj |     0
 A-group | raff |     3
 A-group |  raj |     0

 

Скрипт —

 val cassandraUrl = "org.apache.spark.sql.cassandra"
val wordGroup: Map[String, String] = Map("table" ->"word_groups", 
  "keyspace" -> "test", "cluster" -> "test-cluster")
val groupData = {spark.read.format(cassandraUrl).options(wordGroup).load()
  .where(col("group") === "b-group")}
groupData.rdd.deleteFromCassandra("sunbird_courses", "word_groups")

 

Исключение —

 java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.
    at scala.Predef$.require(Predef.scala:224)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:102)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:105)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:30)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:30)
    at com.datastax.spark.connector.writer.TableWriter$anonfun$writeInternal$1.apply(TableWriter.scala:229)
    at com.datastax.spark.connector.writer.TableWriter$anonfun$writeInternal$1.apply(TableWriter.scala:198)
    at com.datastax.spark.connector.cql.CassandraConnector$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:129)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:198)
    at com.datastax.spark.connector.writer.TableWriter.delete(TableWriter.scala:194)
    at com.datastax.spark.connector.RDDFunctions$anonfun$deleteFromCassandra$1.apply(RDDFunctions.scala:119)
    at com.datastax.spark.connector.RDDFunctions$anonfun$deleteFromCassandra$1.apply(RDDFunctions.scala:119)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
21/08/11 09:01:24 WARN TaskSetManager: Lost task 0.0 in stage 11.0 (TID 2953, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2.

 

Версия Spark — 2.4.4 и
версия разъема Spark Cassandra — 2.5.0

Искра Кассандра разъем док ссылка — https://github.com/datastax/spark-cassandra-connector/blob/master/doc/5_saving.md#deleting-rows-and-columns

Я пытаюсь удалить все записи этих столбцов, включая первичные ключи.

Существует ли какой-либо обходной путь для этого ?

К вашему сведению — мне нужно удалить все записи группы «A-group» из таблицы word_groups, включая первичные ключи / ключи разделов

Ответ №1:

это интересное изменение в 2.5.x, о котором я не знал — теперь вам нужно иметь правильный размер строки, даже если keyColumns он указан, раньше это работало без него — похоже на ошибку для меня.

Вам нужно оставить только первичный ключ при удалении всей строки — измените удаление на:

 groupData.select("group", "word").rdd.deleteFromCassandra("test", "word_groups")
 

но в вашем случае еще лучше удалить на основе столбца ключа раздела — в этом случае у вас будет только одно надгробие (вам все равно нужно выбрать только необходимые столбцы):

 import com.datastax.spark.connector._
{groupData.select("group").rdd
  .deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))}
 

И вам даже не нужно читать входные данные из Cassandra — если вы знаете значения ключа раздела, вы можете просто создать RDD и удалить данные (аналогично показанному в документе).:

 case class Key (group:String)
{ sc.parallelize(Seq(Key("b-group")))
   .deleteFromCassandra("test", "word_groups", keyColumns = SomeColumns("group"))}
 

Комментарии:

1. Разве это невозможно, не упомянув о некоторых из них? есть ли какой-либо другой способ удалить без упоминания первичных ключей?

2. java.io.IOException: Failed to prepare statement DELETE "group", "word" FROM "test"."word_groups" WHERE "group" = :"group" AND "word" = :"word": Invalid identifier group for deletion (should not be a PRIMARY KEY part) at com.datastax.spark.connector.writer.TableWriter.com$datastax$spark$connector$writer$TableWriter$$prepareStatement(TableWriter.scala:142) получение этого исключения при выполнении приведенного выше запроса ( SomeColumns("group", "word") )

3. groupData.rdd.deleteFromCassandra("sunbird_courses", "word_groups",keyColumns = SomeColumns("word", "group")) Та же ошибка исключения, которую я получаю java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 2. @Alex Ott

4. извините, похоже, что в 2.5.x произошло изменение поведения, я обновил ответ

5. Особенно нам нужно выбрать первичные ключи и правильно вызвать задачу удаления