Spark: как мне найти пассажиров, совершивших более 3 совместных рейсов

#scala #apache-spark #apache-spark-sql

#scala #apache-spark #apache-spark-sql

Вопрос:

У меня есть набор данных следующим образом

 passengerId, flightId, from, to, date
56,          0,        cg,   ir, 2017-01-01
78,          0,        cg,   ir, 2017-01-01
12,          0,        cg,   ir, 2017-02-01
34,          0,        cg,   ir, 2017-02-01
51,          0,        cg,   ir, 2017-02-01

56,          1,        ir,   uk, 2017-01-02
78,          1,        ir,   uk, 2017-01-02
12,          1,        ir,   uk, 2017-02-02
34,          1,        ir,   uk, 2017-02-02
51,          1,        ir,   uk, 2017-02-02

56,          2,        uk,   in, 2017-01-05
78,          2,        uk,   in, 2017-01-05
12,          2,        uk,   in, 2017-02-05
34,          2,        uk,   in, 2017-02-05
51,          3,        uk,   in, 2017-02-05
  

Мне нужно представить отчет в следующих форматах.

 Passenger 1 ID  Passenger 2 ID  No_flights_together
    56               78               6
    12               34               8
    …                 …               …

  

Найдите пассажиров, совершивших более N совместных рейсов в пределах диапазона

 Passenger 1 ID  Passenger 2 ID  No_Flights_Together From        To
56                  78                    6         2017-01-01  2017-03-01
12                  34                    8         2017-04-05  2017-12-01
…                   …                     …         …           …

  

Я не уверен, как это сделать. Помощь была бы признательна.

Ответ №1:

Вы можете самостоятельно присоединиться к df1.passengerId < df2.passengerId наряду с same flightId и date , а затем выполнить необходимое количество (*), min(дата) и max (дата) с помощью groupBy/agg :

 val df = Seq(
  (56, 0, "2017-01-01"),
  (78, 0, "2017-01-01"),
  (12, 0, "2017-02-01"),
  (34, 0, "2017-02-01"),
  (51, 0, "2017-02-01"),
  (56, 1, "2017-01-02"),
  (78, 1, "2017-01-02"),
  (12, 1, "2017-02-02"),
  (34, 1, "2017-02-02"),
  (51, 1, "2017-02-02"),
  (56, 2, "2017-01-05"),
  (78, 2, "2017-01-05"),
  (12, 2, "2017-02-01"),
  (34, 2, "2017-02-01"),
  (51, 3, "2017-02-01")
).toDF("passengerId", "flightId", "date")

df.as("df1").join(df.as("df2"),
    $"df1.passengerId" < $"df2.passengerId" amp;amp;
    $"df1.flightId" === $"df2.flightId" amp;amp;
    $"df1.date" === $"df2.date",
    "inner"
  ).
  groupBy($"df1.passengerId", $"df2.passengerId").
  agg(count("*").as("flightsTogether"), min($"df1.date").as("from"), max($"df1.date").as("to")).
  where($"flightsTogether" >= 3).
  show
//  ----------- ----------- --------------- ---------- ---------- 
// |passengerId|passengerId|flightsTogether|      from|        to|
//  ----------- ----------- --------------- ---------- ---------- 
// |         12|         34|              3|2017-02-01|2017-02-02|
// |         56|         78|              3|2017-01-01|2017-01-05|
//  ----------- ----------- --------------- ---------- ---------- 
  

Комментарии:

1. Спасибо. Помощь приветствуется: D

2. Когда я использую приведенный выше код, я получил сообщение об ошибке «значение < не является членом org.apache.spark.sql.catalyst.analysis. Неразрешенный атрибут». Причиной ошибки является оператор связи Syas. я использую spark-sql 2.4.0. Любые указания будут полезны для меня