#database #scala #apache-spark
#База данных #scala #apache-spark
Вопрос:
Я пытаюсь подключиться к базе данных Oracle, и для этого мне нужно использовать чистое соединение scala, а не spark. Итак, я написал код подключения для базы данных Oracle.
Теперь главная головная боль заключается в том, что у меня есть столбец в таблице oracle, в котором для каждой строки записан запрос select (в некоторой степени таблица содержит метаданные).). Мне нужно взять запрос, записанный в каждом столбце, и запустить его, который будет находиться в таблице hive, и сохранить результат запроса в dataframe. Я не уверен, какой подход к решению вышеуказанной проблемы.
Данные таблицы Oracle
Я могу подключиться к таблице Oracle, используя чистое соединение scala.Мне нужно получить данные столбца запроса и запустить его. Результат запроса мне нужно сохранить в dataframe для дальнейшей обработки.
Код подключения:-
object ScalaJdbcConnectSelect {
def main(args: Array[String]) {
// connect to the database named "mysql" on the localhost
val driver = "com.mysql.jdbc.Driver"
val url = "jdbc:mysql://localhost/mysql"
val username = "root"
val password = "root"
var connection:Connection = null
try {
// make the connection
Class.`enter code here`forName(driver)
connection = DriverManager.getConnection(url, username, password)
// create the statement, and run the select query
val statement = connection.createStatement()
val resultSet = statement.executeQuery(query)`
Комментарии:
1. Покажите нам свой код подключения? Что вы получаете в ответ от вашего подключения / запроса? А
java.sql.ResultSet
?2. Я добавил код подключения в сообщение. Я возвращаю набор результатов Java из statment.executequery.
Ответ №1:
Вы можете подключиться к Oracle из Spark, и вы должны сделать это, потому что в противном случае вам придется последовательно переносить все данные в какое-либо промежуточное хранилище, прежде чем читать их с помощью Spark, а это довольно расточительно по сравнению с несколькими подключениями к базе данных, извлекающими данные параллельно.
// Set the variables server, port, service
val url = s"jdbc:oracle:thin:@$server:$port:$service"
// Add odbc6.jar via --driver-class-path and --jars during spark-shell/submit
val reader = spark.read.format("jdbc")
.option("url", url)
.option("user", user)
.option("password", password)
.option("driver", "oracle.jdbc.driver.OracleDriver")
// Note the use of partitionColumn is necessary to create multiple connections
// from the workers
val df = reader.option("dbtable", "db.table")
.option("partitionColumn", "col1")
.load
val dfWithQuery = reader.option("dbtable", "(SELECT a, b, c FROM t1) AS tbl1")
.option("partitionColumn", "a")
.load
Сначала вы получаете свой результат df
, а затем можете собирать столбец запроса, просматривая и добавляя собранный запрос в reader
для создания нового DataFrame
.
Если вы настаиваете на использовании вашего метода, то у вас уже есть ResultSet
2, . Вам просто нужно получить свой столбец во время итерации по строкам, а затем использовать запрос с вашим Connection
.
// ^ Your code above, Connection already exists
while (resultSet.next()){
try {
val queryValue = resultSet.getString("query")
val queryResultSet = statement.executeQuery(queryValue)
while (queryResultSet){
// Do stuff with your newly queried ResultSet
}
}
}
Вы можете видеть, насколько многоуровневым try catch
это не очень красиво.
Если вы хотите, вы можете создать Dataset
из ResultSet
, но обратите внимание, что для этого необходимо сначала загрузить все RestulSet
в память драйвера, и если данные большие, у вас может закончиться.
def resultSetToSpark[T](rs: ResultSet, f: ResultSet => T,
spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
val data: Seq[T] = Iterator.continually(rs.next, rs)
.takeWhile(_._1).map{
case (_,rs) => f(rs)
}.toList
spark.createDataset(data)(encoder)
}
Обратите внимание, вы должны предоставить функцию, которая получает одну строку из ResultSet
и создает экземпляр class
из полученных значений, а также кодировщик вашего класса. Ниже приведен пример.
case class Potato(a: String, b: String, c: String)
def parseResultSet(rs: ResultSet): Potato = Potato(
rs.getString("a"), rs.getString("b"), rs.getString("c")
)
import org.apache.spark.sql.{Encoder, Encoders}
val encoder: Encoder[Potato] = Encoders.product[Potato]
// Use like this
val dfFromRS = resultSetToSpark(resultSet, parseResultSet, spark, encoder)
Таким образом, наилучший подход на самом деле заключается в использовании Spark DataFrameReader
и чтобы работники Spark выполняли несколько подключений к базе данных, но это действительно имеет значение, только если данные, о которых идет речь, могут привести к нехватке памяти в драйвере.
Вы всегда можете получить список всех запросов, подобных этому.
case class Query(statement: String)
implicit val queryEncoder: Encoder[Query] = Encoders.product[Query]
val queryDs = resultSetToSpark(rs)(rs => Message(rs.getString("query")))(spark, queryEncoder)
val queryList = queryDs.collect.toList
val df1 = spark.sql(queryList.head) // get a DataFrame from the first query in the list
Комментарии:
1. Спасибо за подробный ответ. Столбец запроса содержит инструкции sql, которые мне нужно выполнить в таблице hive.. Как я могу этого добиться. Для подключения к oracle я использую чистое соединение scala, а не соединение spark.
2. @Saurabh Итак, что вы хотите делать с таблицами Hive при обращении к ним?
3. Моя проблема заключается в том, что у меня есть одна таблица Oracle, в которой col_1 содержит запросы. Запросы должны выполняться в таблице hive, а результат запроса, который я получил из таблицы hive, должен храниться в dataframe для каждого запроса, который будет запущен.
4. Я установил соединение с таблицей oracle и извлек данные из таблицы в resultset. Net Я попытался запустить запрос в таблице улья
5. Возможно, я слишком усложнил это. Знаете ли вы, что вы можете использовать
spark.sql(query)
для полученияDataFrame
? Или вам нужен список всех запросов, прежде чем вы что-то с ними сделаете?