Объединение числового столбца с помощью PySpark

#python #pandas #apache-spark #pyspark #apache-spark-sql

#python #панды #apache-spark #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных PySpark df , в котором есть числовой столбец (с NaNs)

  ------- 
|numbers|
 ------- 
| 142.56|
|       |
|2023.33|
| 477.76|
| 175.52|
|1737.45|
| 520.72|
|  641.2|
|   79.3|
| 138.43|
 ------- 
 

Я хочу создать новый столбец, который определяет некоторые ячейки, например 0, (0, 500], (500, 1000], (1000, inf)

Есть ли способ добиться этого с помощью функции, подобной pandas.cut? На данный момент я делаю это с помощью PySpark, определяя функцию udf следующим образом, но недостатком этого подхода является то, что он утомительный и непараметрический

 from pyspark.sql import functions as F
from pyspark.sql.types import *

def func(numbers):
    if numbers==0:
        return '0'
    elif numbers>0 and numbers<=500:
        return '(0, 500]'
    elif numbers>500 and numbers<=1000:
        return '(500, 1000]'
    elif numbers>500:
        return '(500, inf)'
    else return 'Other'

func_udf = F.udf(func, StringType())

df.withColumn('numbers_bin', func_udf(df['numbers']))
 

Если бы df был фреймом данных Pandas, я бы использовал этот подход:

 df['numbers_bin'] = pd.cut(
    df['numbers'],
    np.concatenate((-np.inf, [0, 500, 1000], np.inf), axis=None))
 

Что намного чище и модульнее

Комментарии:

1. каков dtype столбца? почему во второй строке есть пустая запись?

2. Тип dtype — float . Я бы просто рассмотрел случай наличия нулевых значений между числами. Спасибо

Ответ №1:

Вы можете использовать Bucketizer из Spark ML:

 from pyspark.ml.feature import Bucketizer

df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers',
    outputCol='numbers_bin'
).transform(df)

df2.show()
 ------- ----------- 
|numbers|numbers_bin|
 ------- ----------- 
| 142.56|        1.0|
|   null|       null|
|2023.33|        3.0|
| 477.76|        1.0|
| 175.52|        1.0|
|1737.45|        3.0|
| 520.72|        2.0|
|  641.2|        2.0|
|   79.3|        1.0|
| 138.43|        1.0|
 ------- ----------- 
 

Если вы хотите вместо этого отобразить интервал:

 import pyspark.sql.functions as F

df2 = Bucketizer(
    splits=[-float('inf'), 0, 500, 1000, float('inf')],
    inputCol='numbers', 
    outputCol='numbers_bin'
).transform(df).withColumn(
    'numbers_bin',
    F.expr("""
        format_string(
            '%s, %s',
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin) - 1],
            array(-float('inf'), 0, 500, 1000, float('inf'))[int(numbers_bin)])
    """)
)

df2.show()
 ------- -------------- 
|numbers|   numbers_bin|
 ------- -------------- 
| 142.56|-Infinity, 0.0|
|   null|    null, null|
|2023.33| 500.0, 1000.0|
| 477.76|-Infinity, 0.0|
| 175.52|-Infinity, 0.0|
|1737.45| 500.0, 1000.0|
| 520.72|    0.0, 500.0|
|  641.2|    0.0, 500.0|
|   79.3|-Infinity, 0.0|
| 138.43|-Infinity, 0.0|
 ------- -------------- 
 

Комментарии:

1. Как я могу указать интервал в столбце «numbers_bin», а не индекс? Спасибо

2. @espogian Я включил возможное решение. немного некрасиво, но, надеюсь, должно выполнить эту работу 🙂