Scala для извлечения сведений из таблицы Oracle и запуска запроса в таблице hive

#database #scala #apache-spark

#База данных #scala #apache-spark

Вопрос:

Я пытаюсь подключиться к базе данных Oracle, и для этого мне нужно использовать чистое соединение scala, а не spark. Итак, я написал код подключения для базы данных Oracle.

Теперь главная головная боль заключается в том, что у меня есть столбец в таблице oracle, в котором для каждой строки записан запрос select (в некоторой степени таблица содержит метаданные).). Мне нужно взять запрос, записанный в каждом столбце, и запустить его, который будет находиться в таблице hive, и сохранить результат запроса в dataframe. Я не уверен, какой подход к решению вышеуказанной проблемы.

Данные таблицы Oracle

введите описание изображения здесь

Я могу подключиться к таблице Oracle, используя чистое соединение scala.Мне нужно получить данные столбца запроса и запустить его. Результат запроса мне нужно сохранить в dataframe для дальнейшей обработки.

Код подключения:-

     object ScalaJdbcConnectSelect {

  def main(args: Array[String]) {
    // connect to the database named "mysql" on the localhost
    val driver = "com.mysql.jdbc.Driver"
    val url = "jdbc:mysql://localhost/mysql"
    val username = "root"
    val password = "root"

    
    var connection:Connection = null

    try {
      // make the connection
      Class.`enter code here`forName(driver)
      connection = DriverManager.getConnection(url, username, password)

      // create the statement, and run the select query
      val statement = connection.createStatement()
      val resultSet = statement.executeQuery(query)`
  

Комментарии:

1. Покажите нам свой код подключения? Что вы получаете в ответ от вашего подключения / запроса? А java.sql.ResultSet ?

2. Я добавил код подключения в сообщение. Я возвращаю набор результатов Java из statment.executequery.

Ответ №1:

Вы можете подключиться к Oracle из Spark, и вы должны сделать это, потому что в противном случае вам придется последовательно переносить все данные в какое-либо промежуточное хранилище, прежде чем читать их с помощью Spark, а это довольно расточительно по сравнению с несколькими подключениями к базе данных, извлекающими данные параллельно.

 // Set the variables server, port, service
val url = s"jdbc:oracle:thin:@$server:$port:$service"

// Add odbc6.jar via --driver-class-path and --jars during spark-shell/submit 
val reader = spark.read.format("jdbc")
  .option("url", url)
  .option("user", user)
  .option("password", password)
  .option("driver", "oracle.jdbc.driver.OracleDriver")

// Note the use of partitionColumn is necessary to create multiple connections 
// from the workers
val df = reader.option("dbtable", "db.table")
               .option("partitionColumn", "col1")
               .load
val dfWithQuery = reader.option("dbtable", "(SELECT a, b, c FROM t1) AS tbl1")
                  .option("partitionColumn", "a")
                  .load
  

Сначала вы получаете свой результат df , а затем можете собирать столбец запроса, просматривая и добавляя собранный запрос в reader для создания нового DataFrame .

Если вы настаиваете на использовании вашего метода, то у вас уже есть ResultSet 2, . Вам просто нужно получить свой столбец во время итерации по строкам, а затем использовать запрос с вашим Connection .

 // ^ Your code above, Connection already exists
while (resultSet.next()){
  try {
    val queryValue = resultSet.getString("query")
    val queryResultSet = statement.executeQuery(queryValue)
    while (queryResultSet){
      // Do stuff with your newly queried ResultSet
    }
  }
}
  

Вы можете видеть, насколько многоуровневым try catch это не очень красиво.

Если вы хотите, вы можете создать Dataset из ResultSet , но обратите внимание, что для этого необходимо сначала загрузить все RestulSet в память драйвера, и если данные большие, у вас может закончиться.

 def resultSetToSpark[T](rs: ResultSet, f: ResultSet => T, 
                        spark: SparkSession, encoder: Encoder[T]): Dataset[T] = {
  val data: Seq[T] = Iterator.continually(rs.next, rs)
    .takeWhile(_._1).map{
      case (_,rs) => f(rs)
    }.toList

  spark.createDataset(data)(encoder)
}
  

Обратите внимание, вы должны предоставить функцию, которая получает одну строку из ResultSet и создает экземпляр class из полученных значений, а также кодировщик вашего класса. Ниже приведен пример.

 case class Potato(a: String, b: String, c: String)

def parseResultSet(rs: ResultSet): Potato = Potato(
 rs.getString("a"), rs.getString("b"), rs.getString("c")
)

import org.apache.spark.sql.{Encoder, Encoders}
val encoder: Encoder[Potato] =  Encoders.product[Potato]

// Use like this
val dfFromRS = resultSetToSpark(resultSet, parseResultSet, spark, encoder)
  

Таким образом, наилучший подход на самом деле заключается в использовании Spark DataFrameReader и чтобы работники Spark выполняли несколько подключений к базе данных, но это действительно имеет значение, только если данные, о которых идет речь, могут привести к нехватке памяти в драйвере.


Вы всегда можете получить список всех запросов, подобных этому.

 case class Query(statement: String)
implicit val queryEncoder: Encoder[Query] = Encoders.product[Query]
val queryDs = resultSetToSpark(rs)(rs => Message(rs.getString("query")))(spark, queryEncoder)
val queryList = queryDs.collect.toList
val df1 = spark.sql(queryList.head) // get a DataFrame from the first query in the list
  

Комментарии:

1. Спасибо за подробный ответ. Столбец запроса содержит инструкции sql, которые мне нужно выполнить в таблице hive.. Как я могу этого добиться. Для подключения к oracle я использую чистое соединение scala, а не соединение spark.

2. @Saurabh Итак, что вы хотите делать с таблицами Hive при обращении к ним?

3. Моя проблема заключается в том, что у меня есть одна таблица Oracle, в которой col_1 содержит запросы. Запросы должны выполняться в таблице hive, а результат запроса, который я получил из таблицы hive, должен храниться в dataframe для каждого запроса, который будет запущен.

4. Я установил соединение с таблицей oracle и извлек данные из таблицы в resultset. Net Я попытался запустить запрос в таблице улья

5. Возможно, я слишком усложнил это. Знаете ли вы, что вы можете использовать spark.sql(query) для получения DataFrame ? Или вам нужен список всех запросов, прежде чем вы что-то с ними сделаете?