Эффективный способ генерировать большие рандомизированные данные в Spark

#scala #apache-spark #dataframe

#scala #apache-spark #dataframe

Вопрос:

Я пытаюсь сгенерировать большой случайный набор данных spark. По сути, я хочу начать с 2018-12-01 09:00:00 и для каждой новой строки временная метка будет меняться на scala.util.Random.nextInt(3) секунды. ( timestamp Столбец является единственным значимым столбцом)

Я хочу, чтобы это работало, даже когда я пытаюсь сгенерировать триллионы строк в большом кластере, поэтому я пытаюсь генерировать его партиями по 100 элементов за раз, поскольку триллионы строк не могут поместиться в Seq .

С этим кодом есть несколько проблем, таких как var и я не уверен в своем использовании union . Мне интересно, есть ли у кого-нибудь идея получше, как это сделать.

 import Math.{max, min}
import java.sql.Timestamp
import java.sql.Timestamp.valueOf

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

object DataGenerator extends SparkEnv {

  import spark.implicits._

  val batchSize = 100
  val rnd = scala.util.Random

  // randomly generates a DataFrame with n Rows
  def generateTimestampData(n: Int): DataFrame = {
    val timestampDataFields = Seq(StructField("timestamp", TimestampType, false))
    val initDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], StructType(timestampDataFields))
    def loop(data: DataFrame, lastTime: Long, _n: Int): DataFrame = {
      if (_n == 0) {
        val w = Window.orderBy("timestamp")
        data.withColumn("eventID", concat(typedLit("event"), row_number().over(w)))
      } else {
        var thisTime = lastTime
        def rts(ts: Long): Stream[Long] = ts #:: { thisTime = ts   rnd.nextInt(3) * 1000; rts(thisTime) }
        val thisBatch = rts(lastTime)
          .map(new Timestamp(_))
          .take(min(batchSize, _n))
          .toDF("timestamp")
        loop(data union thisBatch, thisTime, max(_n - batchSize, 0))
      }
    }
    loop(initDF, valueOf("2018-12-01 09:00:00").getTime(), n)
  }

  def main(args: Array[String]): Unit = {
    val w = Window.orderBy("timestamp")
    val df = generateTimestampData(10015)
      .withColumn("part", floor(row_number().over(w) / 100))
    df.repartition(27)
      .write
      .partitionBy("part")
      .option("compression", "snappy")
      .mode(SaveMode.Overwrite)
      .parquet("data/generated/ts_data")
  }

}
  

Приведенный выше код приводит к созданию фрейма данных с 10 015 строками, который выглядит примерно так.

  ------------------- ---------- ---- 
|          timestamp|   eventID|part|
 ------------------- ---------- ---- 
|2018-12-01 11:43:09|event10009| 100|
|2018-12-01 11:43:02|event10003| 100|
|2018-12-01 11:43:11|event10012| 100|
|2018-12-01 11:43:10|event10011| 100|
|2018-12-01 11:43:08|event10007| 100|
|2018-12-01 11:43:02|event10001| 100|
|2018-12-01 11:43:08|event10008| 100|
|2018-12-01 11:43:12|event10013| 100|
|2018-12-01 11:43:09|event10010| 100|
|2018-12-01 11:43:14|event10014| 100|
|2018-12-01 10:11:54| event4357|  43|
|2018-12-01 10:47:33| event6524|  65|
|2018-12-01 10:23:08| event5064|  50|
|2018-12-01 10:23:02| event5060|  50|
|2018-12-01 10:23:39| event5099|  50|
|2018-12-01 10:22:25| event5019|  50|
|2018-12-01 09:16:36| event1042|  10|
|2018-12-01 09:16:03| event1008|  10|
|2018-12-01 09:16:13| event1017|  10|
|2018-12-01 09:17:28| event1092|  10|
 ------------------- ---------- ---- 
  

Ответ №1:

Вы можете реализовать RDD, который выполняет генерацию случайных данных параллельно, как в следующем примере.

 import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

// Each random partition will hold `numValues` items
final class RandomPartition[A: ClassTag](val index: Int, numValues: Int, random: => A) extends Partition {
  def values: Iterator[A] = Iterator.fill(numValues)(random)
}

// The RDD will parallelize the workload across `numSlices`
final class RandomRDD[A: ClassTag](@transient private val sc: SparkContext, numSlices: Int, numValues: Int, random: => A) extends RDD[A](sc, deps = Seq.empty) {

  // Based on the item and executor count, determine how many values are
  // computed in each executor. Distribute the rest evenly (if any).
  private val valuesPerSlice = numValues / numSlices
  private val slicesWithExtraItem = numValues % numSlices

  // Just ask the partition for the data
  override def compute(split: Partition, context: TaskContext): Iterator[A] =
    split.asInstanceOf[RandomPartition[A]].values

  // Generate the partitions so that the load is as evenly spread as possible
  // e.g. 10 partition and 22 items -> 2 slices with 3 items and 8 slices with 2
  override protected def getPartitions: Array[Partition] =
    ((0 until slicesWithExtraItem).view.map(new RandomPartition[A](_, valuesPerSlice   1, random))   
      (slicesWithExtraItem until numSlices).view.map(new RandomPartition[A](_, valuesPerSlice, random))).toArray

}
  

Получив это, вы можете использовать его, передавая свой собственный генератор случайных данных, чтобы получить RDD[Int]

 val rdd = new RandomRDD(spark.sparkContext, 10, 22, scala.util.Random.nextInt(100)   1)
rdd.foreach(println)
/*
 * outputs:
 * 30
 * 86
 * 75
 * 20
 * ...
 */
  

или RDD[(Int, Int, Int)]

 def rand = scala.util.Random.nextInt(100)   1
val rdd = new RandomRDD(spark.sparkContext, 10, 22, (rand, rand, rand))
rdd.foreach(println)
/*
 * outputs:
 * (33,22,15)
 * (65,24,64)
 * (41,81,44)
 * (58,7,18)
 * ...
 */
  

и, конечно, вы также можете очень легко обернуть это в DataFrame :

 spark.createDataFrame(rdd).show()
/*
 * outputs:
 *  --- --- --- 
 * | _1| _2| _3|
 *  --- --- --- 
 * |100| 48| 92|
 * | 34| 40| 30|
 * | 98| 63| 61|
 * | 95| 17| 63|
 * | 68| 31| 34|
 * .............
 */
  

Обратите внимание, как в этом случае генерируемые данные отличаются каждый раз, когда выполняется действие с RDD / DataFrame . Изменяя реализацию RandomPartition так, чтобы она фактически сохраняла значения, а не генерировала их «на лету», вы можете получить стабильный набор случайных элементов, сохраняя при этом гибкость и масштабируемость этого подхода.

Одним из приятных свойств подхода без состояния является то, что вы можете генерировать огромный набор данных даже локально. Следующее запустилось за несколько секунд на моем ноутбуке:

 new RandomRDD(spark.sparkContext, 10, Int.MaxValue, 42).count
// returns: 2147483647
  

Комментарии:

1. В моей версии spark этого нет, spark.createDataFrame(rdd: RDD[_]) вместо этого есть spark.createDataFrame(rdd: RDD[_], beanClass: Class[_]) .

2. Вы должны явно указать выходной класс. Какая это версия?

3. Это версия 2.4.0

4. Передайте ожидаемый результат явно, либо с помощью Class , либо с помощью StructType . spark.apache.org/docs/2.4.0/api/scala /…

5. Если вы получаете java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field ошибку, вы можете захотеть изменить вызов по имени в (val index: Int, numValues: Int, random: => A) на вызов по значению (val index: Int, numValues: Int, random: A)