#java #apache-spark #apache-spark-sql #spark-structured-streaming
Вопрос:
У меня есть задание spark, которое получает данные от Кафки, преобразует данные (создает несколько наборов данных) и вставляет их в несколько баз данных (MongoDB и PostgreSQL).
Ее примерный запрос
public Dataset<Row> apply(Dataset<Row> dataset) throws Exception {
LOG.info("..... data process into transaction layer......");
if (dataset.isStreaming()) {
dataset.writeStream().queryName(Constant.QueryNames.STREAM).outputMode(OutputMode.Update())
.foreachBatch((dataframe, bachId) -> {
dataframe.persist();
Order.transform(dataframe, true);
Task.transform(dataframe, true);
milestone.transform(dataframe, true);
dataframe.unpersist();
}).start().awaitTermination();
} else {
dataset.persist();
Order.transform(dataframe, true);
Task.transform(dataframe, true);
milestone.transform(dataframe, true);
dataset.unpersist();
}
return dataset;
}
Я попытался использовать пользовательский интерфейс Spark для получения подробной информации, но, поскольку он создает пакеты данных для обработки, а пользовательский интерфейс потоковой передачи структуры постоянно ожидает данных (допустим, мои пакетные данные закончились), он будет усреднять время обработки по времени
Как мне получить время обработки, необходимое для обработки и вставки данных в базы данных. например, общее количество обработанных событий в секунду? или наоборот
Комментарии:
1. Вы можете начать с предоставления 100 тыс. записей со скоростью 2 тыс. строк в секунду и проверить время обработки всего задания, а затем продолжать увеличивать скорость передачи данных до 3 тыс./4 тыс. строк в секунду и записывать время обработки. Еще одним хорошим показателем будет график использования памяти для каждой скорости передачи данных. Вы пишете генератор данных на основе входных данных.