#scala #apache-spark #apache-spark-sql
#scala #apache-spark #apache-spark-sql
Вопрос:
У меня есть набор данных следующим образом
passengerId, flightId, from, to, date
56, 0, cg, ir, 2017-01-01
78, 0, cg, ir, 2017-01-01
12, 0, cg, ir, 2017-02-01
34, 0, cg, ir, 2017-02-01
51, 0, cg, ir, 2017-02-01
56, 1, ir, uk, 2017-01-02
78, 1, ir, uk, 2017-01-02
12, 1, ir, uk, 2017-02-02
34, 1, ir, uk, 2017-02-02
51, 1, ir, uk, 2017-02-02
56, 2, uk, in, 2017-01-05
78, 2, uk, in, 2017-01-05
12, 2, uk, in, 2017-02-05
34, 2, uk, in, 2017-02-05
51, 3, uk, in, 2017-02-05
Мне нужно представить отчет в следующих форматах.
Passenger 1 ID Passenger 2 ID No_flights_together
56 78 6
12 34 8
… … …
Найдите пассажиров, совершивших более N совместных рейсов в пределах диапазона
Passenger 1 ID Passenger 2 ID No_Flights_Together From To
56 78 6 2017-01-01 2017-03-01
12 34 8 2017-04-05 2017-12-01
… … … … …
Я не уверен, как это сделать. Помощь была бы признательна.
Ответ №1:
Вы можете самостоятельно присоединиться к df1.passengerId < df2.passengerId
наряду с same flightId
и date
, а затем выполнить необходимое количество (*), min(дата) и max (дата) с помощью groupBy/agg
:
val df = Seq(
(56, 0, "2017-01-01"),
(78, 0, "2017-01-01"),
(12, 0, "2017-02-01"),
(34, 0, "2017-02-01"),
(51, 0, "2017-02-01"),
(56, 1, "2017-01-02"),
(78, 1, "2017-01-02"),
(12, 1, "2017-02-02"),
(34, 1, "2017-02-02"),
(51, 1, "2017-02-02"),
(56, 2, "2017-01-05"),
(78, 2, "2017-01-05"),
(12, 2, "2017-02-01"),
(34, 2, "2017-02-01"),
(51, 3, "2017-02-01")
).toDF("passengerId", "flightId", "date")
df.as("df1").join(df.as("df2"),
$"df1.passengerId" < $"df2.passengerId" amp;amp;
$"df1.flightId" === $"df2.flightId" amp;amp;
$"df1.date" === $"df2.date",
"inner"
).
groupBy($"df1.passengerId", $"df2.passengerId").
agg(count("*").as("flightsTogether"), min($"df1.date").as("from"), max($"df1.date").as("to")).
where($"flightsTogether" >= 3).
show
// ----------- ----------- --------------- ---------- ----------
// |passengerId|passengerId|flightsTogether| from| to|
// ----------- ----------- --------------- ---------- ----------
// | 12| 34| 3|2017-02-01|2017-02-02|
// | 56| 78| 3|2017-01-01|2017-01-05|
// ----------- ----------- --------------- ---------- ----------
Комментарии:
1. Спасибо. Помощь приветствуется: D
2. Когда я использую приведенный выше код, я получил сообщение об ошибке «значение < не является членом org.apache.spark.sql.catalyst.analysis. Неразрешенный атрибут». Причиной ошибки является оператор связи Syas. я использую spark-sql 2.4.0. Любые указания будут полезны для меня