#java #apache-spark #apache-kafka #spark-structured-streaming
#java #apache-искра #апачи-кафка #spark-structured-streaming
Вопрос:
В настоящее время у меня есть ForEachWriter, который работает с типизированным набором данных, поступающим из Kafka. Все свойства, кроме заголовков для каждого сообщения, отображаются правильно внутри ForEachWriter. Код для репликации находится внизу.
Схема в точности такая, как она взята из программы чтения потоков данных для format Kafka.
root
|-- topic: string (nullable = true)
|-- key: string (nullable = true)
|-- value: string (nullable = true)
|-- timestamp: string (nullable = true)
|-- headers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- key: string (nullable = true)
| | |-- value: binary (nullable = true)
Заголовки явно находятся в печатаемом столбце набора данных.
------------ -------- ------- ----------------------- ----------------------------------
|topic |key |value |timestamp |headers |
------------ -------- ------- ----------------------- ----------------------------------
|controlTopic| |TEXT |2020-11-24 16:07:33.09 |[[SAMPLE, TEXT]] |
|controlTopic| |2Header|2020-11-24 16:43:46.755|[[HEADER1, VAL1], [HEADER2, VAL2]]|
------------ -------- ------- ----------------------- ----------------------------------
И соответствующее количество объектов KeyValue даже отображается в свойстве заголовка объекта, но их значения равны null. Например, в первом сообщении будет отображаться один элемент в списке (изображение слева), а во втором сообщении будут отображаться два элемента в списке (изображение справа).
Код выглядит следующим образом:
KafkaMessage.java
public class KafkaMessage {
public String topic;
public String key;
public String value;
public String timestamp;
public List<KV> headers;
public KafkaMessage() {
}
public KafkaMessage(String topic, String key, String value, String timestamp, List<KV> headers) {
this.topic = topic;
this.key = key;
this.value = value;
this.timestamp = timestamp;
this.headers = headers;
}
}
KV.java
public class KV {
public String key;
public byte[] value;
public KV() {
}
public KV(String key, byte[] value) {
this.key = key;
this.value = value;
}
}
MySparkWriter.java
public class MySparkWriter extends ForeachWriter<KafkaMessage> {
@Override
public boolean open(long partitionId, long epochId) {
// open connection, not needed in this example
return true;
}
@Override
public void process(KafkaMessage kafkaMessage) {
// here headers will not be null, if there are headers
List<KV> headers = kafkaMessage.headers;
for (KV kv : headers) {
System.out.println(kv.key != null); // kv.key will always be null, will only print in local mode
}
}
@Override
public void close(Throwable errorOrNull) {
// close connection, not needed in this example
}
}
MySparkApp.java Обратите внимание, что это единственный класс, в котором какой-то код опущен, все, что вам нужно, это создать контекст Spark и вызвать startSpark()
public class MySparkApp {
...
public StreamingQuery startSpark() throws TimeoutException {
// Get Input
DataStreamReader dsReader = this.sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("includeHeaders", true)
.option("subscribe", "controlTopic");
Dataset<KafkaMessage> messages = dsReader.load()
.selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)", "headers")
.as(ExpressionEncoder.javaBean(KafkaMessage.class));
// debug prints
messages.writeStream()
.format("console")
.outputMode("update")
.trigger(Trigger.ProcessingTime("2 seconds"))
.option("truncate", "false")
.start();
messages.printSchema();
// Start Streaming
return messages
.writeStream()
.trigger(Trigger.ProcessingTime("2 seconds"))
.foreach(this.sparkWriter) // this is a MySparkWriter instance
.option("checkpointLocation", "/checkpoint/" sparkSession.sparkContext().applicationId())
.start();
}
...
}
Комментарии:
1. любое решение для этого? когда я включаю включение заголовков, я получаю исключение ниже, вызванное: org.codehaus. джанино. Исключение InternalCompilerException: два неабстрактных метода «public int scala.collection. TraversableOnce.size()» имеют одинаковые типы параметров, объявляющий тип и возвращаемый тип