почему потоковый запрос spark завершается ошибкой java.util.concurrent.Исключение TimeoutException: Фьючерсы истекли по истечении [5 минут]

#spark-streaming #azure-eventhub #watermark

Вопрос:

У меня есть потоковый запрос, передающий данные из Azure Eventhubs в ADLS каждые 5 секунд, и один и тот же потоковый запрос является водяным знаком в течение 1 часа с задержкой в 5 минут.

Код:

     val rawStreamQuery = messages.writeStream.format("delta")
    .option("checkpointLocation", BASE_LOC   "checkpoint/"   RAW_SCHEMA_NAME   "/"   RAW_TASK_TABLE)
    .trigger(Trigger.ProcessingTime(RAW_STREAM_TRIGGER_INTERVAL))
    .table(RAW_SCHEMA_NAME   "."   RAW_TASK_TABLE)

    rawStreamQuery.withWatermark(watermarkTimeStamp, STREAM_WATERMARK) //5 minutes 
      .groupBy(window(col(watermarkTimeStamp), STREAM_WINDOW).as("window")) //1 hour        
      .count()
      .select(
        lit(commonDataObj.getFeedName).as("feed_name")
        , lit(commonDataObj.getStage).as("stage_name")
        , col("count").as("record_count")
        , col("window").getField("start").as("start_ts")
        , col("window").getField("end").as("end_ts")
      )
 

Получаю ошибку ниже.

 Caused by: java.util.concurrent.TimeoutException: Futures timed out after [5 minutes]
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:259)
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:263)
at scala.concurrent.Await$.$anonfun$result$1(package.scala:220)
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:57)
at scala.concurrent.Await$.result(package.scala:146)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.createReceiver(CachedEventHubsReceiver.scala:99)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.recreateReceiver(CachedEventHubsReceiver.scala:151)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.checkCursor(CachedEventHubsReceiver.scala:169)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$receive(CachedEventHubsReceiver.scala:231)
at org.apache.spark.eventhubs.client.CachedEventHubsReceiver$.receive(CachedEventHubsReceiver.scala:356)
at org.apache.spark.eventhubs.rdd.EventHubsRDD.compute(EventHubsRDD.scala:123)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:356)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:320)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:640)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1581)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:643)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
 

Комментарии:

1. Вам удалось найти решение этой проблемы? У меня возникают похожие проблемы

2. нет … что-то связанное с замком зажигания. подожди, я думаю.. пожалуйста, дайте мне знать, если вы сможете исправить то же самое.

3. У нас уже установлено значение 30 секунд.. кроме того, теперь мы также отключили упреждение искры, установив spark.databricks.preemption.enabled false его из-за некоторых комментариев здесь . Однако мы сталкиваемся с этой проблемой нерегулярно; в последний раз наше потоковое задание выполнялось за 130 часов до истечения тайм-аута, поэтому нам придется посмотреть, поможет ли это

4. то же самое here..it происходит нерегулярно.. дам вам знать, если что-нибудь всплывет..