Пользовательские задания для полезных нагрузок json в структурированной потоковой передаче spark

#scala #apache-spark #apache-spark-sql #spark-streaming #spark-structured-streaming

#scala #apache-spark #apache-spark-sql #потоковая передача spark #spark-structured-streaming

Вопрос:

У меня в event hub есть полезная нагрузка ниже, способная использовать то же самое для spark, используя структурированную потоковую передачу. Теперь мне нужно написать пользовательские задания для каждого типа датчиков. Как я мог бы проверить значение столбца streaming DF и привязать его к другой функции? На самом деле каждый тип имеет разный набор параметров, и мне нужно получить пользовательскую схему из базы данных SQL, я застрял здесь.

 {
"Sensor_Id": 1,
"Sensor_Type":"Type1",
"Parameter":
{
"Parameter1":12
"Parameter1":34
"Parameter1":56
}
}
 

Ниже фрагмент кода для извлечения полезной нагрузки. Я могу читать содержимое отдельно от параметра, я думаю извлечь схему из базы данных SQL путем запроса с использованием Sensor_Type и прочитать часть параметра с помощью from_json. А затем напишите пользовательские задания. Есть ли какой-либо способ добиться этого в Spark 2.4?

 val eventhubs = spark.readStream
      .format("eventhubs")
      .options(eventHubsConf.toMap)
      .load()
val jsonSchema = new StructType().add("Sensor_Id", StringType)
      .add("Sensor_Type", StringType)
      .add("Parameter", StringType)
val events = eventhubs.select(from_json($"body".cast("String"),jsonSchema).alias("sensorReading"))
 

Комментарии:

1. Если количество видов для типов датчиков невелико, вы можете попробовать «СЛУЧАЙ, КОГДА» и применить другой подход к чтению «Параметра» в соответствии со значением «Sensor_Type». Я предполагаю, что тип вывода «Параметра» должен быть одинаковым и детерминированным, поскольку для запроса не имеет смысла поддерживать разные схемы для каждой строки. Если количество типов для типов датчиков велико, вы можете попробовать обернуть код для чтения «Parameter» как UDF и передать значения двоек («Sensor_Type» и «Parameter») в качестве параметров UDF.

2. @JungtaekLim, спасибо за ваш комментарий, мы планируем иметь динамический характер, чтобы пользователь мог добавлять новый тип датчика в любое время, и spark должен иметь возможность обрабатывать то же самое. Схема (параметры) вновь добавленных типов датчиков могут извлекаться из базы данных. Вот почему я подумал о передаче sensor_type в качестве переменной для запроса JDBC. Я не уверен, что мой подход здесь правильный, подходит ли spark structured streaming для таких динамических вариантов использования?

3. Я не уверен, но я предполагаю, что это работает только тогда, когда схема типов датчиков одинакова. Вы запускаете SQL-запрос, каждая операция которого имеет схему ввода и схему вывода. Если вы хотите иметь гибкость со схемой, вы можете в конечном итоге использовать их как строковый тип и иметь ветви (например, CASE WHEN).

4. @JungtaekLim Что касается случая, когда подход, я нашел способ получить sensor_type в список, теперь мне нужно перепроверить его со столбцом Sensor_Type в потоковом фрейме данных. Как я мог бы сделать то же самое? Не могли бы вы, пожалуйста, направить меня?