Интеграция Apache Spark с Kafka

#apache-spark #apache-kafka #spark-structured-streaming #spark-kafka-integration

#apache-spark #apache-kafka #spark-структурированная потоковая передача #spark-kafka-интеграция

Вопрос:

Я следую курсу Udemy о Kafka и Spark, и я изучаю интеграцию apache spark с Kafka

Ниже приведен код apache spark

 SparkSession session = SparkSession.builder().appName("KafkaConsumer").master("local[*]").getOrCreate();
  session.sparkContext().setLogLevel("ERROR");
  Dataset<Row> df = session
    .readStream()
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "second_topic").load();
            
df.show();
  

И ниже приводится содержимое pom.xml файл

 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example.kafka.spark</groupId>
  <artifactId>Kafka-Spark-Integration-Code</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> 
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<!--    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.12</artifactId>
        <version>3.0.0</version>
    </dependency> -->
    
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.12</artifactId>
        <version>3.0.0</version>
   </dependency>
    
 </dependencies>
</project>
  

Однако, когда я запускаю код, я получаю ошибку ниже, которую я не могу разрешить. Я использую openjdk 8 и spark 3 в MX Linux. Спасибо

 exception in thread "main" java.lang.ClassFormatError: Invalid code attribute name index 24977 in class file org/apache/spark/sql/execution/columnar/InMemoryRelation
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:756)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:468)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:74)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:369)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:363)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:362)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.spark.sql.internal.SharedState.<init>(SharedState.scala:83)
    at org.apache.spark.sql.SparkSession.$anonfun$sharedState$1(SparkSession.scala:132)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sharedState$lzycompute(SparkSession.scala:132)
    at org.apache.spark.sql.SparkSession.sharedState(SparkSession.scala:131)
    at org.apache.spark.sql.internal.BaseSessionStateBuilder.build(BaseSessionStateBuilder.scala:323)
    at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$instantiateSessionState(SparkSession.scala:1107)
    at org.apache.spark.sql.SparkSession.$anonfun$sessionState$2(SparkSession.scala:157)
    at scala.Option.getOrElse(Option.scala:189)
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:155)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:152)
    at org.apache.spark.sql.streaming.DataStreamReader.<init>(DataStreamReader.scala:519)
    at org.apache.spark.sql.SparkSession.readStream(SparkSession.scala:657)
    at example.code.spark.kafka.KafkaSparkConsumer.main(KafkaSparkConsumer.java:19)
  

Ответ №1:

Вы можете следовать примерам, приведенным в Руководстве по интеграции Structured Streaming Kafka:

 SparkSession session = SparkSession.builder()
  .appName("KafkaConsumer")
  .master("local[*]")
  .getOrCreate();

Dataset<Row> df = spark
  .readStream()
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "second_topic")
  .load()
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
  

чтобы использовать эти данные. В руководстве по программированию структурированной потоковой передачи показано, как распечатать данные на консоль:

 StreamingQuery query = df
  .writeStream()
  .format("console")
  .outputMode("append")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start();

query.awaitTermination();
  

Комментарии:

1. Спасибо. Я попробовал приведенный выше код, но та же ошибка. Я попробовал тот же пример на компьютере с Windows, и он не удался с другой ошибкой (зависимость kafka не найдена), но передал метод ReadStream из приведенного выше кода. интересно, должно ли это что-то делать с версией Java или средой, которую я запускаю на компьютере Linux?

2. Да, я смог запустить очень простой код spark. Я использую eclipse и запускаю код из eclipse. Это первый раз, когда я пытаюсь использовать потоковое приложение, поэтому я не знаю, что происходит, хотя у меня то же самое pom.xml файл в обеих средах (Windows и Linux), поэтому я считаю, что зависимости одинаковы.

3. Итак, я отключил openjdk 8 и установил java из oracle, и он начал работать. Однако теперь моя потоковая работа просто завершается. Я думал, что буду работать непрерывно.

4. Вы убедились, что нужно позвонить query.awaitTermination ?

5. Спасибо, я пропустил это. Большое вам спасибо. Это решило проблему.