Потеря записей при внутреннем присоединении данных к левому фрейму данных в структурированной потоковой передаче Spark

#scala #apache-spark #apache-spark-sql #spark-structured-streaming

#scala #apache-spark #apache-spark-sql #spark-structured-streaming

Вопрос:

Я пытаюсь объединить данные с фреймом данных, который, в свою очередь, является результатом левого соединения. В то время как при пакетной обработке это работает должным образом, при потоковой обработке некоторые записи теряются…

Ниже я создал минимальный пример «сеансов», в которых есть события «start» и «end» и, при необходимости, некоторые «метаданные».

Скрипт генерирует два выходных данных: sessionStartsWithMetadata результат событий «start», которые объединены слева с событиями «metadata» на основе sessionId . Используется «Левое соединение», поскольку нам нравится получать событие вывода, даже если не существует соответствующих метаданных.

Кроме endedSessionsWithMetadata того, создается фрейм данных путем присоединения событий «end» к ранее созданному фрейму данных. Здесь используется «внутреннее соединение», поскольку мы хотим получить некоторый вывод только после того, как сеанс наверняка закончился.

Этот код может быть выполнен в spark-shell :

 import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}

import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext

// Main data processing, regardless whether batch or stream processing
def process(
    sessionStartEvents: DataFrame,
    sessionOptionalMetadataEvents: DataFrame,
    sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
  val sessionStartsWithMetadata: DataFrame = sessionStartEvents
    .join(
      sessionOptionalMetadataEvents,
      sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") amp;amp;
        sessionStartEvents("sessionStartTimestamp").between(
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
          sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
        ),
      "left" // metadata is optional
    )
    .select(
      sessionStartEvents("sessionId"),
      sessionStartEvents("sessionStartTimestamp"),
      sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
    )

  val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
    sessionEndEvents,
    sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") amp;amp;
      sessionStartsWithMetadata("sessionStartTimestamp").between(
        sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
        sessionEndEvents("sessionEndTimestamp")
      )
  )

  (sessionStartsWithMetadata, endedSessionsWithMetadata)
}

def streamProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {

  val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionStartEventsStream.addData(sessionStartData)

  val sessionStartEvents: DataFrame = sessionStartEventsStream
    .toDS()
    .toDF("sessionStartTimestamp", "sessionId")
    .withWatermark("sessionStartTimestamp", "1 second")

  val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)

  val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
    .toDS()
    .toDF("sessionOptionalMetadataTimestamp", "sessionId")
    .withWatermark("sessionOptionalMetadataTimestamp", "1 second")

  val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
  sessionEndEventsStream.addData(sessionEndData)

  val sessionEndEvents: DataFrame = sessionEndEventsStream
    .toDS()
    .toDF("sessionEndTimestamp", "sessionId")
    .withWatermark("sessionEndTimestamp", "1 second")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
    .select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
    .select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .option("numRows", "1000")
    .start()

  (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}

def batchProcessing(
    sessionStartData: Seq[(Timestamp, Int)],
    sessionOptionalMetadata: Seq[(Timestamp, Int)],
    sessionEndData: Seq[(Timestamp, Int)]
): Unit = {

  val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
  val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
  val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")

  val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
    process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)

  println("sessionStartsWithMetadata")
  sessionStartsWithMetadata.show(100, truncate = false)

  println("endedSessionsWithMetadata")
  endedSessionsWithMetadata.show(100, truncate = false)
}


// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
  (new Timestamp(1), 0),
  (new Timestamp(2000), 1),
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionOptionalMetadata = Vector(
  (new Timestamp(1), 0),
  // session `1` has no metadata
  (new Timestamp(2000), 2),
  (new Timestamp(20000), 10)
)

val sessionEndData = Vector(
  (new Timestamp(10000), 0),
  (new Timestamp(11000), 1),
  (new Timestamp(12000), 2),
  (new Timestamp(30000), 10)
)

batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)

val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
  streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
  

В примере сеанса с идентификатором 1 нет метаданных, поэтому соответствующий столбец метаданных есть null .

Основная функциональность объединения данных реализована в def process(…) , которая вызывается с использованием как пакетных данных, так и потоковых данных.

В пакетной версии результат соответствует ожидаемому:

 sessionStartsWithMetadata
 --------- ----------------------- --------------------------------             
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
 --------- ----------------------- -------------------------------- 
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
|1        |1970-01-01 01:00:02    |null                            | ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
 --------- ----------------------- -------------------------------- 

endedSessionsWithMetadata
 --------- ----------------------- -------------------------------- ------------------- --------- 
|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
 --------- ----------------------- -------------------------------- ------------------- --------- 
|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
|1        |1970-01-01 01:00:02    |null                            |1970-01-01 01:00:11|1        |  ← has no metadata ✔
|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
 --------- ----------------------- -------------------------------- ------------------- --------- 
  

Но когда выполняется та же обработка, что и обработка потока, вывод endedSessionsWithMetadata не содержит записи сеанса 1 , у которого нет метаданных:

 -------------------------------------------                                     
Batch: 0 ("start event")
-------------------------------------------
 ------------------------- --------- ----------------------- -------------------------------- 
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|
 ------------------------- --------- ----------------------- -------------------------------- 
|sessionStartsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |
|sessionStartsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |
|sessionStartsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |
 ------------------------- --------- ----------------------- -------------------------------- 

-------------------------------------------                                     
Batch: 0 ("end event")
-------------------------------------------
 ------------------------- --------- ----------------------- -------------------------------- ------------------- --------- 
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp  |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
 ------------------------- --------- ----------------------- -------------------------------- ------------------- --------- 
|endedSessionsWithMetadata|10       |1970-01-01 01:00:20    |1970-01-01 01:00:20             |1970-01-01 01:00:30|10       |
|endedSessionsWithMetadata|2        |1970-01-01 01:00:02    |1970-01-01 01:00:02             |1970-01-01 01:00:12|2        |
|endedSessionsWithMetadata|0        |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001         |1970-01-01 01:00:10|0        |
 ------------------------- --------- ----------------------- -------------------------------- ------------------- --------- 

-------------------------------------------                                     
Batch: 1 ("start event")
-------------------------------------------
 ------------------------- --------- --------------------- -------------------------------- 
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
 ------------------------- --------- --------------------- -------------------------------- 
|sessionStartsWithMetadata|1        |1970-01-01 01:00:02  |null                            | ← has no metadata ✔
 ------------------------- --------- --------------------- -------------------------------- 

-------------------------------------------                                     
Batch: 1 ("end event")
-------------------------------------------
 ------------------------- --------- --------------------- -------------------------------- ------------------- --------- 
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
 ------------------------- --------- --------------------- -------------------------------- ------------------- --------- 
 ------------------------- --------- --------------------- -------------------------------- ------------------- --------- 
  ↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘


  

Кто-нибудь может объяснить, почему при обработке потока событие «end» без «метаданных» ( sessionId=1 ) отсутствует? Что мне нужно сделать, чтобы он отображался в выходных данных?

Большое спасибо!

Комментарии:

1. каков эффект, если у вас есть водяной знак 5 секунд?

2. Я в отпуске, поэтому я не слишком увлекаюсь этим.

3. @thebluephantom Спасибо, что посмотрели на это! К сожалению, водяной знак в 5 секунд не меняет результат.

4. Мой комментарий выше был недействительным. возвращаюсь из hols. Запустил его и немного любопытно. Консультационные руководства. Завтра проведу некоторые эксперименты.

5. Это ничего не вернуло: … sessionEndEvents(«sessionEndTimestamp»), sessionEndEvents(«sessionEndTimestamp»).plus(выражение (s «ИНТЕРВАЛ 10 секунд»))

Ответ №1:

После значительного тестирования, осмотревшись и перечитав руководство:

  • Должно быть, это ошибка в Spark.
  • Я также отмечаю этот пост в обращении: <a rel="noreferrer noopener nofollow" href="https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@» rel=»nofollow noreferrer»>https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@ и хотя понятны глобальные и цепочечные соединения потоков, это указывает на проблему для этого типа обработки.
  • Я запускал Spark Databricks 3.x безрезультатно.

Комментарии:

1. Спасибо за сообщение в списке рассылки. К сожалению, это действительно кажется ограничением Spark.

2. Но это не видно из руководств. Хотя они показывают только ОБЪЕДИНЕНИЕ 2 DS, только DFs. В любом случае, сейчас мы r в no.

3. Хороший вопрос в любом случае.

4. С учетом приведенных выше замечаний был создан искровой билет: issues.apache.org/jira/browse/SPARK-33259