Отправьте несколько столбцов из Spark Dataframe во внешний API и сохраните результат в отдельном столбце

#scala #apache-spark #apache-spark-sql #user-defined-functions

#scala #apache-spark #apache-spark-sql #определяемые пользователем функции

Вопрос:

У меня есть spark dataframe, который содержит более 40 столбцов. и миллионы строк. Я хочу создать другой столбец, который принимает, скажем, 5 столбцов из вышеупомянутого фрейма данных, передать каждую строку из 5 столбцов в отдельный Api (который принимает эти 5 значений и возвращает некоторые данные) и сохранить результат в столбце.

Для простоты я использую следующий пример: допустим, у меня есть следующий фрейм данных. И я хочу отправить каждую строку «food» и «price» в API, который возвращает результат, и он сохраняется в отдельном столбце под названием «объединить»

Ввод:

  ---- ------ ----- 
|name|food  |price|
 ---- ------ ----- 
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco  |2.59 |
 ---- ------ ----- 
  

Вывод:

  ---- ------ ----- ---------- 
|name|food  |price|combined  |
 ---- ------ ----- ---------- 
|john|tomato|1.99 |abcd      |
|john|carrot|0.45 |fdg       |
|bill|apple |0.99 |123fgfg   |
|john|banana|1.29 |fgfg4wf   |
|bill|taco  |2.59 |gfg45gn   |
 ---- ------ ----- ---------- 
  

Я создал UDF для просмотра каждой строки:

 val zip = udf {
(food: String, price: Double) =>
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", food)
    nvIn.put("Price", price)
    val nvOut = new NameValue

    val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
    nvOut.get("CombineData")     //this is stored the result column
  }

  def test(sc: SparkContext, sqlContext: SQLContext): Unit = {
    import sqlContext.implicits._
    val df = Seq(
      ("john", "tomato", 1.99),
      ("john", "carrot", 0.45),
      ("bill", "apple", 0.99),
      ("john", "banana", 1.29),
      ("bill", "taco", 2.59)
    ).toDF("name", "food", "price")


    val result = df.withColumn("combined", zip($"food", $"price"))
    result.show(false)

  }
  

Этот метод работает, однако я обеспокоен, поскольку я просматриваю каждую строку фрейма данных, а у меня миллионы таких строк, он не будет таким производительным в кластере

Есть ли какой-либо другой способ, которым я могу это сделать (скажем, используя spark-sql), возможно, без использования udf?

Ответ №1:

Я бы настоятельно рекомендовал использовать type safe spark Dataset api для отправки ваших строк данных в api.

Это включает в себя разбор ваших Dataframe строк в scala case class с помощью as функции, а затем выполнение map функции на вашем DatasetDataframe , чтобы отправить ее в API и вернуть другую, case class представляющую вашу Output .

Несмотря на то, что строго запрещено spark sql использовать Dataset api, вы все равно можете извлечь выгоду из большинства оптимизаций, доступных в spark sql

 case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)

val df = Seq(
  ("john", "tomato", 1.99),
  ("john", "carrot", 0.45),
  ("bill", "apple", 0.99),
  ("john", "banana", 1.29),
  ("bill", "taco", 2.59)
).toDF("name", "food", "price")

df.as[Input].map(input => {
    val nvIn = new NameValue
    nvIn.put("Query.ID", 1234)
    nvIn.put("Food", input.food)
    nvIn.put("Price", input.price)
    val nvOut = new NameValue
    getTunnelsClient().execute("CombineData", nvIn, nvOut)
    Output(input.name, input.food, input.price, nvOut.get("CombineData"))
}).show(false)