#scala #jms #spark-streaming #ibm-mq #jms-serializer
Question:
I'm currently building an application that reads messages (transactions in JSON) from a Kafka topic and sends them to IBM MQ in production. I'm having some serialization problems with the JMS classes and I'm a bit lost on how to fix it. My code:
object DispatcherMqApp extends Serializable {

  private val logger = LoggerFactory.getLogger(this.getClass)
  val config = ConfigFactory.load()

  def inicialize(transactionType: String) = {
    val spark = new SparkConf()
      .setAppName("Dispatcher MQ Categorization")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.streaming.stopGracefullyOnShutDown", "true")

    logger.debug(s"Loading configuration at ${printConfig(config).head} =>\n${printConfig(config)(1)}")

    val kafkaConfig = KafkaConfig.buildFromConfiguration(config, "dispatcher-mq")
    val streamCtx = new StreamingContext(spark, Seconds(kafkaConfig.streamingInterval))

    sys.ShutdownHookThread {
      logger.warn("Stopping the application ...")
      streamCtx.stop(stopSparkContext = true, stopGracefully = true)
      logger.warn("Application Finish with Success !!!")
    }

    val topic = config.getString(s"conf.dispatcher-mq.consumer-topic.$transactionType")
    logger.info(s"Topic: $topic")

    val zkdir = s"${kafkaConfig.zookeeperBaseDir}$transactionType-$topic"
    val kafkaManager = new KafkaManager(kafkaConfig)
    val stream = kafkaManager.createStreaming(streamCtx, kafkaConfig.offset, topic, zkdir)
    val kafkaSink = streamCtx.sparkContext.broadcast(kafkaManager.createProducer())

    val mqConfig = MQConfig(config.getString("conf.mq.name"),
      config.getString("conf.mq.host"),
      config.getInt("conf.mq.port"),
      config.getString("conf.mq.channel"),
      config.getString("conf.mq.queue-manager"),
      config.getInt("conf.mq.retries"),
      config.getString("conf.mq.app-name"),
      Try(config.getString("conf.mq.user")).toOption,
      Try(config.getString("conf.mq.password")).toOption,
      config.getString("conf.dispatcher-mq.send.category_file"))

    val queueConn = new MQService(mqConfig)

    (stream, queueConn, streamCtx, kafkaSink, zkdir)
  }

  def main(args: Array[String]): Unit = {
    val transactionType = args.head
    if (transactionType == "account" || transactionType == "credit") {
      val (messages, queueConn, sc, kafkaSink, zkdir) = inicialize(transactionType)

      val fieldsType = config.getString(s"conf.dispatcher-mq.send.fields.$transactionType")
      val source = config.getString("conf.dispatcher-mq.parameters.source")
      val mqVersion = config.getString(s"conf.dispatcher-mq.parameters.version.$transactionType")
      val topicError = config.getString("conf.kafka.topic_error")

      messages.foreachRDD(rdd => {
        val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
        rdd.map(_._2).filter(_.toUpperCase.contains("BYCATEGORIZER"))
          .foreach(message => {
            val msg: Option[TextMessage] = try {
              Some(queueConn.createOutputMq(message, fieldsType, source, mqVersion))
            } catch {
              case ex: Exception =>
                logger.error(s"[ERROR] input: [[$message]]\n$ex")
                val errorReport = ErrorReport("GENERAL", "DISPATCHER-MQ", transactionType.toString, ex.getMessage, None, Option(ex.toString))
                ErrorReportService.sendError(errorReport, topicError, kafkaSink.value)
                None
            }
            if (msg.nonEmpty) queueConn.submit(msg.get)
          })
        logger.info(s"Save Offset in $zkdir...\n${offsetRanges.toList}")
        ZookeeperConn.saveOffsets(zkdir, offsetRanges)
      })

      sc.start()
      sc.awaitTermination()
    } else
      logger.error(s"${args.head} is not a valid argument. ( account or credit ) !!! ")
  }
}
I get a serialization error on the JMSConnection, which is created internally by the createOutputMq
method. The error is:
20/09/04 17:21:00 ERROR JobScheduler: Error running job streaming job 1599250860000 ms.0
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:80)
at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:76)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more
20/09/04 17:21:00 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: Task not serializable
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2054)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:918)
at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:917)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:323)
at org.apache.spark.rdd.RDD.foreach(RDD.scala:917)
at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:80)
at br.com.HIDDEN.dispatcher.DispatcherMqApp$$anonfun$main$1.apply(DispatcherMqApp.scala:76)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
at scala.util.Try$.apply(Try.scala:161)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:227)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:226)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: com.ibm.msg.client.jms.JmsConnection
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
... 30 more
Does anyone have an idea how to fix this? The lines referenced in the error message (76 and 80) are my messages.foreachRDD(rdd => {
and .foreach(message => {
lines, respectively.
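For reference, the usual advice I've found for this kind of "Task not serializable" error is to create the non-serializable connection on the executors instead of capturing the driver-side queueConn in the closure. Below is only a rough sketch of how that might look with my classes, assuming MQConfig is a plain serializable case class and that inicialize would return mqConfig rather than the already-built MQService:

messages.foreachRDD(rdd => {
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.map(_._2).filter(_.toUpperCase.contains("BYCATEGORIZER"))
    .foreachPartition(partition => {
      // Hypothetical rework: the MQService (and its JmsConnection) is built here,
      // on the executor, so it never has to be serialized from the driver.
      val queueConn = new MQService(mqConfig)
      partition.foreach(message => {
        try {
          val msg = queueConn.createOutputMq(message, fieldsType, source, mqVersion)
          queueConn.submit(msg)
        } catch {
          case ex: Exception =>
            logger.error(s"[ERROR] input: [[$message]]\n$ex")
            // the ErrorReport / kafkaSink reporting would stay the same as above
        }
      })
    })
  ZookeeperConn.saveOffsets(zkdir, offsetRanges)
})

An alternative I've seen mentioned is keeping a single MQService per executor (for example behind a lazy val in a small holder object) instead of one per partition, but the idea is the same: the JmsConnection itself never crosses the driver/executor boundary.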
Thanks in advance
Comments:
1. There is already a Kafka sink connector for IBM MQ, is there a reason not to use that?
2. Unfortunately we don't have it on our production machine, and we're not really allowed to install anything beyond what comes with it. But I'll take a look, thanks!
3. Oh, and our Kafka is still on version 0.10.1.
4. As far as I know the source code is included in the IBM MQ messaging GitHub repository. You can find the link on Google if you want.