#scala #apache-spark #apache-spark-sql #user-defined-functions
#scala #apache-spark #apache-spark-sql #определяемые пользователем функции
Вопрос:
У меня есть spark dataframe, который содержит более 40 столбцов. и миллионы строк. Я хочу создать другой столбец, который принимает, скажем, 5 столбцов из вышеупомянутого фрейма данных, передать каждую строку из 5 столбцов в отдельный Api (который принимает эти 5 значений и возвращает некоторые данные) и сохранить результат в столбце.
Для простоты я использую следующий пример: допустим, у меня есть следующий фрейм данных. И я хочу отправить каждую строку «food» и «price» в API, который возвращает результат, и он сохраняется в отдельном столбце под названием «объединить»
Ввод:
---- ------ -----
|name|food |price|
---- ------ -----
|john|tomato|1.99 |
|john|carrot|0.45 |
|bill|apple |0.99 |
|john|banana|1.29 |
|bill|taco |2.59 |
---- ------ -----
Вывод:
---- ------ ----- ----------
|name|food |price|combined |
---- ------ ----- ----------
|john|tomato|1.99 |abcd |
|john|carrot|0.45 |fdg |
|bill|apple |0.99 |123fgfg |
|john|banana|1.29 |fgfg4wf |
|bill|taco |2.59 |gfg45gn |
---- ------ ----- ----------
Я создал UDF для просмотра каждой строки:
val zip = udf {
(food: String, price: Double) =>
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", food)
nvIn.put("Price", price)
val nvOut = new NameValue
val code: Code = getTunnelsClient().execute("CombineData", nvIn, nvOut) // this is calling the external API
nvOut.get("CombineData") //this is stored the result column
}
def test(sc: SparkContext, sqlContext: SQLContext): Unit = {
import sqlContext.implicits._
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
val result = df.withColumn("combined", zip($"food", $"price"))
result.show(false)
}
Этот метод работает, однако я обеспокоен, поскольку я просматриваю каждую строку фрейма данных, а у меня миллионы таких строк, он не будет таким производительным в кластере
Есть ли какой-либо другой способ, которым я могу это сделать (скажем, используя spark-sql), возможно, без использования udf?
Ответ №1:
Я бы настоятельно рекомендовал использовать type safe spark
Dataset
api для отправки ваших строк данных в api.
Это включает в себя разбор ваших Dataframe
строк в scala
case
class
с помощью as
функции, а затем выполнение map
функции на вашем DatasetDataframe
, чтобы отправить ее в API и вернуть другую, case class
представляющую вашу Output
.
Несмотря на то, что строго запрещено spark sql
использовать Dataset
api, вы все равно можете извлечь выгоду из большинства оптимизаций, доступных в spark sql
case class Input(name: String, food: String, price: Double)
case class Output(name: String, food: String, price: Double, combined: String)
val df = Seq(
("john", "tomato", 1.99),
("john", "carrot", 0.45),
("bill", "apple", 0.99),
("john", "banana", 1.29),
("bill", "taco", 2.59)
).toDF("name", "food", "price")
df.as[Input].map(input => {
val nvIn = new NameValue
nvIn.put("Query.ID", 1234)
nvIn.put("Food", input.food)
nvIn.put("Price", input.price)
val nvOut = new NameValue
getTunnelsClient().execute("CombineData", nvIn, nvOut)
Output(input.name, input.food, input.price, nvOut.get("CombineData"))
}).show(false)