Использование Pyspark для анализа строк JSON на наличие каждого значения в списке

#python #json #apache-spark #pyspark

#python #json #apache-spark #pyspark

Вопрос:

Я новичок в PySpark, и я изо всех сил пытаюсь найти, сколько вхождений каждого IP-адреса находится в следующем списке:

 sampleJson = [('{"user":100, "ips" : ["191.168.192.101", "191.168.192.103", "191.168.192.96", "191.168.192.99"]}',), ('{"user":101, "ips" : ["191.168.192.102", "191.168.192.105", "191.168.192.103", "191.168.192.107"]}',), ('{"user":102, "ips" : ["191.168.192.105", "191.168.192.101", "191.168.192.105", "191.168.192.107"]}',), ('{"user":103, "ips" : ["191.168.192.96", "191.168.192.100", "191.168.192.107", "191.168.192.101"]}',), ('{"user":104, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.102", "191.168.192.99"]}',),('{"user":105, "ips" : ["191.168.192.99", "191.168.192.99", "191.168.192.100", "191.168.192.96"]}',),]
 

В идеале мне нужно, чтобы результаты выглядели примерно так:

ip количество
191.168.192.96 3
191.168.192.99 6
191.168.192.100 2
191.168.192.101 3
191.168.192.102 2
191.168.192.103 2
191.168.192.105 3
191.168.192.107 3

Я выполнил следующий код ниже, чтобы получить результаты в столбце с пользователем и в другом столбце, который показывает их IP-адреса, но теперь не может извлечь количество каждого IP-адреса в желаемый результат. Кто-нибудь может помочь?

 from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import json

json_df = spark.createDataFrame(sampleJson)

sch=StructType([StructField('user', StringType(), 
False),StructField('ips',ArrayType(StringType()))])

json_df = json_df.withColumn("n",from_json(col("_1"),sch)).select("n.*").show(10,False)

|user|ips                                                                 |
|:---|-------------------------------------------------------------------:|
|100 |[191.168.192.101, 191.168.192.103, 191.168.192.96, 191.168.192.99]  |
|101 |[191.168.192.102, 191.168.192.105, 191.168.192.103, 191.168.192.107]|
|102 |[191.168.192.105, 191.168.192.101, 191.168.192.105, 191.168.192.107]|
|103 |[191.168.192.96, 191.168.192.100, 191.168.192.107, 191.168.192.101] |
|104 |[191.168.192.99, 191.168.192.99, 191.168.192.102, 191.168.192.99]   |
|105 |[191.168.192.99, 191.168.192.99, 191.168.192.100, 191.168.192.96]   |
 

Ответ №1:

Вы можете использовать функцию explode для преобразования элементов массива в строки:

 json_df = spark.createDataFrame(sampleJson)

sch=StructType([StructField('user', StringType(), 
False),StructField('ips',ArrayType(StringType()))])

json_df = json_df.withColumn("n",from_json(col("_1"),sch)).select("n.*")

json_df = json_df 
    .withColumn('ip', explode("ips")) 
    .groupby('ip') 
    .agg(count('*').alias('count'))


json_df.show()
 

Комментарии:

1. К сожалению, я столкнулся со следующей проблемой при запуске вашего кода: AttributeError: объект ‘NoneType’ не имеет атрибута ‘withColumn’

2. вероятно, это связано с тем, что у вас уже есть действие в вашем коде, например .show(10,False) , я обновил пример и включил в него ваш код. Попробуйте еще раз