Подсчитайте различные наборы между двумя столбцами, используя функцию agg в сеансе Spark Pyspark

#python #apache-spark #pyspark

Вопрос:

Я хочу получить количество уникальных соединений между местоположениями, поэтому a->b и b->>a должны считаться одним. Фрейм данных содержит метки времени и имя начального и конечного местоположения. В результате должны быть представлены уникальные соединения между станциями в течение дня в году.

введите описание изображения здесь

 import findspark
findspark.init('/home/[user_name]/spark-3.1.2-bin-hadoop3.2')
import pyspark
from pyspark.sql.functions import date_format, countDistinct, struct, col
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('cluster1').getOrCreate()

from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType, TimestampType
from pyspark.sql.functions import to_timestamp
data2 = [
    ('2017-12-29 16:57:39.6540','2017-12-29 16:57:39.6540',"A","B"),
    ("2017-12-29 16:57:39.6540","2017-12-29 17:57:39.6540","B","A"),
    ("2017-12-29 16:57:39.6540","2017-12-29 19:57:39.6540","B","A"),
    ("2017-12-30 16:57:39.6540","2017-12-30 16:57:39.6540","C","A"),
    ("2017-12-30 16:57:39.6540","2017-12-30 17:57:39.6540","B","F"),
    ("2017-12-31 16:57:39.6540","2017-12-31 16:57:39.6540","C","A"),
    ("2017-12-31 16:57:39.6540","2017-12-31 17:57:39.6540","A","C"),
    ("2017-12-31 16:57:39.6540","2017-12-31 17:57:39.6540","B","C"),
    ("2017-12-31 16:57:39.6540","2017-12-31 17:57:39.6540","A","B"),
  ]

schema = StructType([ 
    StructField("start",StringType(),True), 
    StructField("end",StringType(),True), 
    StructField("start_loc",StringType(),True), 
    StructField("end_loc", StringType(), True)
  ])
 
df2 = spark.createDataFrame(data=data2,schema=schema)
df2 = df2.withColumn("start_timestamp",to_timestamp("start"))
df2 = df2.withColumn("end_timestamp",to_timestamp("end"))
df2 = df2.drop("start", "end")
df2.printSchema()
df2.show(truncate=False)

df2_agg = df2.withColumn("date", date_format('start_timestamp', 'D'))
.groupBy('date', 'start_loc','end_loc').agg(
    collect_list(struct(col('start_loc'), col('end_loc'))).alias("n_routes_sets"),
)
df2_agg.show()

 

Результат выглядит так:

введите описание изображения здесь

,но результат должен быть таким:

Дата n_routes
365 3
364 2
363 1

Ниже строка неверна.

 collect_list(struct(col('start_loc'), col('end_loc'))).alias("n_routes_sets"),
 

Ответ №1:

Измените свои строки,как показано ниже,и измените порядок a,b и b, a всегда как a, b или наоборот:

 from pyspark.sql.functions import date_format, countDistinct, collect_set, struct, col, when, size

...
...
df2 = df2.withColumn("sl2", when(df2['end_loc'] < df2['start_loc'],  df2['end_loc']).otherwise(df2['start_loc']) )
df2 = df2.withColumn("el2", when(df2['end_loc'] > df2['start_loc'],  df2['end_loc']).otherwise(df2['start_loc']) )
df2 = df2.drop("start_loc", "end_loc")

df2.printSchema()
df2.show(truncate=False)

df2_agg = df2.withColumn("date", date_format('start_timestamp', 'D'))
.groupBy('date').agg(collect_set(struct(col('sl2'), col('el2'))).alias("n_routes_sets"),
) 

df2_agg.select("date", size("n_routes_sets")).show()
 

ВОЗВРАТ:

  ---- ------------------- 
|date|size(n_routes_sets)|
 ---- ------------------- 
| 363|                  1|
| 364|                  2|
| 365|                  3|
 ---- ------------------- 
 

Комментарии:

1. Правда, вероятно, мне не хватало какого-то импорта из-за того, что это MRE, но это не ответ на мою проблему.

2. Я попробовал это в записной книжке databricvks, вашей оригинальной, и это не удалось, и когда я добавил их, это сработало.

3. добавлен вывод.

4. изменил ответ.

5. Это очень хороший ответ для данного конкретного MRE, но я не могу выполнить такую предварительную обработку данных перед agg по своей собственной задаче. Если бы был способ опустить эту предварительную обработку и включить фильтрацию данных внутри agg, это было бы лучшим ответом.