Добавление значений массива в фрейм данных spark в качестве нового столбца

#java #scala #dataframe #apache-spark

Вопрос:

Рассматривая фрейм данных spark employees с таким именем, как этот :

  ---------- -----  | name | age |  ---------- -----  | John | 32 | | Elizabeth| 28 | | Eric | 41 |  ---------- -----   

и массив строк state = ["LA", "AZ", "OH"] , я хочу добавить этот массив в df качестве нового столбца, чтобы фрейм данных выглядел следующим образом :

  ---------- ----- -------  | name | age | state |  ---------- ----- -------  | John | 32 | LA | | Elizabeth| 28 | AZ | | Eric | 41 | OH |  ---------- ----- -------   

Как я могу добиться этого в Scala (или Java, это почти одно и то же) ? Я видел только, как добавить одно и то же значение ко всем строкам в сети, и здесь мне нужны разные значения для каждой из них.

Спасибо вам ! 🙂

Комментарии:

1. Откуда ты знаешь, что Джон-это Лос-Анджелес, Элизабет-АЗ, а Эрик-О.? Приказ?

2. @jgp Я знаю/признаю, что значения массива находятся в том же порядке, что и строки во фрейме данных

3. Можете ли вы добавить номер строки и присоединиться к ним?

Ответ №1:

Поскольку spark работает в распределенном режиме, вы не можете добавлять значения на основе столбцов в массив с индексом. Предположим, что искра работает с двумя рабочими, и Джон и Элизабет доставляют работнику А, а Эрик доставляет работнику Б. Действительно, они разделятся при сохранении в фрейме данных. Рабочие не знают,каков индекс Джона, Элизабет или Эрика. Вы можете делать все, что хотите, просто в обычной одиночной программе java.

В вашем примере вам нужно преобразовать массив в фрейм данных и использовать join для объединения двух фреймов данных на основе столбца с одинаковым значением. Однако вы можете использовать crossJoin декартово произведение для своих таблиц.

 Datasetlt;Rowgt; ndf = df.crossJoin(df2);  

Если вам нужно просто добавить столбец с постоянным значением или значение на основе другого столбца в том же фрейме данных, используйте withColumn , как показано ниже:

 Datasetlt;Rowgt; ndf = df.withColumn("city",functions.lit(1)); Datasetlt;Rowgt; ndf = df.withColumn("city",functions.rand()); Datasetlt;Rowgt; ndf = df.withColumn("city",functions.col("name"));  

Наконец-то вы можете использовать Atomic подобным образом, чтобы получить то, что вы хотите. Я тестирую его в одиночном режиме spark.

 public static void main(String[] args) {  System.setProperty("hadoop.home.dir", "H:\work\HadoopWinUtils\");  SparkSession spark = SparkSession  .builder()  .master("local[*]")  .getOrCreate();   Datasetlt;Rowgt; df = spark.read().json("H:\work\HadoopWinUtils\people.json");   Listlt;Stringgt; city_array = Arrays.asList("LA", "AZ", "OH");  // Displays the content of the DataFrame to stdout  df.show();    df = df.withColumn("city",functions.col("name"));   AtomicInteger i= new AtomicInteger();   Datasetlt;Rowgt; df3 = df.map((MapFunctionlt;Row, Rowgt;) value -gt; {  return RowFactory.create(value.get(0),value.get(1),city_array.get(i.getAndIncrement()));  //return city_array.get(i.getAndIncrement());  }, RowEncoder.apply(df.schema()));   df3.show();  }  

Люди-это

  ---- -------  | age| name|  ---- -------  |null|Michael| | 30| Andy| | 19| Justin|  ---- -------   

и в результате получается

  ---- ------- ----  | age| name|city|  ---- ------- ----  |null|Michael| LA| | 30| Andy| AZ| | 19| Justin| OH|  ---- ------- ----   

Ответ №2:

Вы можете попробовать что-то подобное в пыспарке.

 gt;gt;gt; _TRANSFORMED_DF_SCHEMA = StructType([ ... StructField('name', StringType(), False), ... StructField('age', IntegerType(), False), ... StructField('id', IntegerType(), False), ... StructField('state', StringType(), False), ... ]) gt;gt;gt;  gt;gt;gt; state = ['LA', 'AZ', 'OH'] gt;gt;gt; data = (['John', 32], ['Eli', 28], ['Eric', 41]) gt;gt;gt; df = spark.createDataFrame(data, schema=['name', 'age']) gt;gt;gt; rdd1 = df.rdd.zipWithIndex() gt;gt;gt; df1 = rdd1.toDF() gt;gt;gt; df1.show()  ---------- ---  | _1| _2|  ---------- ---  |[John, 32]| 0| | [Eli, 28]| 1| |[Eric, 41]| 2|  ---------- ---   gt;gt;gt; df_final = df1.select(df1['_1']['name'].alias('name'), df1['_1']['age'].alias('age'), df1['_2'].alias('id')) gt;gt;gt; df_final.show()  ---- --- ---  |name|age| id|  ---- --- ---  |John| 32| 0| | Eli| 28| 1| |Eric| 41| 2|  ---- --- ---   gt;gt;gt; def add_state(row_dict): ... new_dict = dict() ... new_dict['name'] = row_dict['name'] ... new_dict['age'] = row_dict['age'] ... new_dict['id'] = row_dict['id'] ... new_dict['state'] = state[row_dict['id']] ... return new_dict ...  gt;gt;gt; df_rdd = df_final.rdd.map(add_state) gt;gt;gt; df_final = spark.createDataFrame(df_rdd, schema=_TRANSFORMED_DF_SCHEMA) gt;gt;gt; df_final.show()  ---- --- --- -----  |name|age| id|state|  ---- --- --- -----  |John| 32| 0| LA| | Eli| 28| 1| AZ| |Eric| 41| 2| OH|  ---- --- --- -----   

Комментарии:

1. Спасибо, я посмотрю на это, но работает ли это так же в Scala ? Я могу использовать Scala только для этого проекта.

2. Это будет работать в Scala, но вам придется внести некоторые синтаксические изменения.

3. попробуйте что-нибудь в этом роде. val df3 = df_final.map(строка=gt;{ val state_col = состояние[строка.getString(2)] }) val df3Map = df3.toDF(«state_col», «имя», «возраст», «идентификатор»)