#apache-spark #apache-kafka #kafka-consumer-api
#apache-spark #apache-kafka #kafka-consumer-api
Вопрос:
Когда я запускаю этот код, я получаю следующую ошибку. Я проверил другой ответ, но он не сработал для меня.
Кто-нибудь знает, как это сделать? Я проверил зависимости.
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import java.util.*;
/**
* Created by jonas on 10/10/16.
*/
public class SparkStream {
public static void main(String[] args){
SparkConf conf = new SparkConf()
.setAppName("kafka-sandbox")
.setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
Set<String> topics = Collections.singleton("Test");
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc, String.class
, String.class, kafka.serializer.StringDecoder.class, kafka.serializer.StringDecoder.class, kafkaParams, topics);
directKafkaStream.foreachRDD(rdd -> {
System.out.println("--- New RDD with " rdd.partitions().size()
" partitions and " rdd.count() " records");
rdd.foreach(record -> System.out.println(record._2));
});
// TODO: processing pipeline
ssc.start();
}
}
Ранее я запускал zookeeper на порту 2181 и Kafka server 0.9.0.0 на порту 9092. Но я получаю следующую ошибку в драйвере Spark:
Exception in thread "main" java.lang.ClassCastException: kafka.cluster.BrokerEndPoint cannot be cast to kafka.cluster.Broker
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6$$anonfun$apply$7.apply(KafkaCluster.scala:97)
at scala.Option.map(Option.scala:146)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3$$anonfun$apply$6.apply(KafkaCluster.scala:94)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:94)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2$$anonfun$3.apply(KafkaCluster.scala:93)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:252)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:79)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:252)
at scala.collection.AbstractTraversable.flatMap(Traversable.scala:104)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:93)
at org.apache.spark.streaming.kafka.KafkaCluster$$anonfun$2.apply(KafkaCluster.scala:92)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaCluster.findLeaders(KafkaCluster.scala:92)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:186)
at org.apache.spark.streaming.kafka.KafkaCluster.getLeaderOffsets(KafkaCluster.scala:168)
at org.apache.spark.streaming.kafka.KafkaCluster.getLatestLeaderOffsets(KafkaCluster.scala:157)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:215)
at org.apache.spark.streaming.kafka.KafkaUtils$$anonfun$5.apply(KafkaUtils.scala:211)
at scala.util.Either$RightProjection.flatMap(Either.scala:522)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:211)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at SparkStream.main(SparkStream.java:28)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Комментарии:
1. Вы пытались изменить свой
metadata.broker.list
наbootstrap.servers
? Это работает для меня.2. Да, я пробовал это.
Ответ №1:
Убедитесь, что ваши зависимости совместимы друг с другом. Вот некоторые из них, которые работают вместе:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.6.2</version>
</dependency>
Ответ №2:
Это похоже на проблему с библиотекой, которую я также отлаживаю. Я использую сервер kafka с версией 0.10.0.0 и Scala версии 2.11. Моя версия ядра / потоковой передачи spark — 2.11: 2.0.1. Потоковая библиотека Spark kafka равна 0.10.0.1. Когда я использую kafka 2.11: 0.10.0.1 lib, я получаю эту ошибку, но когда я использую kafka 2.10: 0.10.0.1,это работает нормально.