Фильтрует вложенную структуру JSON и получает имена полей в качестве значений в Pyspark

#python #apache-spark #pyspark #apache-spark-sql #pyspark-dataframes

#python #apache-spark #pyspark #apache-spark-sql #pyspark-фреймы данных

Вопрос:

У меня есть следующие сложные данные, которые хотелось бы проанализировать в PySpark:

 records = '[{"segmentMembership":{"ups":{"FF6KCPTR6AQ0836R":{"lastQualificationTime":"2021-01-16 22:05:11.074357","status":"exited"},"QMS3YRT06JDEUM8O":{"lastQualificationTime":"2021-01-16 22:05:11.074357","status":"realized"},"8XH45RT87N6ZV4KQ":{"lastQualificationTime":"2021-01-16 22:05:11.074357","status":"exited"}}},"_aepgdcdevenablement2":{"emailId":{"address":"stuff@someemail.com"},"person":{"name":{"firstName":"Name2"}},"identities":{"customerid":"PH25PEUWOTA7QF93"}}},{"segmentMembership":{"ups":{"FF6KCPTR6AQ0836R":{"lastQualificationTime":"2021-01-16 22:05:11.074457","status":"realized"},"D45TOO8ZUH0B7GY7":{"lastQualificationTime":"2021-01-16 22:05:11.074457","status":"realized"},"QMS3YRT06JDEUM8O":{"lastQualificationTime":"2021-01-16 22:05:11.074457","status":"existing"}}},"_aepgdcdevenablement2":{"emailId":{"address":"stuff4@someemail.com"},"person":{"name":{"firstName":"TestName"}},"identities":{"customerid":"9LAIHVG91GCREE3Z"}}}]'
df = spark.read.json(sc.parallelize([records]))
df.show()
df.printSchema()
 

Проблема, с которой я сталкиваюсь, связана с segmentMembership объектом. Объект JSON выглядит следующим образом:

 "segmentMembership": {
      "ups": {
        "FF6KCPTR6AQ0836R": {
          "lastQualificationTime": "2021-01-16 22:05:11.074357",
          "status": "exited"
        },
        "QMS3YRT06JDEUM8O": {
          "lastQualificationTime": "2021-01-16 22:05:11.074357",
          "status": "realized"
        },
        "8XH45RT87N6ZV4KQ": {
          "lastQualificationTime": "2021-01-16 22:05:11.074357",
          "status": "exited"
        }
      }
    }
 

Самое неприятное в этом то, что ключевые значения ("FF6KCPTR6AQ0836R", "QMS3YRT06JDEUM8O", "8XH45RT87N6ZV4KQ") в конечном итоге определяются как столбец в pyspark.

В конце концов, если статус сегмента «exited», я надеялся получить следующие результаты.

  -------------------- ---------------- --------- ------------------ 
|address             |customerid      |firstName|segment_id        |
 -------------------- ---------------- --------- ------------------ 
|stuff@someemail.com |PH25PEUWOTA7QF93|Name2    |[8XH45RT87N6ZV4KQ]|
|stuff4@someemail.com|9LAIHVG91GCREE3Z|TestName |[8XH45RT87N6ZV4KQ]|
 -------------------- ---------------- --------- ------------------ 
 

После загрузки данных в dataframe (выше) я попробовал следующее:

 dfx = df.select("_aepgdcdevenablement2.emailId.address", "_aepgdcdevenablement2.identities.customerid", "_aepgdcdevenablement2.person.name.firstName", "segmentMembership.ups")
dfx.show(truncate=False)

seg_list = array(*[lit(k) for k in ["8XH45RT87N6ZV4KQ", "QMS3YRT06JDEUM8O"]])
print(seg_list)

# if v["status"] in ['existing', 'realized']

def confusing_compare(ups, seg_list):
    seg_id_filtered_d = dict((k, ups[k]) for k in seg_list if k in ups)

    # This is the line I am having a problem with.
    # seg_id_status_filtered_d = {key for key, value in seg_id_filtered_d.items() if v["status"] in ['existing', 'realized']}
       
    return list(seg_id_filtered_d)

final_conf_dx_pred = udf(confusing_compare, ArrayType(StringType()))
result_df = dfx.withColumn("segment_id", final_conf_dx_pred(dfx.ups, seg_list)).select("address", "customerid", "firstName", "segment_id")

result_df.show(truncate=False)
 

Я не могу проверить поле состояния в поле значения dic.

Ответ №1:

На самом деле вы можете сделать это без использования UDF. Здесь я использую все имена сегментов, присутствующие в схеме, и отфильтровываю их status = 'exited' . Вы можете адаптировать его в зависимости от того, какие сегменты и статус вы хотите.

Сначала, используя поля схемы, получите список всех имен сегментов следующим образом:

 segment_names = df.select("segmentMembership.ups.*").schema.fieldNames()
 

Затем, пройдя по списку, созданному выше, и используя when функцию, вы можете создать столбец, который может иметь либо segment_name значение, либо значение null в зависимости от status :

 active_segments = [
        when(col(f"segmentMembership.ups.{c}.status") != lit("exited"), lit(c)) 
        for c in segment_names
]
 

Наконец, добавьте новый столбец segments типа массива и используйте filter функцию для удаления нулевых элементов из массива (что соответствует статусу 'exited' ):

 dfx = df.withColumn("segments", array(*active_segments)) 
        .withColumn("segments", expr("filter(segments, x -> x is not null)")) 
        .select(
        col("_aepgdcdevenablement2.emailId.address"),
        col("_aepgdcdevenablement2.identities.customerid"),
        col("_aepgdcdevenablement2.person.name.firstName"),
        col("segments").alias("segment_id")
    )

dfx.show(truncate=False)

# -------------------- ---------------- --------- ------------------------------------------------------ 
#|address             |customerid      |firstName|segment_id                                            |
# -------------------- ---------------- --------- ------------------------------------------------------ 
#|stuff@someemail.com |PH25PEUWOTA7QF93|Name2    |[QMS3YRT06JDEUM8O]                                    |
#|stuff4@someemail.com|9LAIHVG91GCREE3Z|TestName |[D45TOO8ZUH0B7GY7, FF6KCPTR6AQ0836R, QMS3YRT06JDEUM8O]|
# -------------------- ---------------- --------- ------------------------------------------------------ 
 

Комментарии:

1. ВАУ… Спасибо @blackbishop, это сработало. Это отличный метод, позволяющий делать это без UDFs.

2. спасибо за вашу помощь. основываясь на приведенном выше результате, я разнес столбец segment_id. Каков наилучший подход для добавления соответствующего значения status и lastQualificationTime в новый столбец?

3. @user422930 вы можете изменить приведенный выше код, чтобы при создании when выражения вы возвращали struct(lit(c).alias("segment_id"), col("segmentMembership.ups.{c}.status"), col("segmentMembership.ups.{c}.lastQualificationTime")) вместо just lit(c) . В конечном результате у вас будет массив struct , затем вы можете разбить его и выделить столбцы struct.

4. @user422930 еще раз спасибо за вашу помощь. Это было очень полезно и дало мне отличную возможность для обучения.