Задания ставятся в очередь в SparkStreaming

#apache-spark #amazon-ec2 #spark-streaming

#apache-spark #amazon-ec2 #spark-streaming

Вопрос:

У меня есть кластер spark с тремя узлами на Amazon EC2, два из них находятся в зоне доступности 1-a, а один — в зоне доступности 1b. Я запустил spark-streaming с использованием 2 ядер. Одно ядро будет использоваться получателем, а другое ядро останется для обработки задач. Когда исполнитель для обработки задач запущен в зоне 1-a, он работает отлично. Но если исполнитель для обработки задач запускается в зоне 1b, пакетные задания часто ставятся в очередь, и время обработки продолжает увеличиваться. Я прилагаю скриншот ниже : введите описание изображения здесь

Кроме того, однажды я получил эту ошибку. Итак, я создал запись в /etc/hosts для каждого узла spark. Но, несмотря на это, он поставил задания в очередь. Кто-нибудь может помочь мне указать на проблему здесь.

 FetchFailed(BlockManagerId(0, ip-x-x-x-x, 49445), shuffleId=32, mapId=0, reduceId=1, message=
org.apache.spark.shuffle.FetchFailedException: Failed to connect to ip-x-x-x-x:49445
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.throwFetchFailedException(ShuffleBlockFetcherIterator.scala:323)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:300)
    at org.apache.spark.storage.ShuffleBlockFetcherIterator.next(ShuffleBlockFetcherIterator.scala:51)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:197)
    at org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:103)
    at org.apache.spark.rdd.ShuffledRDD.compute(ShuffledRDD.scala:98)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Failed to connect to ip-x-x-x-x:49445
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:216)
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:167)
    at org.apache.spark.network.netty.NettyBlockTransferService$$anon$1.createAndStart(NettyBlockTransferService.scala:90)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher.access$200(RetryingBlockFetcher.java:43)
    at org.apache.spark.network.shuffle.RetryingBlockFetcher$1.run(RetryingBlockFetcher.java:170)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    ... 3 more
Caused by: java.nio.channels.UnresolvedAddressException
    at sun.nio.ch.Net.checkAddress(Net.java:107)
    at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:649)
    at io.netty.channel.socket.nio.NioSocketChannel.doConnect(NioSocketChannel.java:209)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.connect(AbstractNioChannel.java:207)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.connect(DefaultChannelPipeline.java:1097)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:456)
    at io.netty.channel.ChannelOutboundHandlerAdapter.connect(ChannelOutboundHandlerAdapter.java:47)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:456)
    at io.netty.channel.ChannelDuplexHandler.connect(ChannelDuplexHandler.java:50)
    at io.netty.channel.AbstractChannelHandlerContext.invokeConnect(AbstractChannelHandlerContext.java:471)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:456)
    at io.netty.channel.AbstractChannelHandlerContext.connect(AbstractChannelHandlerContext.java:438)
    at io.netty.channel.DefaultChannelPipeline.connect(DefaultChannelPipeline.java:908)
    at io.netty.channel.AbstractChannel.connect(AbstractChannel.java:203)
    at io.netty.bootstrap.Bootstrap$2.run(Bootstrap.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    ... 1 more