Свойство Spark Structured Streaming Kafka Headers имеет значение null в объекте ForEachWriter

#java #apache-spark #apache-kafka #spark-structured-streaming

#java #apache-искра #апачи-кафка #spark-structured-streaming

Вопрос:

В настоящее время у меня есть ForEachWriter, который работает с типизированным набором данных, поступающим из Kafka. Все свойства, кроме заголовков для каждого сообщения, отображаются правильно внутри ForEachWriter. Код для репликации находится внизу.

Схема в точности такая, как она взята из программы чтения потоков данных для format Kafka.

 root
 |-- topic: string (nullable = true)
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- headers: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- key: string (nullable = true)
 |    |    |-- value: binary (nullable = true)
 

Заголовки явно находятся в печатаемом столбце набора данных.

  ------------ -------- ------- ----------------------- ---------------------------------- 
|topic       |key     |value  |timestamp              |headers                           |
 ------------ -------- ------- ----------------------- ---------------------------------- 
|controlTopic|        |TEXT   |2020-11-24 16:07:33.09 |[[SAMPLE, TEXT]]                  |
|controlTopic|        |2Header|2020-11-24 16:43:46.755|[[HEADER1, VAL1], [HEADER2, VAL2]]|
 ------------ -------- ------- ----------------------- ---------------------------------- 
 

И соответствующее количество объектов KeyValue даже отображается в свойстве заголовка объекта, но их значения равны null. Например, в первом сообщении будет отображаться один элемент в списке (изображение слева), а во втором сообщении будут отображаться два элемента в списке (изображение справа).

значение null

Код выглядит следующим образом:

KafkaMessage.java

 public class KafkaMessage {

    public String topic;

    public String key;

    public String value;

    public String timestamp;

    public List<KV> headers;

    public KafkaMessage() {
    }

    public KafkaMessage(String topic, String key, String value, String timestamp, List<KV> headers) {

        this.topic = topic;
        this.key = key;
        this.value = value;
        this.timestamp = timestamp;
        this.headers = headers;
    }
}
 

KV.java

 public class KV {
    public String key;
    public byte[] value;

    public KV() {
    }

    public KV(String key, byte[] value) {
        this.key = key;
        this.value = value;
    }
}

 

MySparkWriter.java

 public class MySparkWriter extends ForeachWriter<KafkaMessage> {

    @Override
    public boolean open(long partitionId, long epochId) {
        // open connection, not needed in this example
        return true;
    }

    @Override
    public void process(KafkaMessage kafkaMessage) {
        // here headers will not be null, if there are headers
        List<KV> headers = kafkaMessage.headers;
        for (KV kv : headers) {
            System.out.println(kv.key != null); // kv.key will always be null, will only print in local mode
        }
    }

    @Override
    public void close(Throwable errorOrNull) {
        // close connection, not needed in this example
    }
}

 

MySparkApp.java Обратите внимание, что это единственный класс, в котором какой-то код опущен, все, что вам нужно, это создать контекст Spark и вызвать startSpark()

 public class MySparkApp {

    ...
    
    public StreamingQuery startSpark() throws TimeoutException {
        // Get Input
        DataStreamReader dsReader = this.sparkSession.readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("includeHeaders", true)
                .option("subscribe", "controlTopic");

        Dataset<KafkaMessage> messages = dsReader.load()
                .selectExpr("CAST(topic AS STRING)", "CAST(key AS STRING)", "CAST(value AS STRING)", "CAST(timestamp AS STRING)", "headers")
                .as(ExpressionEncoder.javaBean(KafkaMessage.class));

        // debug prints
        messages.writeStream()
            .format("console")
            .outputMode("update")
            .trigger(Trigger.ProcessingTime("2 seconds"))
            .option("truncate", "false")
            .start();
        messages.printSchema();

        // Start Streaming
        return messages
                .writeStream()
                .trigger(Trigger.ProcessingTime("2 seconds"))
                .foreach(this.sparkWriter) // this is a MySparkWriter instance
                .option("checkpointLocation", "/checkpoint/"  sparkSession.sparkContext().applicationId())
                .start();
    }
    
    ...

}
 

Комментарии:

1. любое решение для этого? когда я включаю включение заголовков, я получаю исключение ниже, вызванное: org.codehaus. джанино. Исключение InternalCompilerException: два неабстрактных метода «public int scala.collection. TraversableOnce.size()» имеют одинаковые типы параметров, объявляющий тип и возвращаемый тип