Unable to send data to MongoDB using Kafka-Spark structured streaming

#mongodb #apache-spark #apache-kafka #spark-structured-streaming

Question:

I am unable to save the structured streaming data from Kafka into MongoDB. This is the first time I am implementing Kafka-Spark structured streaming with a MongoDB sink. I followed this article: https://learningfromdata.blog/2017/04/16/real-time-data-ingestion-with-apache-spark-structured-streaming-implementation/ It suggests creating a MongoForeachWriter and a helper class along with the structured streaming program. However, after following it, I am not able to see any data in the MongoDB collection. Can anyone spot and correct where I am going wrong?

Error:

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [cast(value#8 as string) AS value#21]
 - StreamingExecutionRelation KafkaSource[Subscribe[TOPIC_WITH_COMP_P2_R2, TOPIC_WITH_COMP_P2_R2.DIT, TOPIC_WITHOUT_COMP_P2_R2.DIT]], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13]

        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 8, localhost, executor driver): java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at example_new.MongoDBForeachWriter.process(MongoDBForeachWriter.scala:42)
        at example_new.MongoDBForeachWriter.process(MongoDBForeachWriter.scala:15)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:53)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2048)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2067)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:2092)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:927)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
        at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:927)
        at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3$$anonfun$apply$16.apply(MicroBatchExecution.scala:477)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:77)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$3.apply(MicroBatchExecution.scala:475)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:474)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
        ... 1 more
Caused by: java.lang.ClassCastException: java.lang.String cannot be cast to [B
        at example_new.MongoDBForeachWriter.process(MongoDBForeachWriter.scala:42)
        at example_new.MongoDBForeachWriter.process(MongoDBForeachWriter.scala:15)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:53)
        at org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch$1.apply(ForeachSink.scala:49)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
        at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:929)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-04-22 19:40:26 INFO  SparkContext:54 - Invoking stop() from shutdown hook
2019-04-22 19:40:26 INFO  AbstractConnector:318 - Stopped Spark@907f2b7{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-04-22 19:40:26 INFO  SparkUI:54 - Stopped Spark web UI at http://W10BZVGSQ2.aus.amer.dell.com:4040
2019-04-22 19:40:26 INFO  MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-04-22 19:40:26 INFO  MemoryStore:54 - MemoryStore cleared
2019-04-22 19:40:26 INFO  BlockManager:54 - BlockManager stopped
2019-04-22 19:40:26 INFO  BlockManagerMaster:54 - BlockManagerMaster stopped
2019-04-22 19:40:26 INFO  OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-04-22 19:40:26 WARN  SparkEnv:87 - Exception while deleting Spark temp dir: C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10\userFiles-ee595b18-8c75-41be-b20e-f8c30628c765
java.io.IOException: Failed to delete: C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10\userFiles-ee595b18-8c75-41be-b20e-f8c30628c765
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1070)
        at org.apache.spark.SparkEnv.stop(SparkEnv.scala:103)
        at org.apache.spark.SparkContext$$anonfun$stop$11.apply$mcV$sp(SparkContext.scala:1940)
        at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1357)
        at org.apache.spark.SparkContext.stop(SparkContext.scala:1939)
        at org.apache.spark.SparkContext$$anonfun$2.apply$mcV$sp(SparkContext.scala:572)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
2019-04-22 19:40:26 INFO  SparkContext:54 - Successfully stopped SparkContext
2019-04-22 19:40:26 INFO  ShutdownHookManager:54 - Shutdown hook called
2019-04-22 19:40:26 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10
2019-04-22 19:40:26 ERROR ShutdownHookManager:91 - Exception while deleting Spark temp dir: C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10
java.io.IOException: Failed to delete: C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1070)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
2019-04-22 19:40:26 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\raheem_mohammed\AppData\Local\Temp\spark-992d4d7e-ea11-4295-9368-c4038b26f895
2019-04-22 19:40:26 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\raheem_mohammed\AppData\Local\Temp\temporaryReader-4d362eeb-6ee5-4a48-9da9-3792a22ec1ca
2019-04-22 19:40:26 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\raheem_mohammed\AppData\Local\Temp\temporary-add2fc32-1623-4784-8df1-f5cb0a1dd9fc
2019-04-22 19:40:26 INFO  ShutdownHookManager:54 - Deleting directory C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10\userFiles-ee595b18-8c75-41be-b20e-f8c30628c765
2019-04-22 19:40:26 ERROR ShutdownHookManager:91 - Exception while deleting Spark temp dir: C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10\userFiles-ee595b18-8c75-41be-b20e-f8c30628c765
java.io.IOException: Failed to delete: C:\Users\raheem_mohammed\AppData\Local\Temp\spark-f9296938-c32b-42ff-af71-f90efcd49b10\userFiles-ee595b18-8c75-41be-b20e-f8c30628c765
        at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:1070)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:65)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1$$anonfun$apply$mcV$sp$3.apply(ShutdownHookManager.scala:62)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at org.apache.spark.util.ShutdownHookManager$$anonfun$1.apply$mcV$sp(ShutdownHookManager.scala:62)
        at org.apache.spark.util.SparkShutdownHook.run(ShutdownHookManager.scala:216)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1988)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(ShutdownHookManager.scala:188)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.util.SparkShutdownHookManager.runAll(ShutdownHookManager.scala:188)
        at org.apache.spark.util.SparkShutdownHookManager$$anon$2.run(ShutdownHookManager.scala:178)
        at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
  

I created MongoForeachWriter.scala, Helper.scala and StructuredStreamingProgram.scala:

 package example

import java.util.concurrent.TimeUnit

import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.mongodb.scala._

object Helpers {

  implicit class DocumentObservable[C](val observable: Observable[Document]) extends ImplicitObservable[Document] {
    override val converter: (Document) => String = (doc) => doc.toJson
  }

  implicit class GenericObservable[C](val observable: Observable[C]) extends ImplicitObservable[C] {
    override val converter: (C) => String = (doc) => doc.toString
  }

  trait ImplicitObservable[C] {
    val observable: Observable[C]
    val converter: (C) => String

    def results(): Seq[C] = Await.result(observable.toFuture(), Duration(10, TimeUnit.SECONDS))
    def headResult() = Await.result(observable.head(), Duration(10, TimeUnit.SECONDS))
    def printResults(initial: String = ""): Unit = {
      if (initial.length > 0) print(initial)
      results().foreach(res => println(converter(res)))
    }
    def printHeadResult(initial: String = ""): Unit = println(s"${initial}${converter(headResult())}")
  }

}
  

 package example
import java.util.Calendar

import org.apache.spark.util.LongAccumulator
import org.apache.spark.sql.Row
import org.apache.spark.sql.ForeachWriter
import org.mongodb.scala._
import org.mongodb.scala.bson.collection.mutable.Document
import org.mongodb.scala.bson._
import example.Helpers._

import scala.util.Try


class MongoDBForeachWriter(p_uri: String,
                           p_dbName: String,
                           p_collectionName: String,
                           p_messageCountAccum: LongAccumulator) extends ForeachWriter[Row] {

  val mongodbURI = p_uri
  val dbName = p_dbName
  val collectionName = p_collectionName
  val messageCountAccum = p_messageCountAccum

  var mongoClient: MongoClient = null
  var db: MongoDatabase = null
  var collection: MongoCollection[Document] = null

  def ensureMongoDBConnection(): Unit = {
    if (mongoClient == null) {
      mongoClient = MongoClient(mongodbURI)
      db = mongoClient.getDatabase(dbName)
      collection = db.getCollection(collectionName)
    }
  }

  override def open(partitionId: Long, version: Long): Boolean = {
    true
  }

  override def process(record: Row): Unit = {
    val valueStr = new String(record.getAs[Array[Byte]]("value"))

    val doc: Document = Document(valueStr)
    doc += ("log_time" -> Calendar.getInstance().getTime())

    // lazy opening of MongoDB connection
    ensureMongoDBConnection()
    val result = collection.insertOne(doc)

    // tracks how many records I have processed
    if (messageCountAccum != null)
      messageCountAccum.add(1)
  }

  override def close(errorOrNull: Throwable): Unit = {
    if(mongoClient != null) {
      Try {
        mongoClient.close()
      }
    }
  }
}
  

 package example
import org.apache.spark.sql.functions.{col, _}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.util.LongAccumulator
import example.Helpers._
import java.util.Calendar

object StructuredStreamingProgram {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("OSB_Streaming_Model")
      .getOrCreate()

    import spark.implicits._

    val df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "10.160.172.45:9092, 10.160.172.46:9092, 10.160.172.100:9092")
      .option("subscribe", "TOPIC_WITH_COMP_P2_R2, TOPIC_WITH_COMP_P2_R2.DIT, TOPIC_WITHOUT_COMP_P2_R2.DIT")
      .load()

    val dfs = df.selectExpr("CAST(value AS STRING)")

    // sends to MongoDB once every 20 seconds
    val mongodb_uri = "mongodb://dstk8sdev06.us.dell.com:27018"
    val mdb_name = "HANZO_MDB"
    val mdb_collection = "Spark"
    val CountAccum: LongAccumulator = spark.sparkContext.longAccumulator("mongostreamcount")

    val structuredStreamForeachWriter: MongoDBForeachWriter = new MongoDBForeachWriter(mongodb_uri,mdb_name,mdb_collection,CountAccum)
    val query = dfs.writeStream
      .foreach(structuredStreamForeachWriter)
      .trigger(Trigger.ProcessingTime("20 seconds"))
      .start()

    while (!spark.streams.awaitAnyTermination(60000)) {
      println(Calendar.getInstance().getTime() + " :: mongoEventsCount = " + CountAccum.value)
    }

  }
}
  

I need to persist the structured streaming data into a Mongo collection.

Comments:

1. Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 1 times, most recent failure: Lost task 2.0 in stage 1.0 (TID 8, localhost, executor driver): java.lang.ClassCastException: java.lang.String cannot be cast to [B.

2. The problem is when you try to cast the object, here: val dfs = df.selectExpr("CAST(value AS STRING)")

3. Can you post any part of the data you are trying to convert or receive?

4. @KenrySanchez Thanks for helping me. Below is the streaming data from Kafka: _raw _time "2019-04-15 00:42:32,819 INFO - Mon Apr 15 00:42:32 CDT 2019 ID:<237027.1555306952812.0> svc8_pubsub2_prod_osb svc8_pubsub2_prod_osb_ms17 ISPFSDPartnerPubSub/4_2 / ProxyServices/ InboundAndOutbound/AP/inboundpartner communicationsapplps businesskeys [Message sent to Siebel LPQ.BusinessKeys(UUID: 383aebcb-d708-42e0-842b-42cad6ed21f3, DPSNum: 91913796263, MessageTypeID: ServiceStatusUpdate, message size) 231 Transformation Time (0.01)] ms" 2019-04-15T00:42:32.819-0500

Answer #1:

According to the error, you already have a string (you have already done df.selectExpr("CAST(value AS STRING)")), so you should try to get the Row field as a String rather than as an Array[Byte].

Start by changing

 val valueStr = new String(record.getAs[Array[Byte]]("value"))
  

to

 val valueStr = record.getAs[String]("value")
  
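
For completeness, here is a minimal sketch of what the whole process method could look like after that change, reusing the mutable Document API and the Helpers object already shown in the question (the .results() call is an assumption on my part, added so the insert Observable is actually awaited instead of being left unsubscribed):

 override def process(record: Row): Unit = {
   // "value" was already cast to STRING upstream, so read it as a String
   val valueStr = record.getAs[String]("value")

   // build a Document from the payload (Document(String) expects valid JSON)
   // and tag it with the processing time
   val doc: Document = Document(valueStr)
   doc += ("log_time" -> Calendar.getInstance().getTime())

   // lazy opening of the MongoDB connection
   ensureMongoDBConnection()

   // block until the insert completes; results() comes from the Helpers object
   collection.insertOne(doc).results()

   // track how many records have been processed
   if (messageCountAccum != null)
     messageCountAccum.add(1)
 }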

I understand you may already have a cluster to run Spark code on, but I would still suggest looking into the Kafka Connect Mongo Sink Connector, so that you do not have to write and maintain your own Mongo writer in Spark code. A sketch of such a configuration follows.
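
As a rough illustration, a sink configuration for that approach would look something like the properties below. The connector class and property names here follow the MongoDB Kafka sink connector and are assumptions on my part, so verify them against the documentation of whichever connector you actually install; the topics, URI, database and collection values are taken from your code:

 # illustrative Kafka Connect sink config (property names are assumptions)
 name=mongo-sink
 connector.class=com.mongodb.kafka.connect.MongoSinkConnector
 tasks.max=1
 topics=TOPIC_WITH_COMP_P2_R2,TOPIC_WITH_COMP_P2_R2.DIT,TOPIC_WITHOUT_COMP_P2_R2.DIT
 connection.uri=mongodb://dstk8sdev06.us.dell.com:27018
 database=HANZO_MDB
 collection=Spark
 key.converter=org.apache.kafka.connect.storage.StringConverter
 value.converter=org.apache.kafka.connect.storage.StringConverter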

Or you can also write Spark Datasets directly to Mongo, as sketched below.
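
A rough sketch of that second option, assuming Spark 2.4+ (for foreachBatch) and the mongo-spark-connector package on the classpath (both assumptions, since your stack trace suggests an older Spark version), with option names taken from that connector's conventions:

 // reuses dfs and the Trigger import from your program
 import org.apache.spark.sql.DataFrame

 val query = dfs.writeStream
   .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
     // write each micro-batch with the MongoDB Spark connector
     batchDF.write
       .format("com.mongodb.spark.sql.DefaultSource")
       .mode("append")
       .option("uri", "mongodb://dstk8sdev06.us.dell.com:27018")
       .option("database", "HANZO_MDB")
       .option("collection", "Spark")
       .save()
   }
   .trigger(Trigger.ProcessingTime("20 seconds"))
   .start()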

Comments:

1. Hi @cricket_007, thanks for your suggestion. I will try it the way you pointed out. We use Spark to analyze streaming data in real time and are trying to persist it to MongoDB.

2. Spark or Kafka Streams would only be worth it for reading from and writing back to Kafka while doing windowed analysis... Personally, I find using Kafka Connect simpler than defining distributed writers in Spark myself. That's all.