Искра не использует DirectJoin поверх DSE

#apache-spark #cassandra #spark-streaming #spark-cassandra-connector #dse

#apache-spark #cassandra #искровая потоковая передача #spark-cassandra-connector #dse

Вопрос:

Я разрабатываю задачу потоковой передачи Spark, которая объединяет данные из stream с таблицей Cassandra. Как вы можете видеть в плане Explain Прямое соединение не используется. Согласно документу DSE Прямое соединение используется, когда (размер таблицы * directJoinSizeRatio)> размер ключей. В моем случае таблица содержит миллионы записей, а ключи — это только одна запись (потоковая передача формы), поэтому я ожидаю, что используется прямое соединение. Таблица radice_polizza имеет только столбец id_cod_polizza в качестве jey раздела. Версия соединителя: 2.5.1. Версия DSE: 6.7.6.

 *Project [id_cod_polizza#86L, progressivo#11, id3_numero_polizza#25, id3_cod_compagnia#21]
 - *SortMergeJoin [id_cod_polizza#86L], [id_cod_polizza#10L], Inner
   :- *Sort [id_cod_polizza#86L ASC NULLS FIRST], false, 0
   :   - Exchange hashpartitioning(id_cod_polizza#86L, 200)
   :      - *Project [value#84L AS id_cod_polizza#86L]
   :         - *SerializeFromObject [input[0, bigint, false] AS value#84L]
   :            - Scan ExternalRDDScan[obj#83L]
    - *Sort [id_cod_polizza#10L ASC NULLS FIRST], false, 0
       - Exchange hashpartitioning(id_cod_polizza#10L, 200)
          - *Scan org.apache.spark.sql.cassandra.CassandraSourceRelation [id_cod_polizza#10L,progressivo#11,id3_numero_polizza#25,id3_cod_compagnia#21] ReadSchema: struct<id_cod_polizza:bigint,progressivo:string,id3_numero_polizza:string,id3_cod_compagnia:string>
  

Вот мой код:

  var radice_polizza = spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("table" -> "radice_polizza", "keyspace" -> "preferred_temp"))
      .load().select(
      "id_cod_polizza",
      "progressivo",
      "id3_numero_polizza",
      "id3_cod_compagnia")

if(mode == LoadMode.DIFF){
   val altered_data_df = altered_data.idCodPolizzaList.toDF("id_cod_polizza")
   radice_polizza = altered_data_df.join(radice_polizza, Seq("id_cod_polizza"))
   radice_polizza.explain()
}
  

Принудительное прямое соединение работает.

 radice_polizza = altered_data_df.join(radice_polizza.directJoin(AlwaysOn), Seq("id_cod_polizza"))
  
 == Physical Plan ==
*Project [id_cod_polizza#58L, progressivo#11, id3_numero_polizza#25, id3_cod_compagnia#21]
 - DSE Direct Join [id_cod_polizza = id_cod_polizza#58L] preferred_temp.radice_polizza - Reading (id_cod_polizza, progressivo, id3_numero_polizza, id3_cod_compagnia) Pushed {}
    - *Project [value#56L AS id_cod_polizza#58L]
       - *SerializeFromObject [input[0, bigint, false] AS value#56L]
          - Scan ExternalRDDScan[obj#55L]

  

Почему прямое соединение не используется автоматически?

Спасибо

Ответ №1:

Прямое соединение DSE включается автоматически при разработке приложения с использованием зависимостей DSE Analytics, которые предоставляются при выполнении задания в DSE Analytics. Для этого вам нужно указать следующую зависимость и не использовать Spark Cassandra Connector:

     <dependency>
      <groupId>com.datastax.dse</groupId>
      <artifactId>dse-spark-dependencies</artifactId>
      <version>${dse.version}</version>
      <scope>provided</scope>
    </dependency>
  

если вы запускаете свою работу на внешнем Spark, вам необходимо явно включить прямое соединение, указав свойство конфигурации Spark spark.sql.extensions со значением com.datastax.spark.connector.CassandraSparkExtensions .

У меня есть длинный пост в блоге о соединении данных с Cassandra, в котором все это объясняется.

Комментарии:

1. Спасибо Алексу за ответ, к сожалению, он, похоже, не работает. Я запускаю свою задачу с помощью «dse spark-submit ..»

2. если вы работаете с dse spark-submit , вам необходимо удалить Spark Cassandra Connector 2.5 и использовать зависимости DSE, как описано…