Сохранение фрейма данных pyspark со сложной схемой в виде обычного текста для тестирования

#pyspark

#pyspark

Вопрос:

Как мне создать чистые тестовые данные для pyspark? Я кое-что выяснил, что кажется довольно хорошим, но некоторые части кажутся немного неуклюжими, поэтому я публикую.

Допустим, у меня есть фрейм данных df со сложной схемой и небольшим количеством строк. Я хочу, чтобы тестовые данные были сохранены в моем репозитории. Мне не нужен двоичный файл. На данный момент я не уверен, что лучший способ продолжить — но я думаю, что у меня есть файл, подобный

 test_fn.py
  

и в нем есть вот это

 schema_str='struct<eventTimestamp:timestamp,list_data:array<struct<valueA:string,valueB:string,valueC:boolean>>>' 
  

чтобы получить схему в формате txt, используйте df.schema.simpleString() функцию. Затем, чтобы получить строки — я делаю

 lns = [row.json_txt for row in df.select((F.to_json(F.struct('*'))).alias('json_txt')).collect()]
  

теперь я помещаю эти строки в свой test_fn.py файл, или у меня мог бы быть .json файл в репозитории.

Теперь, чтобы запустить тест, я должен создать фрейм данных с правильной схемой и данными из этого текста. Кажется, единственный способ, которым spark проанализирует простую строку, — это если я создам с ней dataframe, то есть я не смогу передать эту простую строку from_json функции? Итак, это немного неудобно, поэтому я решил опубликовать —

 schema2 = spark.createDataFrame(data=[], schema=schema_str).schema
lns = # say I read the lns back from above 
df_txt = spark.createDataFrame(data=lns, schema=T.StringType())
  

Я вижу, что в df_txt есть только один столбец с именем ‘value’

 df_json = df_txt.select(F.from_json('value', schema=schema2).alias('xx'))
sel = ['xx.%s' % nm for nm in df_json.select('xx').schema.fields[0].dataType.fieldNames()]
df2 = df_json.select(*sel)
  

Теперь df2 должно быть то же самое, что и df1 — что, как я вижу, имеет место в deepdiff модуле.