запись запроса.lastProgress() или QueryProgressEvent.progress() в kafka или hdfs sink

#metrics

#показатели

Вопрос:

Мне нужно записать метрики запроса в kafak sink или файл hdfs.

Я попытался напечатать QueryProgressEvent.progress() на консоли и его работу, однако мне нужно записать то же самое на какой-нибудь диск для последующего анализа. Может ли кто-нибудь поделиться кодом или подходящим методом для написания того же. Ниже приведен мой прогресс написания кода в консоль :-

 public class kafka_stream {
    public static void main(String[] args) {
           Logger.getLogger("org").setLevel(Level.OFF);
           Logger.getLogger("akka").setLevel(Level.OFF);
           SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config("spark.master", "local[*]").config("spark.streaming.stopGracefullyOnShutdown","true").getOrCreate();



spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: "   queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: "   queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: "   queryProgress.progress());
queryProgress.progress().write();
    }
});

Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "10.5.205.213:9092").option("subscribe", "telemedia-clickstream").option("failOnDataLoss","false").option("checkpointLocation", "/prod/data/telemedia/chckpoint").load().selectExpr("CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType" );

StreamingQuery query = df.writeStream().outputMode("append").format("parquet").trigger(Trigger.ProcessingTime("30 seconds")).option("path", "/prod/data/telemedia").option("checkpointLocation", "/prod/data/telemedia/chckpoint").start();

try{ query.awaitTermination();   }  catch(StreamingQueryException e) {System.exit(-1);}

                                                    }

                                                    }