#apache-spark #cassandra #spark-streaming #spark-cassandra-connector #dse
#apache-spark #cassandra #искровая потоковая передача #spark-cassandra-connector #dse
Вопрос:
Я разрабатываю задачу потоковой передачи Spark, которая объединяет данные из stream с таблицей Cassandra. Как вы можете видеть в плане Explain Прямое соединение не используется. Согласно документу DSE Прямое соединение используется, когда (размер таблицы * directJoinSizeRatio)> размер ключей. В моем случае таблица содержит миллионы записей, а ключи — это только одна запись (потоковая передача формы), поэтому я ожидаю, что используется прямое соединение. Таблица radice_polizza имеет только столбец id_cod_polizza в качестве jey раздела. Версия соединителя: 2.5.1. Версия DSE: 6.7.6.
*Project [id_cod_polizza#86L, progressivo#11, id3_numero_polizza#25, id3_cod_compagnia#21]
- *SortMergeJoin [id_cod_polizza#86L], [id_cod_polizza#10L], Inner
:- *Sort [id_cod_polizza#86L ASC NULLS FIRST], false, 0
: - Exchange hashpartitioning(id_cod_polizza#86L, 200)
: - *Project [value#84L AS id_cod_polizza#86L]
: - *SerializeFromObject [input[0, bigint, false] AS value#84L]
: - Scan ExternalRDDScan[obj#83L]
- *Sort [id_cod_polizza#10L ASC NULLS FIRST], false, 0
- Exchange hashpartitioning(id_cod_polizza#10L, 200)
- *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id_cod_polizza#10L,progressivo#11,id3_numero_polizza#25,id3_cod_compagnia#21] ReadSchema: struct<id_cod_polizza:bigint,progressivo:string,id3_numero_polizza:string,id3_cod_compagnia:string>
Вот мой код:
var radice_polizza = spark
.read
.format("org.apache.spark.sql.cassandra")
.options(Map("table" -> "radice_polizza", "keyspace" -> "preferred_temp"))
.load().select(
"id_cod_polizza",
"progressivo",
"id3_numero_polizza",
"id3_cod_compagnia")
if(mode == LoadMode.DIFF){
val altered_data_df = altered_data.idCodPolizzaList.toDF("id_cod_polizza")
radice_polizza = altered_data_df.join(radice_polizza, Seq("id_cod_polizza"))
radice_polizza.explain()
}
Принудительное прямое соединение работает.
radice_polizza = altered_data_df.join(radice_polizza.directJoin(AlwaysOn), Seq("id_cod_polizza"))
== Physical Plan ==
*Project [id_cod_polizza#58L, progressivo#11, id3_numero_polizza#25, id3_cod_compagnia#21]
- DSE Direct Join [id_cod_polizza = id_cod_polizza#58L] preferred_temp.radice_polizza - Reading (id_cod_polizza, progressivo, id3_numero_polizza, id3_cod_compagnia) Pushed {}
- *Project [value#56L AS id_cod_polizza#58L]
- *SerializeFromObject [input[0, bigint, false] AS value#56L]
- Scan ExternalRDDScan[obj#55L]
Почему прямое соединение не используется автоматически?
Спасибо
Ответ №1:
Прямое соединение DSE включается автоматически при разработке приложения с использованием зависимостей DSE Analytics, которые предоставляются при выполнении задания в DSE Analytics. Для этого вам нужно указать следующую зависимость и не использовать Spark Cassandra Connector:
<dependency>
<groupId>com.datastax.dse</groupId>
<artifactId>dse-spark-dependencies</artifactId>
<version>${dse.version}</version>
<scope>provided</scope>
</dependency>
если вы запускаете свою работу на внешнем Spark, вам необходимо явно включить прямое соединение, указав свойство конфигурации Spark spark.sql.extensions
со значением com.datastax.spark.connector.CassandraSparkExtensions
.
У меня есть длинный пост в блоге о соединении данных с Cassandra, в котором все это объясняется.
Комментарии:
1. Спасибо Алексу за ответ, к сожалению, он, похоже, не работает. Я запускаю свою задачу с помощью «dse spark-submit ..»
2. если вы работаете с
dse spark-submit
, вам необходимо удалить Spark Cassandra Connector 2.5 и использовать зависимости DSE, как описано…