Фреймы данных Spark: добавление условного столбца в фрейм данных

#scala #apache-spark #dataframe #apache-spark-sql #conditional

#scala #apache-spark #фрейм данных #apache-spark-sql #условные операторы

Вопрос:

Я хочу добавить условный столбец Flag в фрейм данных A. Когда выполняются следующие два условия, добавьте 1 к Flag , в противном случае 0:

  1. num из фрейма данных A находится между numStart и numEnd из фрейма данных B.

  2. Если вышеуказанное условие удовлетворяет, проверьте, include равно ли оно 1.

Фрейм данных A (это очень большой фрейм данных, содержащий миллионы строк):

  ---- ------ ----- ------------------------ 
|num |food  |price|timestamp               |
 ---- ------ ----- ------------------------ 
|1275|tomato|1.99 |2018-07-21T00:00:00.683Z|
|145 |carrot|0.45 |2018-07-21T00:00:03.346Z|
|2678|apple |0.99 |2018-07-21T01:00:05.731Z|
|6578|banana|1.29 |2018-07-20T01:11:59.957Z|
|1001|taco  |2.59 |2018-07-21T01:00:07.961Z|
 ---- ------ ----- ------------------------ 
  

Фрейм данных B (это очень маленький DF, содержащий всего 100 строк):

  ---------- ----------- ------- 
|numStart  |numEnd     |include|
 ---------- ----------- ------- 
|0         |200        |1      |
|250       |1050       |0      |
|2000      |3000       |1      |
|10001     |15001      |1      |
 ---------- ----------- ------- 
  

Ожидаемый результат:

  ---- ------ ----- ------------------------ ---------- 
|num |food  |price|timestamp               |Flag      |
 ---- ------ ----- ------------------------ ---------- 
|1275|tomato|1.99 |2018-07-21T00:00:00.683Z|0         |
|145 |carrot|0.45 |2018-07-21T00:00:03.346Z|1         |
|2678|apple |0.99 |2018-07-21T01:00:05.731Z|1         |
|6578|banana|1.29 |2018-07-20T01:11:59.957Z|0         |
|1001|taco  |2.59 |2018-07-21T01:00:07.961Z|0         |
 ---- ------ ----- ------------------------ ---------- 
  

Комментарии:

1. что вы уже пробовали? Вы пробовали объединять фреймы данных на основе условия, которое вы описали в пункте (i)?

Ответ №1:

Вы можете соединить слева dfB с dfA на основе условия, которое вы описали в (i), затем создать Flag столбец, используя withColumn и coalesce функцию, значение «по умолчанию» равное 0:

  • Записи, для которых было найдено совпадение, будут использовать include значение соответствующей dfB записи
  • Записи, для которых не было совпадения, будут иметь include=null , и согласно вашему требованию такие записи должны быть получены Flag=0 , поэтому мы используем coalesce , который в случае null возвращает значение по умолчанию с литералом lit(0)

Наконец, избавьтесь от dfB столбцов, которые вас не интересуют:

 import org.apache.spark.sql.functions._
import spark.implicits._ // assuming "spark" is your SparkSession

dfA.join(dfB, $"num".between($"numStart", $"numEnd"), "left")
  .withColumn("Flag", coalesce($"include", lit(0)))
  .drop(dfB.columns: _*)
  .show()

//  ---- ------ ----- -------------------- ---- 
// | num|  food|price|           timestamp|Flag|
//  ---- ------ ----- -------------------- ---- 
// |1275|tomato| 1.99|2018-07-21T00:00:...|   0|
// | 145|carrot| 0.45|2018-07-21T00:00:...|   1|
// |2678| apple| 0.99|2018-07-21T01:00:...|   1|
// |6578|banana| 1.29|2018-07-20T01:11:...|   0|
// |1001|  taco| 2.59|2018-07-21T01:00:...|   0|
//  ---- ------ ----- -------------------- ---- 
  

Комментарии:

1. Спасибо за ваш ответ, но я получаю ошибки «Не удается разрешить слияние символов» и «не удается разрешить подсветку символа». Также, если я могу спросить, в чем причина объединения в столбце «включить»?

2. Какую версию Spark вы используете? И включили ли вы в свой код первую инструкцию import (импорт functions._ )? Вот откуда должна взяться lit функция. Что касается использования coalesce : добавлено объяснение в ответе.

Ответ №2:

Соедините два фрейма данных вместе при первом условии, сохраняя все строки в фрейме данных A (т. Е. с помощью соединения по левому краю, см. Код ниже). После объединения include столбец может быть переименован Flag , и все значения NaN внутри него будут установлены в 0. Два дополнительных столбца, numStart и numEnd будут удалены.

Таким образом, код может быть написан следующим образом:

 A.join(B, $"num" >= $"numStart" amp;amp; $"num" <= $"numEnd", "left")
  .withColumnRenamed("include", "Flag")
  .drop("numStart", "numEnd")
  .na.fill(Map("Flag" -> 0))