#pyspark
#pyspark
Вопрос:
Как мне создать чистые тестовые данные для pyspark? Я кое-что выяснил, что кажется довольно хорошим, но некоторые части кажутся немного неуклюжими, поэтому я публикую.
Допустим, у меня есть фрейм данных df
со сложной схемой и небольшим количеством строк. Я хочу, чтобы тестовые данные были сохранены в моем репозитории. Мне не нужен двоичный файл. На данный момент я не уверен, что лучший способ продолжить — но я думаю, что у меня есть файл, подобный
test_fn.py
и в нем есть вот это
schema_str='struct<eventTimestamp:timestamp,list_data:array<struct<valueA:string,valueB:string,valueC:boolean>>>'
чтобы получить схему в формате txt, используйте df.schema.simpleString()
функцию. Затем, чтобы получить строки — я делаю
lns = [row.json_txt for row in df.select((F.to_json(F.struct('*'))).alias('json_txt')).collect()]
теперь я помещаю эти строки в свой test_fn.py
файл, или у меня мог бы быть .json
файл в репозитории.
Теперь, чтобы запустить тест, я должен создать фрейм данных с правильной схемой и данными из этого текста. Кажется, единственный способ, которым spark проанализирует простую строку, — это если я создам с ней dataframe, то есть я не смогу передать эту простую строку from_json
функции? Итак, это немного неудобно, поэтому я решил опубликовать —
schema2 = spark.createDataFrame(data=[], schema=schema_str).schema
lns = # say I read the lns back from above
df_txt = spark.createDataFrame(data=lns, schema=T.StringType())
Я вижу, что в df_txt есть только один столбец с именем ‘value’
df_json = df_txt.select(F.from_json('value', schema=schema2).alias('xx'))
sel = ['xx.%s' % nm for nm in df_json.select('xx').schema.fields[0].dataType.fieldNames()]
df2 = df_json.select(*sel)
Теперь df2
должно быть то же самое, что и df1
— что, как я вижу, имеет место в deepdiff
модуле.