#scala #apache-spark #apache-spark-sql #spark-structured-streaming
#scala #apache-spark #apache-spark-sql #spark-structured-streaming
Вопрос:
Я пытаюсь объединить данные с фреймом данных, который, в свою очередь, является результатом левого соединения. В то время как при пакетной обработке это работает должным образом, при потоковой обработке некоторые записи теряются…
Ниже я создал минимальный пример «сеансов», в которых есть события «start» и «end» и, при необходимости, некоторые «метаданные».
Скрипт генерирует два выходных данных: sessionStartsWithMetadata
результат событий «start», которые объединены слева с событиями «metadata» на основе sessionId
. Используется «Левое соединение», поскольку нам нравится получать событие вывода, даже если не существует соответствующих метаданных.
Кроме endedSessionsWithMetadata
того, создается фрейм данных путем присоединения событий «end» к ранее созданному фрейму данных. Здесь используется «внутреннее соединение», поскольку мы хотим получить некоторый вывод только после того, как сеанс наверняка закончился.
Этот код может быть выполнен в spark-shell
:
import java.sql.Timestamp
import org.apache.spark.sql.execution.streaming.{MemoryStream, StreamingQueryWrapper}
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.functions.{col, expr, lit}
import spark.implicits._
implicit val sqlContext: SQLContext = spark.sqlContext
// Main data processing, regardless whether batch or stream processing
def process(
sessionStartEvents: DataFrame,
sessionOptionalMetadataEvents: DataFrame,
sessionEndEvents: DataFrame
): (DataFrame, DataFrame) = {
val sessionStartsWithMetadata: DataFrame = sessionStartEvents
.join(
sessionOptionalMetadataEvents,
sessionStartEvents("sessionId") === sessionOptionalMetadataEvents("sessionId") amp;amp;
sessionStartEvents("sessionStartTimestamp").between(
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").minus(expr(s"INTERVAL 1 seconds")),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp").plus(expr(s"INTERVAL 1 seconds"))
),
"left" // metadata is optional
)
.select(
sessionStartEvents("sessionId"),
sessionStartEvents("sessionStartTimestamp"),
sessionOptionalMetadataEvents("sessionOptionalMetadataTimestamp")
)
val endedSessionsWithMetadata = sessionStartsWithMetadata.join(
sessionEndEvents,
sessionStartsWithMetadata("sessionId") === sessionEndEvents("sessionId") amp;amp;
sessionStartsWithMetadata("sessionStartTimestamp").between(
sessionEndEvents("sessionEndTimestamp").minus(expr(s"INTERVAL 10 seconds")),
sessionEndEvents("sessionEndTimestamp")
)
)
(sessionStartsWithMetadata, endedSessionsWithMetadata)
}
def streamProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): (StreamingQuery, StreamingQuery) = {
val sessionStartEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionStartEventsStream.addData(sessionStartData)
val sessionStartEvents: DataFrame = sessionStartEventsStream
.toDS()
.toDF("sessionStartTimestamp", "sessionId")
.withWatermark("sessionStartTimestamp", "1 second")
val sessionOptionalMetadataEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionOptionalMetadataEventsStream.addData(sessionOptionalMetadata)
val sessionOptionalMetadataEvents: DataFrame = sessionOptionalMetadataEventsStream
.toDS()
.toDF("sessionOptionalMetadataTimestamp", "sessionId")
.withWatermark("sessionOptionalMetadataTimestamp", "1 second")
val sessionEndEventsStream: MemoryStream[(Timestamp, Int)] = MemoryStream[(Timestamp, Int)]
sessionEndEventsStream.addData(sessionEndData)
val sessionEndEvents: DataFrame = sessionEndEventsStream
.toDS()
.toDF("sessionEndTimestamp", "sessionId")
.withWatermark("sessionEndTimestamp", "1 second")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
val sessionStartsWithMetadataQuery = sessionStartsWithMetadata
.select(lit("sessionStartsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
val endedSessionsWithMetadataQuery = endedSessionsWithMetadata
.select(lit("endedSessionsWithMetadata"), col("*")) // Add label to see which query's output it is
.writeStream
.outputMode("append")
.format("console")
.option("truncate", "false")
.option("numRows", "1000")
.start()
(sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery)
}
def batchProcessing(
sessionStartData: Seq[(Timestamp, Int)],
sessionOptionalMetadata: Seq[(Timestamp, Int)],
sessionEndData: Seq[(Timestamp, Int)]
): Unit = {
val sessionStartEvents = spark.createDataset(sessionStartData).toDF("sessionStartTimestamp", "sessionId")
val sessionOptionalMetadataEvents = spark.createDataset(sessionOptionalMetadata).toDF("sessionOptionalMetadataTimestamp", "sessionId")
val sessionEndEvents = spark.createDataset(sessionEndData).toDF("sessionEndTimestamp", "sessionId")
val (sessionStartsWithMetadata, endedSessionsWithMetadata) =
process(sessionStartEvents, sessionOptionalMetadataEvents, sessionEndEvents)
println("sessionStartsWithMetadata")
sessionStartsWithMetadata.show(100, truncate = false)
println("endedSessionsWithMetadata")
endedSessionsWithMetadata.show(100, truncate = false)
}
// Data is represented as tuples of (eventTime, sessionId)...
val sessionStartData = Vector(
(new Timestamp(1), 0),
(new Timestamp(2000), 1),
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionOptionalMetadata = Vector(
(new Timestamp(1), 0),
// session `1` has no metadata
(new Timestamp(2000), 2),
(new Timestamp(20000), 10)
)
val sessionEndData = Vector(
(new Timestamp(10000), 0),
(new Timestamp(11000), 1),
(new Timestamp(12000), 2),
(new Timestamp(30000), 10)
)
batchProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
val (sessionStartsWithMetadataQuery, endedSessionsWithMetadataQuery) =
streamProcessing(sessionStartData, sessionOptionalMetadata, sessionEndData)
В примере сеанса с идентификатором 1
нет метаданных, поэтому соответствующий столбец метаданных есть null
.
Основная функциональность объединения данных реализована в def process(…)
, которая вызывается с использованием как пакетных данных, так и потоковых данных.
В пакетной версии результат соответствует ожидаемому:
sessionStartsWithMetadata
--------- ----------------------- --------------------------------
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
--------- ----------------------- --------------------------------
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
--------- ----------------------- --------------------------------
endedSessionsWithMetadata
--------- ----------------------- -------------------------------- ------------------- ---------
|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
--------- ----------------------- -------------------------------- ------------------- ---------
|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
|1 |1970-01-01 01:00:02 |null |1970-01-01 01:00:11|1 | ← has no metadata ✔
|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
--------- ----------------------- -------------------------------- ------------------- ---------
Но когда выполняется та же обработка, что и обработка потока, вывод endedSessionsWithMetadata
не содержит записи сеанса 1
, у которого нет метаданных:
-------------------------------------------
Batch: 0 ("start event")
-------------------------------------------
------------------------- --------- ----------------------- --------------------------------
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|
------------------------- --------- ----------------------- --------------------------------
|sessionStartsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |
|sessionStartsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |
|sessionStartsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |
------------------------- --------- ----------------------- --------------------------------
-------------------------------------------
Batch: 0 ("end event")
-------------------------------------------
------------------------- --------- ----------------------- -------------------------------- ------------------- ---------
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp |sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
------------------------- --------- ----------------------- -------------------------------- ------------------- ---------
|endedSessionsWithMetadata|10 |1970-01-01 01:00:20 |1970-01-01 01:00:20 |1970-01-01 01:00:30|10 |
|endedSessionsWithMetadata|2 |1970-01-01 01:00:02 |1970-01-01 01:00:02 |1970-01-01 01:00:12|2 |
|endedSessionsWithMetadata|0 |1970-01-01 01:00:00.001|1970-01-01 01:00:00.001 |1970-01-01 01:00:10|0 |
------------------------- --------- ----------------------- -------------------------------- ------------------- ---------
-------------------------------------------
Batch: 1 ("start event")
-------------------------------------------
------------------------- --------- --------------------- --------------------------------
|sessionStartsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|
------------------------- --------- --------------------- --------------------------------
|sessionStartsWithMetadata|1 |1970-01-01 01:00:02 |null | ← has no metadata ✔
------------------------- --------- --------------------- --------------------------------
-------------------------------------------
Batch: 1 ("end event")
-------------------------------------------
------------------------- --------- --------------------- -------------------------------- ------------------- ---------
|endedSessionsWithMetadata|sessionId|sessionStartTimestamp|sessionOptionalMetadataTimestamp|sessionEndTimestamp|sessionId|
------------------------- --------- --------------------- -------------------------------- ------------------- ---------
------------------------- --------- --------------------- -------------------------------- ------------------- ---------
↳ ✘ here I would have expected a line with sessionId=1, that has "start" and "end" information, but no "metadata" ✘
Кто-нибудь может объяснить, почему при обработке потока событие «end» без «метаданных» ( sessionId=1
) отсутствует? Что мне нужно сделать, чтобы он отображался в выходных данных?
Большое спасибо!
Комментарии:
1. каков эффект, если у вас есть водяной знак 5 секунд?
2. Я в отпуске, поэтому я не слишком увлекаюсь этим.
3. @thebluephantom Спасибо, что посмотрели на это! К сожалению, водяной знак в 5 секунд не меняет результат.
4. Мой комментарий выше был недействительным. возвращаюсь из hols. Запустил его и немного любопытно. Консультационные руководства. Завтра проведу некоторые эксперименты.
5. Это ничего не вернуло: … sessionEndEvents(«sessionEndTimestamp»), sessionEndEvents(«sessionEndTimestamp»).plus(выражение (s «ИНТЕРВАЛ 10 секунд»))
Ответ №1:
После значительного тестирования, осмотревшись и перечитав руководство:
- Должно быть, это ошибка в Spark.
- Я также отмечаю этот пост в обращении: <a rel="noreferrer noopener nofollow" href="https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@» rel=»nofollow noreferrer»>https://lists.apache.org/thread.html/cc6489a19316e7382661d305fabd8c21915e5faf6a928b4869ac2b4a@ и хотя понятны глобальные и цепочечные соединения потоков, это указывает на проблему для этого типа обработки.
- Я запускал Spark Databricks 3.x безрезультатно.
Комментарии:
1. Спасибо за сообщение в списке рассылки. К сожалению, это действительно кажется ограничением Spark.
2. Но это не видно из руководств. Хотя они показывают только ОБЪЕДИНЕНИЕ 2 DS, только DFs. В любом случае, сейчас мы r в no.
3. Хороший вопрос в любом случае.
4. С учетом приведенных выше замечаний был создан искровой билет: issues.apache.org/jira/browse/SPARK-33259