Добавьте строку в фрейм данных Spark с метками времени и идентификатором

#java #scala #dataframe #apache-spark

Вопрос:

У меня есть фрейм данных с именем timeDF , схема которого приведена ниже:

 root
 |-- Id: long (nullable = true)
 |-- Model: timestamp (nullable = true)
 |-- Prevision: timestamp (nullable = true)
 

Я хочу добавить новую строку в конце timeDF , преобразовав два Calendar объекта c1 amp; c2 в Timestamp . Я знаю, что могу сделать это, сначала преобразовав их в Timestamp такие :

 val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)
 

Однако я не могу понять, как затем записать эти переменные timeDF в новую строку и как позволить spark увеличить значение Id столбца ?

Должен ли я создать List объект с t1 и t2 и создать временный кадр данных из этого списка, чтобы затем объединить два кадра данных ? Если да, то как мне управлять Id колонкой ? Не слишком ли много беспорядка для такой простой операции ?

Кто-нибудь может мне объяснить, пожалуйста ?

Спасибо.

Комментарии:

1. Адриен, Spark-это не база данных, поэтому вы не можете выполнить вставку, как это было бы с реляционной базой данных. Что вы можете сделать, так это создать новый фрейм данных, добавить его unionByName() в свой timeDf, а затем добавить идентификатор с помощью API фрейма данных Spark. В зависимости от вашей ситуации с параллелизмом, это может быть проблематично.

2. @jgp Хорошо, я попробую, но для идентификатора я действительно не понимаю, что делать. Если я объединю timeDF во временный фрейм данных (который имеет 2 столбца), идентификатор будет равен нулю, не так ли ?

3. Это нормально, когда есть нули, вы можете фильтровать их, изменять их значения (создавая другой столбец, да, Spark-это не база данных 🙂 ). Вам нужно беспокоиться о параллелизме при создании новых строк?

Ответ №1:

Вот решение, которое вы можете попробовать, в двух словах:

  1. Проглоти свой файл.
  2. Создайте новый фрейм данных с вашими данными и unionByName() .
  3. Исправьте идентификатор.
  4. Убирать.

Создайте дополнительную запись

Сначала вы создаете дополнительную запись с нуля. Поскольку вы смешиваете несколько типов, я использовал POJO, вот код:

 List<ModelPrevisionRecord> data = new ArrayList<>();
ModelPrevisionRecord b = new ModelPrevisionRecord(
    -1L,
    new Timestamp(System.currentTimeMillis()),
    new Timestamp(System.currentTimeMillis()));
data.add(b);
Dataset<ModelPrevisionRecord> ds = spark.createDataset(data,
    Encoders.bean(ModelPrevisionRecord.class));
timeDf = timeDf.unionByName(ds.toDF());
 

ModelPrevisionRecord-это очень простое POJO:

 package net.jgp.labs.spark.l999_scrapbook.l000;

import java.sql.Timestamp;

public class ModelPrevisionRecord {

  public long getId() {
    return id;
  }

  public void setId(long id) {
    this.id = id;
  }

  public Timestamp getModel() {
    return model;
  }

  public void setModel(Timestamp model) {
    this.model = model;
  }

  public Timestamp getPrevision() {
    return prevision;
  }

  public void setPrevision(Timestamp prevision) {
    this.prevision = prevision;
  }

  private long id;
  private Timestamp model;
  private Timestamp prevision;

  public ModelPrevisionRecord(long id, Timestamp model, Timestamp prevision) {
    this.id = id;
    this.model = model;
    this.prevision = prevision;
  }
}
 

Исправьте идентификатор

Идентификатор равен -1, поэтому идентификатор предназначен для создания нового столбца id2 с правильным идентификатором:

 timeDf = timeDf.withColumn("id2",
    when(
        col("id").$eq$eq$eq(-1), timeDf.agg(max("id")).head().getLong(0) 1)
            .otherwise(col("id")));
 

Очистка фрейма данных

Наконец, очистите свой фрейм данных:

 timeDf = timeDf.drop("id").withColumnRenamed("id2", "id");
 

Важные примечания

Ответ №2:

Если ваш первый кадр данных можно отсортировать по идентификатору и вам нужно добавлять строки одну за другой, вы можете найти максимальный идентификатор в своем списке:

 long max = timeDF.agg(functions.max("Id")).head().getLong(0);
 

а затем увеличьте и добавьте его в свой фрейм данных путем объединения. Чтобы сделать это, следуйте следующему примеру, возраст которого может действовать как идентификатор. people.json это файл в примерах spark.

 Dataset<Row> df = spark.read().json("H:\work\HadoopWinUtils\people.json");
df.show();

long max = df.agg(functions.max("age")).head().getLong(0);
List<Row> rows = Arrays.asList(RowFactory.create(max 1,  "test"));

StructType schema = DataTypes.createStructType(Arrays.asList(
                DataTypes.createStructField("age", DataTypes.LongType, false, Metadata.empty()),
                DataTypes.createStructField("name", DataTypes.StringType, false, Metadata.empty())));
Dataset<Row> df2 = spark.createDataFrame(rows, schema);
df2.show();
Dataset<Row> df3 = df.union(df2);
df3.show();
 

Комментарии:

1. Спасибо за вашу помощь. Я попробовал ваше решение, но получил ошибку несоответствия типа : error: type mismatch; found : List[org.apache.spark.sql.Row] (in java.util) required: List[org.apache.spark.sql.Row] (in scala.collection.immutable) val newTime: List[Row] = Arrays.asList(RowFactory.create(id, modelTS, targetTS)) . Как я должен решить такую проблему, они используют один и тот же тип, но не могут заставить его работать.

2. Вам нужно изменить типы данных на основе ваших значений. Я использую (Целое число, строку) в качестве кортежа. Просто сначала протестируйте его с помощью «people.json», чтобы увидеть, как это работает. Все, что тебе нужно, это df.agg(max('ID')) . Однако этот код является Java, а не Scala.

3. Эта ошибка говорит, что я использую список, java.util но вы используете список, в Scala.collections котором они разные.

Ответ №3:

Я пробовал это, но я не знаю, почему при печати сохраненной таблицы сохраняются только последние 2 строки, все остальные удаляются.

Вот как я инициализирую таблицу дельта :

 val schema = StructType(
               StructField("Id", LongType, false) ::
               StructField("Model", TimestampType, false) ::
               StructField("Prevision", TimestampType, false) :: Nil
             )

var timestampDF = spark.createDataFrame(sc.emptyRDD[Row], schema)

val write_format = "delta"
val partition_by = "Model"
val save_path = "/mnt/path/to/folder"
val table_name = "myTable"

spark.sql("DROP TABLE IF EXISTS "   table_name)
dbutils.fs.rm(save_path, true)

timestampDF.write.partitionBy(partition_by)
                 .format(write_format)
                 .save(save_path)

spark.sql("CREATE TABLE "   table_name   " USING DELTA LOCATION '"   save_path   "'")
 

И вот как я добавляю в него новый элемент

 def addTimeToData(model: Calendar, target: Calendar): Unit = {
  var timeDF = spark.read
                    .format("delta")
                    .load("/mnt/path/to/folder")
  
  val modelTS = new Timestamp(model.getTimeInMillis)
  val targetTS = new Timestamp(target.getTimeInMillis)
  var id: Long = 0
  
  if (!timeDF.head(1).isEmpty) {
    id = timeDF.agg(max("Id")).head().getLong(0)   1
  }
  
  val newTime = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
  val schema = StructType(
                 StructField("Id", LongType, false) ::
                 StructField("Model", TimestampType, false) ::
                 StructField("Prevision", TimestampType, false) :: Nil
               )  
  var newTimeDF = spark.createDataFrame(newTime, schema)
  val unionTimeDF = timeDF.union(newTimeDF)
  timeDF = unionTimeDF
  unionTimeDF.show
  val save_path = "/mnt/datalake/Exploration/Provisionning/MeteoFrance/Timestamps/"
  val table_name = "myTable"

  spark.sql("DROP TABLE IF EXISTS "   table_name)
  dbutils.fs.rm(save_path, true)
  timeDF.write.partitionBy("Model")
              .format("delta")
              .save(save_path)

  spark.sql("CREATE TABLE "   table_name   " USING DELTA LOCATION '"   save_path   "'")
}
 

Я не очень хорошо знаком с дельта-таблицами, поэтому не знаю, могу ли я просто использовать SQL для добавления таких значений :

 spark.sql("INSERT INTO 'myTable' VALUES ("   id   ", "   modelTS   ", "   previsionTS   ")");
 

И я не знаю, сработает ли просто такая переменная временных меток, как so.

Комментарии:

1. Адриен — это здорово, но это должно быть в вашем вопросе, как обновление! Я не думаю, что тебе нужна Дельта. Я постараюсь придумать ответ…

2. Я вынужден использовать дельта-таблицы

3. опубликуйте новый вопрос с вашей дополнительной потребностью (Дельта), затем код, который вы пробовали (этот ответ), и отправьте мне ссылку, я попытаюсь что-нибудь сделать (итм, вы можете подтвердить мой ответ :D).