#java #apache-kafka #spark-streaming #spark-streaming-kafka
#java #apache-kafka #искровая потоковая передача #искровая потоковая передача -kafka
Вопрос:
Я пытаюсь настроить простое приложение Spark Streaming, которое будет считывать сообщения из темы Kafka.
После большой работы я нахожусь на этом этапе, но получаю исключения, показанные ниже.
Код:
public static void main(String[] args) throws Exception {
String brokers = "my.kafka.broker" ":" "6667";
String topics = "MyKafkaTopic";
// Create context with a 2 seconds batch interval
SparkConf sparkConf = new SparkConf().setAppName("StreamingE")
.setMaster("local[1]")
;
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
System.out.println("Brokers: " brokers);
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
System.out.println("Message received: " messages);
// Start the computation
jssc.start();
jssc.awaitTermination();
}
Который выдает:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: requirement failed: No output operations registered, so nothing to execute
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.streaming.DStreamGraph.validate(DStreamGraph.scala:161)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:542)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:601)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:600)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:624)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:66)
В отчаянии я попытался подключиться к Zookeeper:
String brokers = "my.kafka.zookeeper" ":" "2181";
String topics = "MyKafkaTopic";
Но это бросает:
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: java.io.EOFException: Received -1 when reading from channel, socket has likely been closed.
at org.apache.spark.streaming.kafka.KafkaCluster$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at org.apache.spark.streaming.kafka.KafkaCluster$anonfun$checkErrors$1.apply(KafkaCluster.scala:366)
at scala.util.Either.fold(Either.scala:97)
at org.apache.spark.streaming.kafka.KafkaCluster$.checkErrors(KafkaCluster.scala:365)
at org.apache.spark.streaming.kafka.KafkaUtils$.getFromOffsets(KafkaUtils.scala:222)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:484)
at org.apache.spark.streaming.kafka.KafkaUtils$.createDirectStream(KafkaUtils.scala:607)
at org.apache.spark.streaming.kafka.KafkaUtils.createDirectStream(KafkaUtils.scala)
at com.ncr.dataplatform.api.StreamingE.main(StreamingE.java:53)
Соответствующие зависимости:
<properties>
<spark.version>1.6.2</spark.version>
<kafka.version>0.8.2.1</kafka.version>
</properties>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>${spark.version}</version>
</dependency>
Я хотел бы спросить:
Должен ли я подключаться к брокеру Kafka или серверам zookeeper?
Что я делаю не так в своем коде, чтобы не иметь возможности подключаться / прослушивать входящие сообщения?
Ответ №1:
Вызвано: java.lang.Исключение IllegalArgumentException: сбой требования: операции вывода не зарегистрированы, поэтому ничего не нужно выполнять
Способ работы Spark заключается в том, что большинство его преобразований являются ленивыми. Когда вы хотите, чтобы график выполнялся, вам необходимо зарегистрировать выходное преобразование. Выходные преобразования выполняются в форме foreachRDD
, print
, collect
или count
(и более).
Вместо использования println
вызовите DStream.print()
:
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
messages.print();
// Start the computation
jssc.start();
jssc.awaitTermination();
Что касается Kafka, metadata.broker.list
необходимо указать адреса ваших узлов Kafka broker. Существует отдельный ключ с именем zookeeper.connect
для предоставления адреса ZooKeepers.
Ответ №2:
import static org.apache.spark.streaming.kafka.KafkaUtils.createStream;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.ImmutableMap;
import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Properties;
import kafka.serializer.StringDecoder;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.api.java.JavaDStream;
import scala.Tuple2;
public class KafkaKerberosReader {
// Spark information
private static SparkConf conf;
private static String appName = "KafkaKerberosReader";
private static JavaStreamingContext context;
private static final Logger logger = LoggerFactory.getLogger(KafkaKerberosReader.class.getSimpleName());
// Kafka information
private static String zkQuorum = "";
private static String kfkQuorum = "";
private static String group = "";
private static Integer threads = 1;
private static Map<String, String> kafkaParams = new HashMap<String, String>();
public static void loadProps() {
Properties prop = new Properties();
try {
logger.info("------------------------------loadProps");
InputStream input = new FileInputStream("config.properties");
prop.load(input);
System.out.println("loadProps loaded:" prop);
appName = prop.getProperty("app.name");
autoOffsetReset = prop.getProperty("auto.offset.reset");
secProtocol = prop.getProperty("security.protocol");
kfkQuorum = bServers = prop.getProperty("bootstrap.servers");
zkQuorum = zServers = prop.getProperty("zookeeper.connect");
group = kGroupId = prop.getProperty("group.id");
kKeyTabFile = prop.getProperty("kerberos.keytabfile");
kJaas = prop.getProperty("kerberos.jaas");
kTopic = prop.getProperty("kafka.topic");
kPrincipal = prop.getProperty("kerberos.principal");
logger.info("loadProps:Props:zk:" zServers ",issecure:" secProtocol ",autoOffsetReset:"
autoOffsetReset ",bServers:" bServers ",kJaas:" kJaas ",keytab:" kKeyTabFile
", kTopic:" kTopic ", kPrincipal" kPrincipal);
if (kPrincipal != null amp;amp; kKeyTabFile != null) {
logger.info("---------------------Logging into Kerberos");
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
conf.set("hadoop.security.authentication", "Kerberos");
UserGroupInformation.setConfiguration(conf);
UserGroupInformation.loginUserFromKeytabAndReturnUGI(kPrincipal, kKeyTabFile);
}
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
logger.info("------------------------------main:START");
loadProps();
// Configure the application
configureSpark();
// Create the context
context = createContext(kTopic);
// Stop the application
context.start();
context.awaitTermination();
logger.info("main:END");
}
/**
* ----------------------------------------------- | This is the kernel of
* the spark application | -----------------------------------------------
*
*/
private static JavaStreamingContext createContext(String topic) {
logger.info("-------------------------------------------------------");
logger.info("| Starting: {} |", appName);
logger.info("| kafkaParams: |", kafkaParams);
logger.info("-------------------------------------------------------");
// Create the spark streaming context
context = new JavaStreamingContext(conf, Seconds.apply(5));
// Read from a Kerberized Kafka
JavaPairReceiverInputDStream<String, String> kafkaStream = createStream(context, zkQuorum, "Default",
ImmutableMap.of(topic, threads), StorageLevel.MEMORY_AND_DISK_SER());
kafkaStream.print();
JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
private static final long serialVersionUID = 1L;
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.print();
// kafkaStream.map(message -> message._2.toLowerCase()).print();
logger.info("-------------------------------------------------------");
logger.info("| Finished: {} |", appName);
logger.info("-------------------------------------------------------");
return context;
}
/**
* Create a SparkConf and configure it.
*
*/
private static void configureSpark() {
logger.info("------------------------------Initializing '%s'.", appName);
conf = new SparkConf().setAppName(appName);
if (group != null amp;amp; group.trim().length() != 0) {
kafkaParams.put("group.id", group);
}
kafkaParams.put("auto.offset.reset", autoOffsetReset);
kafkaParams.put("security.protocol", secProtocol);
kafkaParams.put("bootstrap.servers", kfkQuorum);
kafkaParams.put("zookeeper.connect", zkQuorum);
logger.info(">- Configuration done with the follow properties:");
logger.info(conf.toDebugString());
}
static String autoOffsetReset, secProtocol, bServers, zServers, kGroupId, kKeyTabFile, kJaas, kTopic, kPrincipal;
}
Свойства:
app.name=KafkaKerberosReader
auto.offset.reset=smallest
security.protocol=PLAINTEXTSASL
bootstrap.servers=sandbox.hortonworks.com:6667
zookeeper.connect=sandbox.hortonworks.com:2181
group.id=Default
kafka.topic=ifinboundprecint
//#kerberos.keytabfile=/etc/hello.keytab
//#kerberos.jaas=/etc/kafka/conf/kafka_client_jaas.conf
//#kerberos.principal=hello@EXAMPLE.COM
Вызов:
искровая отправка — основная пряжа -клиент в режиме развертывания -число исполнителей 3 -исполнитель-память 500M -исполнитель-ядра 3 -класс com.my.spark.KafkaKerberosReader ~/SparkStreamKafkaTest-1.0-SNAPSHOT.jar