Сбой записи в Spark Cassandra UDT с учетом регистра имен

#apache-spark #cassandra #datastax #spark-cassandra-connector

#apache-spark #cassandra #datastax #spark-cassandra-connector

Вопрос:

  • Сбой записи в Spark connector с java.lang.IllegalArgumentException: udtId is not a field defined in this definition ошибкой при использовании имен полей, чувствительных к регистру
  • Мне нужны поля в таблице Cassandra для сохранения регистра. Поэтому я использовал кавычки для их создания.

Моя схема Cassandra

 CREATE TYPE my_keyspace.my_udt (
  "udtId" text,
  "udtValue" text
);

CREATE TABLE my_keyspace.my_table (
  "id" text PRIMARY KEY,
  "someCol" text,
  "udtCol" list<frozen<my_udt>>
);
  

Моя схема Spark DataFrame такова

 root
 |-- id: string (nullable = true)
 |-- someCol: string (nullable = true)
 |-- udtCol: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- udtId: string (nullable = true)
           |-- udtValue: string (nullable = true)
  
  • Есть ли какие-либо другие варианты, позволяющие заставить эту запись работать, кроме определения моего udt с именами в нижнем регистре? Использование их в нижнем регистре заставило бы меня вызывать код управления обращениями везде, где это используется, и я хотел бы избежать этого?
  • Поскольку я не смог успешно записать, я все же попробовал read? Является ли это проблемой и для чтения?

Комментарии:

1. какая версия SCC?

2. Я использую Connector 2.4.2. JDK 11 и Scala 2.12

3. забыл спросить — какая версия Spark? глядя на Scala 2.12, я подозреваю, что это Spark 3.0?

4. Нет проблем. Я должен был также упомянуть. Я использую Spark 2.4.5. К вашему сведению, я попытался добавить кавычки к полям udt во фрейме данных, и это тоже не помогло

Ответ №1:

Вам необходимо перейти на Spark Cassandra Connector 2.5.0 — я не могу найти конкретную фиксацию, которая исправляет это, или конкретную Jira, в которой упоминается это — я подозреваю, что сначала это было исправлено в версии DataStax, а затем выпущено как часть слияния, объявленного здесь.

Вот как это работает в SCC 2.5.0 Spark 2.4.6, в то время как в SCC 2.4.2 Spark 2.4.6 происходит сбой:

 scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._

scala> val data = spark.read.cassandraFormat("my_table", "test").load()
data: org.apache.spark.sql.DataFrame = [id: string, someCol: string ... 1 more field]

scala> val data2 = data.withColumn("id", concat(col("id"), lit("222")))
data2: org.apache.spark.sql.DataFrame = [id: string, someCol: string ... 1 more field]

scala> data2.write.cassandraFormat("my_table", "test").mode("append").save()
  

Комментарии:

1. Это работает с 2.5.0. Я рассмотрю возможность обновления до версии разъема 2.5.0.

2. пока вы не используете какой-либо код, зависящий от классов драйверов Java, например, withClusterDo , withSessionDo , тогда миграция проста. Кроме того, этот выпуск содержит множество оптимизаций, таких как прямое соединение и т.д. — Прочитайте об этом связанный пост в блоге