#sql-server #apache-spark #spark-jdbc
#sql-сервер #apache-spark #spark-jdbc
Вопрос:
Я пытаюсь прочитать данные из базы данных MSSQL, используя Spark jdbc с указанным смещением. Таким образом, данные должны загружаться только после указанной метки времени, которая будет этим смещением. Я попытался реализовать это, предоставив запрос в конфигурациях jdbc, однако я не нашел возможности создать подготовленную инструкцию с параметризованными значениями. В этом случае я хочу параметризовать смещение, которое будет меняться после каждого запуска приложения. Как я могу реализовать это, используя параметры jdbc?
Все конфигурации базы данных находятся в файле application.conf. Это способ, которым я читаю из базы данных:
def jdbcOptions(query: String) = Map[String,String](
"driver" -> config.getString("sqlserver.db.driver"),
"url" -> config.getString("sqlserver.db.url"),
"dbtable" -> s"(select * from TestAllData where update_database_time >= '2019-03-19 12:30:00.003') as subq,
"user" -> config.getString("sqlserver.db.user"),
"password" -> config.getString("sqlserver.db.password"),
"customSchema" -> config.getString("sqlserver.db.custom_schema")
)
val testDataDF = sparkSession
.read
.format("jdbc")
.options(jdbcOptions())
.load()
Вместо этого запрос должен выглядеть почти так:
s"(select * from TestAllData where update_database_time >= $tmstp) as subq
Комментарии:
1. Вы можете просто поместить свой $ tmstp непосредственно в запрос, если я правильно понял ваш вопрос.
2. Потому что это не лучший и безопасный способ встраивать параметры в запросы, используя строку, а не подготовленные инструкции.
3. Собираетесь ли вы запускать это задание, зависит от параметров пользователей?
4. Я собираюсь предоставить изменяемый параметр, который я прочитал из Kafka, поэтому я не хочу жестко кодировать значение
5. Насколько я знаю, нет способа сделать это, используя встроенные возможности spark. Однако вы могли бы попробовать стороннюю библиотеку, которая могла бы проверить ваш запрос перед вставкой в spark Reader.
Ответ №1:
В Spark-jdbc нет подготовленной инструкции, поэтому нет других способов, кроме установки параметров в строке:
val dayColumn = "update_database_time"
val dayValue = "2019-03-19 12:30:00.003"
s"(select * from TestAllData where $dayColumn > '$dayValue') as subq"