Наиболее эффективный способ расширения столбца фрейма данных Pyspark

#apache-spark #dataframe #pyspark

#apache-spark #фрейм данных #pyspark

Вопрос:

У меня очень большой фрейм данных pyspark. Фрейм данных содержит два важных столбца: Ключ и токены, связанные с этим ключом. Таким образом, каждая строка содержит ключ и список токенов:

 load_df.show(5)

 -------------------- -----------  
|       token        |    key    | 
 -------------------- ----------- 
|[-LU4KeI8o, FrWx6...|   h9-1256 |
|[]                  |   h1-2112 |
|[HDOksdh_vv, aIHD...|   e3-0139 |
|[-LU4KeI8o, FrWx6...|   S3-4156 |
 -------------------- ----------- 
  

Теперь я хочу подсчитать, сколько раз каждый токен появлялся относительно разных ключей. Но проблема в том, что все, что я делаю, оказывается очень медленным.
Я хочу знать, каков наилучший способ для этого?

Я попытался разнести столбец token, а затем посчитать.

Что-то вроде этого:

 explode_df = load_df.withColumn('token', F.explode('token'))

load_freq = explode_df.groupby('token')
                    .count()
                    .sort('count', ascending=False)
  

или это:

 explode_df = load_df.withColumn('token', F.explode('token'))

load_freq = explode_df.groupby('token')
                    .agg(F.collect_set('key'), F.count(F.col('key')).alias('count'))
                    .sort('count', ascending=True)
  

Фрейм данных содержит более 250 миллионов строк, и этот метод работает очень медленно. Интересно, есть ли лучший способ достичь того же результата быстрее и эффективнее.

Комментарии:

1. я думаю, что у вас может быть best..do вам абсолютно необходим этап сортировки здесь?

2. @VamsiPrabhala Я думаю, что мне это нужно, поскольку я хочу поработать с определенным квантилем данных. возможно, я мог бы отсортировать их по отдельности, но я не думаю, что это было бы намного быстрее.

3. Разве здесь не следует группировать как по ключу, так и по токену? После разнесения вы можете попробовать разбить его на разделы по ключу и токену и записать его в parquet, затем прочитать его обратно и выполнить агрегацию, но улучшение, как я подозреваю, будет незначительным, если вообще будет какое-либо