Как мне превратить список объектов JSON в Spark dataframe в Code Workbook?

#palantir-foundry #foundry-code-workbooks

#palantir-foundry #foundry-code-workbooks

Вопрос:

Как я могу превратить этот список объектов JSON в Spark dataframe?

 [
  {
    '1': 'A', 
    '2': 'B'
  }, 
  {
    '1': 'A', 
    '3': 'C'
  }
] 
  

в

  1     2     3
 A     B     null
 A     null  C
  

Я пробовал spark.read.json(spark.sparkContext.parallelize(d)) и различные комбинации этого вызова с json.dumps(d).

Ответ №1:

Мне пришлось убить этого дракона, чтобы импортировать проблемы JIRA. Они вернулись в виде набора данных объектов ответа, каждый из которых содержит внутренний массив объектов issue JSON.

Этот код работал как единое преобразование для получения правильно проанализированных объектов JSON в фрейме данных:

 import json
from pyspark.sql import Row
from pyspark.sql.functions import explode

def issues_enumerated(All_Issues_Paged):
    """
    Explode paged JIRA API responses into one parsed issue object per row.

    Each input row is expected to carry a 'response' column holding a JSON
    string whose 'issues' field is an array of issue objects.  Returns a
    dataframe whose schema Spark infers from those issue objects.
    """

    def generate_issue_row(input_row: Row) -> Row:
        """Re-serialize one response's issue array as a list of JSON strings."""
        response_text = input_row.asDict()['response']
        parsed_response = json.loads(response_text)
        return Row(issues=[json.dumps(issue) for issue in parsed_response['issues']])

    # One array of issue JSON strings per response row.
    per_response_df = All_Issues_Paged.rdd.map(generate_issue_row).toDF()
    # One issue JSON string per row.
    exploded_df = per_response_df.select(explode(per_response_df.issues))
    # Plain RDD of raw JSON strings, as expected by spark.read.json.
    issue_json_rdd = exploded_df.rdd.map(lambda issue_row: issue_row.col)
    # Let Spark infer the full issue schema from the JSON strings.
    issues_df = spark.read.json(issue_json_rdd)
    issues_df.printSchema()
    return issues_df
  

Схема слишком большая, чтобы ее показывать, но вот фрагмент:

 root
 |-- expand: string (nullable = true)
 |-- fields: struct (nullable = true)
 |    |-- aggregateprogress: struct (nullable = true)
 |    |    |-- percent: long (nullable = true)
 |    |    |-- progress: long (nullable = true)
 |    |    |-- total: long (nullable = true)
 |    |-- aggregatetimeestimate: long (nullable = true)
 |    |-- aggregatetimeoriginalestimate: long (nullable = true)
 |    |-- aggregatetimespent: long (nullable = true)
 |    |-- assignee: struct (nullable = true)
 |    |    |-- accountId: string (nullable = true)
 |    |    |-- accountType: string (nullable = true)
 |    |    |-- active: boolean (nullable = true)
 |    |    |-- avatarUrls: struct (nullable = true)
 |    |    |    |-- 16x16: string (nullable = true)
 |    |    |    |-- 24x24: string (nullable = true)
 |    |    |    |-- 32x32: string (nullable = true)
 |    |    |    |-- 48x48: string (nullable = true)
 |    |    |-- displayName: string (nullable = true)
 |    |    |-- emailAddress: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |-- components: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- description: string (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- self: string (nullable = true)
 |    |-- created: string (nullable = true)
 |    |-- creator: struct (nullable = true)
 |    |    |-- accountId: string (nullable = true)
 |    |    |-- accountType: string (nullable = true)
 |    |    |-- active: boolean (nullable = true)
 |    |    |-- avatarUrls: struct (nullable = true)
 |    |    |    |-- 16x16: string (nullable = true)
 |    |    |    |-- 24x24: string (nullable = true)
 |    |    |    |-- 32x32: string (nullable = true)
 |    |    |    |-- 48x48: string (nullable = true)
 |    |    |-- displayName: string (nullable = true)
 |    |    |-- emailAddress: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |    |-- timeZone: string (nullable = true)
 |    |-- customfield_10000: string (nullable = true)
 |    |-- customfield_10001: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- isShared: boolean (nullable = true)
 |    |    |-- title: string (nullable = true)
 |    |-- customfield_10002: string (nullable = true)
 |    |-- customfield_10003: string (nullable = true)
 |    |-- customfield_10004: struct (nullable = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- self: string (nullable = true)
 |    |    |-- value: string (nullable = true)
 |    |-- customfield_10005: string (nullable = true)
 |    |-- customfield_10006: string (nullable = true)
 |    |-- customfield_10007: string (nullable = true)
 |    |-- customfield_10008: struct (nullable = true)
 |    |    |-- data: struct (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- issueType: struct (nullable = true)
 |    |    |    |    |-- iconUrl: string (nullable = true)
 |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- key: string (nullable = true)
 |    |    |    |-- keyNum: long (nullable = true)
 |    |    |    |-- projectId: long (nullable = true)
 |    |    |    |-- summary: string (nullable = true)
 |    |    |-- hasEpicLinkFieldDependency: boolean (nullable = true)
 |    |    |-- nonEditableReason: struct (nullable = true)
 |    |    |    |-- message: string (nullable = true)
 |    |    |    |-- reason: string (nullable = true)
 |    |    |-- showField: boolean (nullable = true)
 |    |-- customfield_10009: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- boardId: long (nullable = true)
 |    |    |    |-- completeDate: string (nullable = true)
 |    |    |    |-- endDate: string (nullable = true)
 |    |    |    |-- goal: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- startDate: string (nullable = true)
 |    |    |    |-- state: string (nullable = true)

...
  

Ответ №2:

Вы можете использовать spark.createDataFrame(d) для получения желаемого эффекта.

Вы получаете предупреждение об устаревании о выводе схемы из словарей, поэтому «правильный» способ сделать это — сначала создать строки:

 from pyspark.sql import Row
data = [{'1': 'A', '2': 'B'}, {'1': 'A', '3': 'C'}]
schema = ['1', '2', '3']
rows = []
for d in data:
    dict_for_row = {k: d.get(k,None) for k in schema}
    rows.append(Row(**dict_for_row))
  

затем создайте фрейм данных:

 df = spark.createDataFrame(row)