Подсчитывать значение ключа, которое соответствует определенному значению в фрейме данных pyspark, используя udf

#python #apache-spark #dictionary #pyspark #apache-spark-sql

#python #apache-spark #словарь #pyspark #apache-spark-sql

Вопрос:

У меня есть фрейм данных pyspark, в котором есть столбец, значением которого является string json. Как я могу подсчитать значение, соответствующее определенному значению в списке внутри словаря, и представить отчет в виде столбца? И я хотел бы сделать это с помощью функции Python и pyspark udf.

Например, ниже приведен фрейм данных, df:

  --------------------------------------------------------------------------- 
|col                                                                        |
 --------------------------------------------------------------------------- 
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":true}]}}
 ---------------------------------------------------------------------------- 
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":false}]}}
 -------------------------------------------------------------------------- 
 

Что я пытался сделать:

 def upgrade_false(doc):
    string = str(doc) 
    return string.count('"upgrade":false')

df2= df.withColumn('upgrade_false', (F.udf(lambda j: upgrade_false(json.loads(j)),t.StringType()))('col'))
 

Но это не работает. Кто-нибудь может объяснить, что может быть не так?

Идеальный результат выглядит следующим образом:

  --------------------------------------------------------------------------- ---------------- 
|col                                                                        | upgrade_false
 --------------------------------------------------------------------------- ----------------- 
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":true}]}}   | 1
 ---------------------------------------------------------------------------- ---------------- 
|{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":false}]}}  | 2
 ---------------------------------------------------------------------------- ----------------- 
 

Ответ №1:

json.loads изменяет вашу строку на 'upgrade': False вместо "upgrade":false , так что вы не сможете получить никаких совпадений.

 >>> str(json.loads('{"field":{"list":[{"item":1,"upgrade":false},{"item":2,"upgrade":true}]}}'))
"{'field': {'list': [{'item': 1, 'upgrade': False}, {'item': 2, 'upgrade': True}]}}"
 

Вместо этого попробуйте udf ниже, который подсчитывает правильную строку:

 df2 = df.withColumn(
    'upgrade_false',
    F.udf(lambda j: str(json.loads(j)).count("'upgrade': False"))('col')
)
 

Комментарии:

1. как ни странно, я все еще получаю нулевое количество

2. @kihhfeue не могли бы вы показать результаты str(json.loads(df.select('col').collect()[0][0])) ?

3. Я сделал это, и задание spark выполнялось долгое время и до сих пор не завершено…

4. о, извините, я не должен был использовать collect . как насчет замены collect() на take(1) ?

5. @kihhfeue Вы сказали, что получаете нулевое количество. Не могли бы вы показать строки фрейма данных, которые показывают нулевое количество? пожалуйста, укажите их, отредактировав вопрос, а не в комментариях