Spark / Java: строка столбца фрейма данных для преобразования

#java #apache-spark #dataset

#java #apache-spark #набор данных

Вопрос:

У меня есть набор данных, подобный этому:

  --- ------------------- ----------------------- 
|id |time               |range                  |
 --- ------------------- ----------------------- 
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
 --- ------------------- ----------------------- 
  

с помощью схемы

 root
 |-- id: string (nullable = true)
 |-- time: string (nullable = true)
 |-- range: string (nullable = true)
  

Я хочу отфильтровать строки, которые содержат час / минуту в столбце time, между часами / минутами в столбце range.

  --- ------------------- ----------------------- ----------- 
|id |time               |range                  |between    |
 --- ------------------- ----------------------- ----------- 
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|true       |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|false      |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|false      |
 --- ------------------- ----------------------- ----------- 
  

Я знаю, что в Scala я должен преобразовать столбец диапазона во что-то вроде

 array(named_struct("start", "00h00", "end", "03h00"), named_struct("start", "15h30", "end", "17h30"), named_struct("start", "21h00", "end", "23h59"))
  

Но я не нашел способа сделать это на Java. Как я могу это сделать, или есть лучшее решение?

Спасибо.

Ответ №1:

Один из способов, которым вы могли бы это сделать, это:

  1. Нормализуйте свое время с помощью статической функции Spark.
  2. Проверьте, находится ли ваше значение в диапазоне, используя UDF (пользовательские функции)

Использование статических функций:

 df = df
    .withColumn(
        "date",
        date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
    .withColumn("h", hour(col("date")))
    .withColumn("m", minute(col("date")))
    .withColumn("s", second(col("date")))
    .withColumn("event", expr("h*3600   m*60  s"))
    .drop("date")
    .drop("h")
    .drop("m")
    .drop("s");
  

Если ваш фрейм данных выглядит как раньше:

  --- ------------------- ----------------------- 
|id |time               |range                  |
 --- ------------------- ----------------------- 
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|
 --- ------------------- ----------------------- 
  

После этого это должно выглядеть как:

  --- ------------------- ----------------------- ----- 
|id |time               |range                  |event|
 --- ------------------- ----------------------- ----- 
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|
 --- ------------------- ----------------------- ----- 
  

Использование UDF:

 df = df.withColumn("between",
    callUDF("inRange", col("range"), col("event")));
  

и результатом будет:

  --- ------------------- ----------------------- ----- ------- 
|id |time               |range                  |event|between|
 --- ------------------- ----------------------- ----- ------- 
|id1|2019-03-11 05:00:00|00h00-07h30;23h30-23h59|18000|true   |
|id2|2019-03-11 09:00:00|00h00-07h30;23h30-23h59|32400|false  |
|id3|2019-03-11 10:30:00|00h00-07h30;23h30-23h59|37800|false  |
 --- ------------------- ----------------------- ----- ------- 
  

InRangeUdf

Ваш UDF будет выглядеть следующим образом:

 package net.jgp.books.sparkInAction.ch14.lab900_in_range;

import org.apache.spark.sql.api.java.UDF2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class InRangeUdf implements UDF2<String, Integer, Boolean> {
  private static Logger log = LoggerFactory
      .getLogger(InRangeUdf.class);

  private static final long serialVersionUID = -21621751L;

  @Override
  public Boolean call(String range, Integer event) throws Exception {
    log.debug("-> call({}, {})", range, event);
    String[] ranges = range.split(";");
    for (int i = 0; i < ranges.length; i  ) {
      log.debug("Processing range #{}: {}", i, ranges[i]);
      String[] hours = ranges[i].split("-");
      int start =
          Integer.valueOf(hours[0].substring(0, 2)) * 3600  
              Integer.valueOf(hours[0].substring(3)) * 60;
      int end =
          Integer.valueOf(hours[1].substring(0, 2)) * 3600  
              Integer.valueOf(hours[1].substring(3)) * 60;
      log.debug("Checking between {} and {}", start, end);
      if (event >= start amp;amp; event <= end) {
        return true;
      }
    }
    return false;
  }

}
  

Код драйвера

Ваш код драйвера будет выглядеть следующим образом:

 package net.jgp.books.sparkInAction.ch14.lab900_in_range;

import static org.apache.spark.sql.functions.*;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Custom UDF to check if in range.
 * 
 * @author jgp
 */
public class InCustomRangeApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    InCustomRangeApp app = new InCustomRangeApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Custom UDF to check if in range")
        .master("local[*]")
        .getOrCreate();
    spark.udf().register(
        "inRange",
        new InRangeUdf(),
        DataTypes.BooleanType);

    Dataset<Row> df = createDataframe(spark);
    df.show(false);

    df = df
        .withColumn(
            "date",
            date_format(col("time"), "yyyy-MM-dd HH:mm:ss.SSSS"))
        .withColumn("h", hour(col("date")))
        .withColumn("m", minute(col("date")))
        .withColumn("s", second(col("date")))
        .withColumn("event", expr("h*3600   m*60  s"))
        .drop("date")
        .drop("h")
        .drop("m")
        .drop("s");
    df.show(false);

    df = df.withColumn("between",
        callUDF("inRange", col("range"), col("event")));
    df.show(false);
  }

  private static Dataset<Row> createDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "id",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "time",
            DataTypes.StringType,
            false),
        DataTypes.createStructField(
            "range",
            DataTypes.StringType,
            false) });

    List<Row> rows = new ArrayList<>();
    rows.add(RowFactory.create("id1", "2019-03-11 05:00:00",
        "00h00-07h30;23h30-23h59"));
    rows.add(RowFactory.create("id2", "2019-03-11 09:00:00",
        "00h00-07h30;23h30-23h59"));
    rows.add(RowFactory.create("id3", "2019-03-11 10:30:00",
        "00h00-07h30;23h30-23h59"));

    return spark.createDataFrame(rows, schema);
  }
}