#python #apache-spark #pyspark
#питон #apache-искра #пыспарк
Вопрос:
Когда я создал таблицу hive, данные были следующими.
файл данных
lt;__name__gt;abc lt;__code__gt;1 lt;__value__gt;1234 lt;__name__gt;abcdef lt;__code__gt;2 lt;__value__gt;12345 lt;__name__gt;abcdef lt;__code__gt;2 lt;__value__gt;12345 1234156321 lt;__name__gt;abcdef lt;__code__gt;2 lt;__value__gt;12345 ...
Могу ли я создать таблицу сразу же, не конвертируя файл? Это обычный текстовый файл, три столбца повторяются.
Как конвертировать фрейм данных? или csv-файл?
Я хочу
| name | code | value | abc | 1 | 1234 | abcdef | 2 | 12345 ...
или
abc,1,1234 abcdef,2,12345 ...
Ответ №1:
Я решил свою проблему вот так.
data = spark.read.text(path) rows = data.rdd.zipWithIndex().map(lambda x: Row(x[0].value, int(x[1]/3))) schema = StructType() .add("col1",StringType(), False) .add("record_pos",IntegerType(), False) df = spark.createDataFrame(rows, schema) df1 = df.withColumn("key", regexp_replace(split(df["col1"], '__gt;')[0], 'lt;|__', '')) .withColumn("value", regexp_replace(regexp_replace(split(df["col1"], '__gt;')[1], 'n', 'lt;NLgt;'), 't', 'lt;TABgt;')) dataframe = df1.groupBy("record_pos").pivot("key").agg(first("value")).drop("record_pos") dataframe.show()
Ответ №2:
val path = "file:///C:/stackqustions/data/stackq5.csv" val data = sc.textFile(path) import spark.implicits._ val rdd = data.zipWithIndex.map { case (records, index) =gt; Row(records, index / 3) } val schema = new StructType().add("col1", StringType, false).add("record_pos", LongType, false) val df = spark.createDataFrame(rdd, schema) val df1 = df .withColumn("key", regexp_replace(split($"col1", "gt;")(0), "lt;|__", "")) .withColumn("value", split($"col1", "gt;")(1)).drop("col1") df1.groupBy("record_pos").pivot("key").agg(first($"value")).drop("record_pos").show
Результат:
---- ------ ----- |code| name|value| ---- ------ ----- | 1| abc| 1234| | 2|abcdef|12345| | 2|abcdef|12345| | 2|abcdef|12345| ---- ------ -----
Комментарии:
1. не все свободно говорят на языке скала. Даже если он выглядит как pyspark, некоторым пользователям может быть сложно адаптировать код.