#java #scala #dataframe #apache-spark
Вопрос:
Рассматривая фрейм данных spark employees
с таким именем, как этот :
---------- ----- | name | age | ---------- ----- | John | 32 | | Elizabeth| 28 | | Eric | 41 | ---------- -----
и массив строк state = ["LA", "AZ", "OH"]
, я хочу добавить этот массив в df
качестве нового столбца, чтобы фрейм данных выглядел следующим образом :
---------- ----- ------- | name | age | state | ---------- ----- ------- | John | 32 | LA | | Elizabeth| 28 | AZ | | Eric | 41 | OH | ---------- ----- -------
Как я могу добиться этого в Scala (или Java, это почти одно и то же) ? Я видел только, как добавить одно и то же значение ко всем строкам в сети, и здесь мне нужны разные значения для каждой из них.
Спасибо вам ! 🙂
Комментарии:
1. Откуда ты знаешь, что Джон-это Лос-Анджелес, Элизабет-АЗ, а Эрик-О.? Приказ?
2. @jgp Я знаю/признаю, что значения массива находятся в том же порядке, что и строки во фрейме данных
3. Можете ли вы добавить номер строки и присоединиться к ним?
Ответ №1:
Поскольку spark работает в распределенном режиме, вы не можете добавлять значения на основе столбцов в массив с индексом. Предположим, что искра работает с двумя рабочими, и Джон и Элизабет доставляют работнику А, а Эрик доставляет работнику Б. Действительно, они разделятся при сохранении в фрейме данных. Рабочие не знают,каков индекс Джона, Элизабет или Эрика. Вы можете делать все, что хотите, просто в обычной одиночной программе java.
В вашем примере вам нужно преобразовать массив в фрейм данных и использовать join
для объединения двух фреймов данных на основе столбца с одинаковым значением. Однако вы можете использовать crossJoin
декартово произведение для своих таблиц.
Datasetlt;Rowgt; ndf = df.crossJoin(df2);
Если вам нужно просто добавить столбец с постоянным значением или значение на основе другого столбца в том же фрейме данных, используйте withColumn
, как показано ниже:
Datasetlt;Rowgt; ndf = df.withColumn("city",functions.lit(1)); Datasetlt;Rowgt; ndf = df.withColumn("city",functions.rand()); Datasetlt;Rowgt; ndf = df.withColumn("city",functions.col("name"));
Наконец-то вы можете использовать Atomic подобным образом, чтобы получить то, что вы хотите. Я тестирую его в одиночном режиме spark.
public static void main(String[] args) { System.setProperty("hadoop.home.dir", "H:\work\HadoopWinUtils\"); SparkSession spark = SparkSession .builder() .master("local[*]") .getOrCreate(); Datasetlt;Rowgt; df = spark.read().json("H:\work\HadoopWinUtils\people.json"); Listlt;Stringgt; city_array = Arrays.asList("LA", "AZ", "OH"); // Displays the content of the DataFrame to stdout df.show(); df = df.withColumn("city",functions.col("name")); AtomicInteger i= new AtomicInteger(); Datasetlt;Rowgt; df3 = df.map((MapFunctionlt;Row, Rowgt;) value -gt; { return RowFactory.create(value.get(0),value.get(1),city_array.get(i.getAndIncrement())); //return city_array.get(i.getAndIncrement()); }, RowEncoder.apply(df.schema())); df3.show(); }
Люди-это
---- ------- | age| name| ---- ------- |null|Michael| | 30| Andy| | 19| Justin| ---- -------
и в результате получается
---- ------- ---- | age| name|city| ---- ------- ---- |null|Michael| LA| | 30| Andy| AZ| | 19| Justin| OH| ---- ------- ----
Ответ №2:
Вы можете попробовать что-то подобное в пыспарке.
gt;gt;gt; _TRANSFORMED_DF_SCHEMA = StructType([ ... StructField('name', StringType(), False), ... StructField('age', IntegerType(), False), ... StructField('id', IntegerType(), False), ... StructField('state', StringType(), False), ... ]) gt;gt;gt; gt;gt;gt; state = ['LA', 'AZ', 'OH'] gt;gt;gt; data = (['John', 32], ['Eli', 28], ['Eric', 41]) gt;gt;gt; df = spark.createDataFrame(data, schema=['name', 'age']) gt;gt;gt; rdd1 = df.rdd.zipWithIndex() gt;gt;gt; df1 = rdd1.toDF() gt;gt;gt; df1.show() ---------- --- | _1| _2| ---------- --- |[John, 32]| 0| | [Eli, 28]| 1| |[Eric, 41]| 2| ---------- --- gt;gt;gt; df_final = df1.select(df1['_1']['name'].alias('name'), df1['_1']['age'].alias('age'), df1['_2'].alias('id')) gt;gt;gt; df_final.show() ---- --- --- |name|age| id| ---- --- --- |John| 32| 0| | Eli| 28| 1| |Eric| 41| 2| ---- --- --- gt;gt;gt; def add_state(row_dict): ... new_dict = dict() ... new_dict['name'] = row_dict['name'] ... new_dict['age'] = row_dict['age'] ... new_dict['id'] = row_dict['id'] ... new_dict['state'] = state[row_dict['id']] ... return new_dict ... gt;gt;gt; df_rdd = df_final.rdd.map(add_state) gt;gt;gt; df_final = spark.createDataFrame(df_rdd, schema=_TRANSFORMED_DF_SCHEMA) gt;gt;gt; df_final.show() ---- --- --- ----- |name|age| id|state| ---- --- --- ----- |John| 32| 0| LA| | Eli| 28| 1| AZ| |Eric| 41| 2| OH| ---- --- --- -----
Комментарии:
1. Спасибо, я посмотрю на это, но работает ли это так же в Scala ? Я могу использовать Scala только для этого проекта.
2. Это будет работать в Scala, но вам придется внести некоторые синтаксические изменения.
3. попробуйте что-нибудь в этом роде. val df3 = df_final.map(строка=gt;{ val state_col = состояние[строка.getString(2)] }) val df3Map = df3.toDF(«state_col», «имя», «возраст», «идентификатор»)