#python-3.x #amazon-web-services #pyspark
#python-3.x #amazon-веб-сервисы #pyspark
Вопрос:
Я работаю над очень большим набором данных под названием Reddit на AWS. Сначала я прочитал небольшой пример по :
file_lzo = sc.newAPIHadoopFile("s3://mv559/reddit/sample-data/",
"com.hadoop.mapreduce.LzoTextInputFormat",
"org.apache.hadoop.io.LongWritable",
"org.apache.hadoop.io.Text")
Итак, я вызвал rdd file_lzo
. Я нажимаю на первый элемент, и данные выглядят так:
[(0,
'{"archived":false,"author":"TistedLogic","author_created_utc":1312615878,"author_flair_background_color":null,"author_flair_css_class":null,"author_flair_richtext":[],"author_flair_template_id":null,"author_flair_text":null,"author_flair_text_color":null,"author_flair_type":"text","author_fullname":"t2_5mk6v","author_patreon_flair":false,"body":"Is it still r\/BoneAppleTea worthy if it's the opposite?","can_gild":true,"can_mod_post":false,"collapsed":false,"collapsed_reason":null,"controversiality":0,"created_utc":1538352000,"distinguished":null,"edited":false,"gilded":0,"gildings":{"gid_1":0,"gid_2":0,"gid_3":0},"id":"e6xucdd","is_submitter":false,"link_id":"t3_9ka1hp","no_follow":true,"parent_id":"t1_e6xu13x","permalink":"\/r\/Unexpected\/comments\/9ka1hp\/jesus_fking_woah\/e6xucdd\/","removal_reason":null,"retrieved_on":1539714091,"score":2,"send_replies":true,"stickied":false,"subreddit":"Unexpected","subreddit_id":"t5_2w67q","subreddit_name_prefixed":"r\/Unexpected","subreddit_type":"public"}')]
Затем я создаю фрейм данных из этого rdd с помощью
df = spark.createDataFrame(file_lzo,['idx','map_col'])
df.show(4)
Это выглядит так
----- --------------------
| idx| map_col|
----- --------------------
| 0|{"archived":false...|
|70139|{"archived":false...|
|70139|{"archived":false...|
|70139|{"archived":false...|
----- --------------------
only showing top 4 rows
И, наконец, я хочу получить данные в формате dataframe, который выглядит следующим образом, и сохранить его как формат parquet в S3 для будущих шагов.
Я попытался создать схему, а затем использовать read.json
, однако я получил, что все значения равны нулю
fields = [StructField("archived", BooleanType(), True),
StructField("author", StringType(), True),
StructField("author_flair_css_class", StringType(), True),
StructField("author_flair_text", StringType(), True),
StructField("body", StringType(), True),
StructField("can_gild", BooleanType(), True),
StructField("controversiality", LongType(), True),
StructField("created_utc", StringType(), True),
StructField("distinguished", StringType(), True),
StructField("edited", StringType(), True),
StructField("gilded", LongType(), True),
StructField("id", StringType(), True),
StructField("is_submitter", StringType(), True),
StructField("link_id", StringType(), True),
StructField("parent_id", StringType(), True),
StructField("permalink", StringType(), True),
StructField("permalink", StringType(), True),
StructField("removal_reason", StringType(), True),
StructField("retrieved_on", LongType(), True),
StructField("score",LongType() , True),
StructField("stickied", BooleanType(), True),
StructField("subreddit", StringType(), True),
StructField("subreddit_id", StringType(), True)]
schema = StructType(fields)
-------- ------ ---------------------- ----------------- ---- -------- ---------------- ----------- ------------- ------ ------ ---- ------------ ------- --------- --------- --------- -------------- ------------ ----- -------- --------- ------------
|archived|author|author_flair_css_class|author_flair_text|body|can_gild|controversiality|created_utc|distinguished|edited|gilded| id|is_submitter|link_id|parent_id|permalink|permalink|removal_reason|retrieved_on|score|stickied|subreddit|subreddit_id|
-------- ------ ---------------------- ----------------- ---- -------- ---------------- ----------- ------------- ------ ------ ---- ------------ ------- --------- --------- --------- -------------- ------------ ----- -------- --------- ------------
| null| null| null| null|null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
| null| null| null| null|null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
| null| null| null| null|null| null| null| null| null| null| null|null| null| null| null| null| null| null| null| null| null| null| null|
-------- ------ ---------------------- ----------------- ---- -------- ---------------- ----------- ------------- ------ ------ ---- ------------ ------- --------- --------- --------- -------------- ------------ ----- -------- --------- ------------
Ответ №1:
Глядя на желаемый результат, вы могли бы рассматривать свой json как столбец MapType(), а затем извлекать из него свои столбцы.
Начните создавать фрейм данных:
my_rdd = [(0, {"author": "abc", "id": "012", "archived": "False"}),
(1, {"author": "bcd", "id": "013", "archived": "False"}),
(2, {"author": "cde", "id": "014", "archived": "True"}),
(3, {"author": "edf", "id": "015", "archived": "False"})]
df = sqlContext.createDataFrame(my_rdd,['idx','map_col'])
df.show()
# --- --------------------
# |idx| map_col|
# --- --------------------
# | 0|Map(id -> 012, au...|
# | 1|Map(id -> 013, au...|
# | 2|Map(id -> 014, au...|
# | 3|Map(id -> 015, au...|
# --- --------------------
Затем, если вы заранее не знаете, какие ключи вы хотите извлечь, соберите один и получите ключи, например, выполнив:
from pyspark.sql import functions as f
one = df.select(f.col('map_col')).rdd.take(1)
my_dict = one[0][0].keys()
my_dict
# dict_keys(['id', 'author', 'archived'])
Если вы уже знаете список ключей, используйте его напрямую.
Следовательно, вы можете сгладить столбец карты, выполнив:
keep_cols = [f.col('map_col').getItem(k).alias(k) for k in my_dict]
df.select(keep_cols).show()
# --- ------ --------
#| id|author|archived|
# --- ------ --------
#|012| abc| False|
#|013| bcd| False|
#|014| cde| True|
#|015| edf| False|
# --- ------ --------
Методы getItem()
и alias()
творят волшебство: первый извлекает выбранный ключ из столбца карты, а второй переименовывает полученный столбец по желанию.
Комментарии:
1. Привет, спасибо за ваш ответ. Я пробовал это и на шаге, чтобы получить ключи. У меня ошибка, в которой говорится, что объект ‘str’ не имеет атрибута ‘keys’. Поэтому я не могу получить ключи. Кстати, можем ли мы преобразовать этот вид rdd в желаемый формат напрямую, установив правильную схему при создании фрейма данных? большое вам спасибо
2. Кажется, ваши данные хранятся не в формате json, а в виде строк, или, может быть, вам просто нужно
my_dict = one[0].keys()
вместоmy_dict = one[0][0].keys()
3. Если вместо этого json dict сохраняется как строка, вы можете попробовать изменить его на dictionary с помощью
json.loads
. Пожалуйста, также предоставьте новый образец с точным воспроизведением ваших фактических данных (посмотрите, как мне пришлось изменить ваш образец, чтобы его можно было импортировать в pyspark без SyntaxError).4. Вы можете получить тот же фрейм данных, что и предыдущий,
df
также выполнивfrom pyspark.sql import Row
иsc.parallelize(my_rdd).map(lambda x: Row(x[1])).toDF()
. Результирующий фрейм данных по-прежнему имеет MapType() и должен быть преобразован, как показано ранее.5. Привет, спасибо за ваше объяснение. Я думаю, может быть, я недостаточно ясен, поэтому вызываю некоторые путаницы. Я обновляю свои вопросы, чтобы вы могли видеть мои реальные данные и мою предыдущую процедуру. Я попробую ваши предложения позже и дам вам обратную связь 🙂 большое вам спасибо!