Сравнить значение столбца в одной строке с тем же значением столбца в предыдущей строке с тем же фреймом данных в Spark

#java #apache-spark

#java #apache-spark

Вопрос:

У меня есть фрейм данных spark с разными столбцами.

 tid | acct | bssn | name | 
-----------------------------
1  |  123 |  111 | Peter
2  |  123 |  222 | Paul
3  |  456 |  333 | John
4  |  567  | 444 | Casey
  

Я пытаюсь сравнить значения столбца account, если они совпадают, bssn и tid должны быть объединены в набор. Как мне выполнить этот spark, чтобы результирующий фрейм данных выглядел следующим образом:

 acct | bssn | name | 
-----------------------------
123 |  (111,222) | (Peter,Paul)
456 |  333 | John
567  | 444 | Casey
  

Комментарии:

1. Запрос к базе данных не может этого сделать. вам нужна программа для выполнения. Это проще, чем выбирать из базы данных.

Ответ №1:

Вы могли бы попробовать выполнить полусоединение слева в столбце. Это будет выглядеть как:

 Dataset<Row> joinedDf = df
    .join(
        rightDf,
        df.col("acct").equalTo(rightDf.col("acct2")),
        "leftsemi")
    .drop(rightDf.col("acct2"))
    .drop(rightDf.col("name2"))
    .drop(rightDf.col("bssn2"));
  

Где rightDf аналогично слева df :

 Dataset<Row> rightDf = df
    .withColumnRenamed("acct", "acct2")
    .withColumnRenamed("bssn", "bssn2")
    .withColumnRenamed("name", "name2")
    .drop("tid");
  

И соберите в виде списка. Результатом будет:

  ---- ------------------ ------------------ 
|acct|collect_list(bssn)|collect_list(name)|
 ---- ------------------ ------------------ 
|456 |[333]             |[John]            |
|567 |[444]             |[Casey]           |
|123 |[111, 222]        |[Peter, Paul]     |
 ---- ------------------ ------------------ 
  

Вот весь код:

 package net.jgp.books.spark.ch12.lab990_others;

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Self join.
 * 
 * @author jgp
 */
public class SelfJoinApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    SelfJoinApp app = new SelfJoinApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Self join")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = createDataframe(spark);
    df.show(false);

    Dataset<Row> rightDf = df
        .withColumnRenamed("acct", "acct2")
        .withColumnRenamed("bssn", "bssn2")
        .withColumnRenamed("name", "name2")
        .drop("tid");

    Dataset<Row> joinedDf = df
        .join(
            rightDf,
            df.col("acct").equalTo(rightDf.col("acct2")),
            "leftsemi")
        .drop(rightDf.col("acct2"))
        .drop(rightDf.col("name2"))
        .drop(rightDf.col("bssn2"));
    joinedDf.show(false);

    Dataset<Row> aggDf = joinedDf.groupBy(joinedDf.col("acct"))
        .agg(collect_list("bssn"), collect_list("name"));
    aggDf.show(false);
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "tid",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "acct",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "bssn",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "name",
            DataTypes.StringType,
            false) });

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create(1, 123, 111, "Peter"));
    rows.add(RowFactory.create(2, 123, 222, "Paul"));
    rows.add(RowFactory.create(3, 456, 333, "John"));
    rows.add(RowFactory.create(4, 567, 444, "Casey"));

    return spark.createDataFrame(rows, schema);
  }
}
  

Ответ №2:

могут использоваться «GroupBy» и «collect_set»:

 val data = List(
  (1, 123, 111, "Peter"),
  (2, 123, 222, "Paul"),
  (3, 456, 333, "John"),
  (4, 567, 444, "Casey")
).toDF("tid", "acct", "bssn", "name")

val result = data.groupBy("acct").agg(collect_set("bssn"), collect_set("name"))
result.show(false)
  

Вывод:

  ---- ----------------- ----------------- 
|acct|collect_set(bssn)|collect_set(name)|
 ---- ----------------- ----------------- 
|123 |[222, 111]       |[Paul, Peter]    |
|567 |[444]            |[Casey]          |
|456 |[333]            |[John]           |
 ---- ----------------- ----------------- 
  

Думаю, это можно легко перевести на Java.