#apache-spark #cassandra #datastax #spark-cassandra-connector
#apache-spark #cassandra #datastax #spark-cassandra-connector
Вопрос:
- Сбой записи в Spark connector с
java.lang.IllegalArgumentException: udtId is not a field defined in this definition
ошибкой при использовании имен полей, чувствительных к регистру - Мне нужны поля в таблице Cassandra для сохранения регистра. Поэтому я использовал кавычки для их создания.
Моя схема Cassandra
CREATE TYPE my_keyspace.my_udt (
"udtId" text,
"udtValue" text
);
CREATE TABLE my_keyspace.my_table (
"id" text PRIMARY KEY,
"someCol" text,
"udtCol" list<frozen<my_udt>>
);
Моя схема Spark DataFrame такова
root
|-- id: string (nullable = true)
|-- someCol: string (nullable = true)
|-- udtCol: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- udtId: string (nullable = true)
|-- udtValue: string (nullable = true)
- Есть ли какие-либо другие варианты, позволяющие заставить эту запись работать, кроме определения моего udt с именами в нижнем регистре? Использование их в нижнем регистре заставило бы меня вызывать код управления обращениями везде, где это используется, и я хотел бы избежать этого?
- Поскольку я не смог успешно записать, я все же попробовал read? Является ли это проблемой и для чтения?
Комментарии:
1. какая версия SCC?
2. Я использую Connector 2.4.2. JDK 11 и Scala 2.12
3. забыл спросить — какая версия Spark? глядя на Scala 2.12, я подозреваю, что это Spark 3.0?
4. Нет проблем. Я должен был также упомянуть. Я использую Spark 2.4.5. К вашему сведению, я попытался добавить кавычки к полям udt во фрейме данных, и это тоже не помогло
Ответ №1:
Вам необходимо перейти на Spark Cassandra Connector 2.5.0 — я не могу найти конкретную фиксацию, которая исправляет это, или конкретную Jira, в которой упоминается это — я подозреваю, что сначала это было исправлено в версии DataStax, а затем выпущено как часть слияния, объявленного здесь.
Вот как это работает в SCC 2.5.0 Spark 2.4.6, в то время как в SCC 2.4.2 Spark 2.4.6 происходит сбой:
scala> import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.cassandra._
scala> val data = spark.read.cassandraFormat("my_table", "test").load()
data: org.apache.spark.sql.DataFrame = [id: string, someCol: string ... 1 more field]
scala> val data2 = data.withColumn("id", concat(col("id"), lit("222")))
data2: org.apache.spark.sql.DataFrame = [id: string, someCol: string ... 1 more field]
scala> data2.write.cassandraFormat("my_table", "test").mode("append").save()
Комментарии:
1. Это работает с 2.5.0. Я рассмотрю возможность обновления до версии разъема 2.5.0.
2. пока вы не используете какой-либо код, зависящий от классов драйверов Java, например,
withClusterDo
,withSessionDo
, тогда миграция проста. Кроме того, этот выпуск содержит множество оптимизаций, таких как прямое соединение и т.д. — Прочитайте об этом связанный пост в блоге