#metrics
#показатели
Вопрос:
Мне нужно записать метрики запроса в kafak sink или файл hdfs.
Я попытался напечатать QueryProgressEvent.progress() на консоли и его работу, однако мне нужно записать то же самое на какой-нибудь диск для последующего анализа. Может ли кто-нибудь поделиться кодом или подходящим методом для написания того же. Ниже приведен мой прогресс написания кода в консоль :-
public class kafka_stream {
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkSession spark = SparkSession.builder().appName("Java Spark SQL basic example").config("spark.master", "local[*]").config("spark.streaming.stopGracefullyOnShutdown","true").getOrCreate();
spark.streams().addListener(new StreamingQueryListener() {
@Override
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " queryStarted.id());
}
@Override
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " queryTerminated.id());
}
@Override
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " queryProgress.progress());
queryProgress.progress().write();
}
});
Dataset<Row> df = spark.readStream().format("kafka").option("kafka.bootstrap.servers", "10.5.205.213:9092").option("subscribe", "telemedia-clickstream").option("failOnDataLoss","false").option("checkpointLocation", "/prod/data/telemedia/chckpoint").load().selectExpr("CAST(value AS STRING)","topic","partition","offset","timestamp","timestampType" );
StreamingQuery query = df.writeStream().outputMode("append").format("parquet").trigger(Trigger.ProcessingTime("30 seconds")).option("path", "/prod/data/telemedia").option("checkpointLocation", "/prod/data/telemedia/chckpoint").start();
try{ query.awaitTermination(); } catch(StreamingQueryException e) {System.exit(-1);}
}
}