#java #scala #dataframe #apache-spark
Вопрос:
У меня есть фрейм данных с именем timeDF
, схема которого приведена ниже:
root
|-- Id: long (nullable = true)
|-- Model: timestamp (nullable = true)
|-- Prevision: timestamp (nullable = true)
Я хочу добавить новую строку в конце timeDF
, преобразовав два Calendar
объекта c1
amp; c2
в Timestamp
. Я знаю, что могу сделать это, сначала преобразовав их в Timestamp
такие :
val t1 = new Timestamp(c1.getTimeInMillis)
val t2 = new Timestamp(c2.getTimeInMillis)
Однако я не могу понять, как затем записать эти переменные timeDF
в новую строку и как позволить spark увеличить значение Id
столбца ?
Должен ли я создать List
объект с t1
и t2
и создать временный кадр данных из этого списка, чтобы затем объединить два кадра данных ? Если да, то как мне управлять Id
колонкой ? Не слишком ли много беспорядка для такой простой операции ?
Кто-нибудь может мне объяснить, пожалуйста ?
Спасибо.
Комментарии:
1. Адриен, Spark-это не база данных, поэтому вы не можете выполнить вставку, как это было бы с реляционной базой данных. Что вы можете сделать, так это создать новый фрейм данных, добавить его unionByName() в свой timeDf, а затем добавить идентификатор с помощью API фрейма данных Spark. В зависимости от вашей ситуации с параллелизмом, это может быть проблематично.
2. @jgp Хорошо, я попробую, но для идентификатора я действительно не понимаю, что делать. Если я объединю timeDF во временный фрейм данных (который имеет 2 столбца), идентификатор будет равен нулю, не так ли ?
3. Это нормально, когда есть нули, вы можете фильтровать их, изменять их значения (создавая другой столбец, да, Spark-это не база данных 🙂 ). Вам нужно беспокоиться о параллелизме при создании новых строк?
Ответ №1:
Вот решение, которое вы можете попробовать, в двух словах:
- Проглоти свой файл.
- Создайте новый фрейм данных с вашими данными и
unionByName()
. - Исправьте идентификатор.
- Убирать.
Создайте дополнительную запись
Сначала вы создаете дополнительную запись с нуля. Поскольку вы смешиваете несколько типов, я использовал POJO, вот код:
List<ModelPrevisionRecord> data = new ArrayList<>();
ModelPrevisionRecord b = new ModelPrevisionRecord(
-1L,
new Timestamp(System.currentTimeMillis()),
new Timestamp(System.currentTimeMillis()));
data.add(b);
Dataset<ModelPrevisionRecord> ds = spark.createDataset(data,
Encoders.bean(ModelPrevisionRecord.class));
timeDf = timeDf.unionByName(ds.toDF());
ModelPrevisionRecord-это очень простое POJO:
package net.jgp.labs.spark.l999_scrapbook.l000;
import java.sql.Timestamp;
public class ModelPrevisionRecord {
public long getId() {
return id;
}
public void setId(long id) {
this.id = id;
}
public Timestamp getModel() {
return model;
}
public void setModel(Timestamp model) {
this.model = model;
}
public Timestamp getPrevision() {
return prevision;
}
public void setPrevision(Timestamp prevision) {
this.prevision = prevision;
}
private long id;
private Timestamp model;
private Timestamp prevision;
public ModelPrevisionRecord(long id, Timestamp model, Timestamp prevision) {
this.id = id;
this.model = model;
this.prevision = prevision;
}
}
Исправьте идентификатор
Идентификатор равен -1, поэтому идентификатор предназначен для создания нового столбца id2
с правильным идентификатором:
timeDf = timeDf.withColumn("id2",
when(
col("id").$eq$eq$eq(-1), timeDf.agg(max("id")).head().getLong(0) 1)
.otherwise(col("id")));
Очистка фрейма данных
Наконец, очистите свой фрейм данных:
timeDf = timeDf.drop("id").withColumnRenamed("id2", "id");
Важные примечания
- Это решение будет работать только в том случае, если вы добавляете по одной записи за раз, в противном случае у вас будет один и тот же идентификатор.
- Вы можете увидеть весь пример здесь: https://github.com/jgperrin/net.jgp.labs.spark/tree/master/src/main/java/net/jgp/labs/spark/l999_scrapbook/l000, это может быть проще клонировать…
Ответ №2:
Если ваш первый кадр данных можно отсортировать по идентификатору и вам нужно добавлять строки одну за другой, вы можете найти максимальный идентификатор в своем списке:
long max = timeDF.agg(functions.max("Id")).head().getLong(0);
а затем увеличьте и добавьте его в свой фрейм данных путем объединения. Чтобы сделать это, следуйте следующему примеру, возраст которого может действовать как идентификатор. people.json
это файл в примерах spark.
Dataset<Row> df = spark.read().json("H:\work\HadoopWinUtils\people.json");
df.show();
long max = df.agg(functions.max("age")).head().getLong(0);
List<Row> rows = Arrays.asList(RowFactory.create(max 1, "test"));
StructType schema = DataTypes.createStructType(Arrays.asList(
DataTypes.createStructField("age", DataTypes.LongType, false, Metadata.empty()),
DataTypes.createStructField("name", DataTypes.StringType, false, Metadata.empty())));
Dataset<Row> df2 = spark.createDataFrame(rows, schema);
df2.show();
Dataset<Row> df3 = df.union(df2);
df3.show();
Комментарии:
1. Спасибо за вашу помощь. Я попробовал ваше решение, но получил ошибку несоответствия типа :
error: type mismatch; found : List[org.apache.spark.sql.Row] (in java.util) required: List[org.apache.spark.sql.Row] (in scala.collection.immutable) val newTime: List[Row] = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
. Как я должен решить такую проблему, они используют один и тот же тип, но не могут заставить его работать.2. Вам нужно изменить типы данных на основе ваших значений. Я использую (Целое число, строку) в качестве кортежа. Просто сначала протестируйте его с помощью «people.json», чтобы увидеть, как это работает. Все, что тебе нужно, это
df.agg(max('ID'))
. Однако этот код является Java, а не Scala.3. Эта ошибка говорит, что я использую список,
java.util
но вы используете список, вScala.collections
котором они разные.
Ответ №3:
Я пробовал это, но я не знаю, почему при печати сохраненной таблицы сохраняются только последние 2 строки, все остальные удаляются.
Вот как я инициализирую таблицу дельта :
val schema = StructType(
StructField("Id", LongType, false) ::
StructField("Model", TimestampType, false) ::
StructField("Prevision", TimestampType, false) :: Nil
)
var timestampDF = spark.createDataFrame(sc.emptyRDD[Row], schema)
val write_format = "delta"
val partition_by = "Model"
val save_path = "/mnt/path/to/folder"
val table_name = "myTable"
spark.sql("DROP TABLE IF EXISTS " table_name)
dbutils.fs.rm(save_path, true)
timestampDF.write.partitionBy(partition_by)
.format(write_format)
.save(save_path)
spark.sql("CREATE TABLE " table_name " USING DELTA LOCATION '" save_path "'")
И вот как я добавляю в него новый элемент
def addTimeToData(model: Calendar, target: Calendar): Unit = {
var timeDF = spark.read
.format("delta")
.load("/mnt/path/to/folder")
val modelTS = new Timestamp(model.getTimeInMillis)
val targetTS = new Timestamp(target.getTimeInMillis)
var id: Long = 0
if (!timeDF.head(1).isEmpty) {
id = timeDF.agg(max("Id")).head().getLong(0) 1
}
val newTime = Arrays.asList(RowFactory.create(id, modelTS, targetTS))
val schema = StructType(
StructField("Id", LongType, false) ::
StructField("Model", TimestampType, false) ::
StructField("Prevision", TimestampType, false) :: Nil
)
var newTimeDF = spark.createDataFrame(newTime, schema)
val unionTimeDF = timeDF.union(newTimeDF)
timeDF = unionTimeDF
unionTimeDF.show
val save_path = "/mnt/datalake/Exploration/Provisionning/MeteoFrance/Timestamps/"
val table_name = "myTable"
spark.sql("DROP TABLE IF EXISTS " table_name)
dbutils.fs.rm(save_path, true)
timeDF.write.partitionBy("Model")
.format("delta")
.save(save_path)
spark.sql("CREATE TABLE " table_name " USING DELTA LOCATION '" save_path "'")
}
Я не очень хорошо знаком с дельта-таблицами, поэтому не знаю, могу ли я просто использовать SQL для добавления таких значений :
spark.sql("INSERT INTO 'myTable' VALUES (" id ", " modelTS ", " previsionTS ")");
И я не знаю, сработает ли просто такая переменная временных меток, как so.
Комментарии:
1. Адриен — это здорово, но это должно быть в вашем вопросе, как обновление! Я не думаю, что тебе нужна Дельта. Я постараюсь придумать ответ…
2. Я вынужден использовать дельта-таблицы
3. опубликуйте новый вопрос с вашей дополнительной потребностью (Дельта), затем код, который вы пробовали (этот ответ), и отправьте мне ссылку, я попытаюсь что-нибудь сделать (итм, вы можете подтвердить мой ответ :D).