Error sending Spark logs from Databricks to Logstash

#logstash #databricks

Question:

I need to send Spark logs from Databricks to Logstash. For this I am using GelfLogAppender to ship the logs as GELF messages over TCP to a Logstash server running on my local machine.

Here is my Logstash configuration file:

    input {
      tcp {
        port => 5000
      }
    }

    output {
      elasticsearch {
        hosts => ["http://localhost:9200"]
        index => "logspoc"
      }
      stdout {}
    }
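
Note that the tcp input above relies on Logstash's default codec. GELF messages are JSON payloads, so if events arrive in Elasticsearch as unparsed text, a json codec on the input is worth trying; whether this matches the framing that logstash-gelf uses over TCP is an assumption to verify against the library's documentation:

    input {
      tcp {
        port  => 5000
        codec => json    # assumption: each incoming GELF frame is parseable JSON
      }
    }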

And here is my Databricks code:

    import biz.paluch.logging.gelf.log4j.GelfLogAppender
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession

    object Main {
      def main(args: Array[String]): Unit = {
        val rootLogger = Logger.getRootLogger()
        // the "tcp:<host>" scheme selects the library's TCP transport
        rootLogger.addAppender(createAppender(host = "tcp:0.0.0.0", port = 5000))

        // toDF relies on spark.implicits._, which Databricks notebooks import automatically
        val wordNumber = Seq((10, "Apple"), (10, "Ball"), (10, "Cat"), (20, "Dog")).toDF("number", "word")
        wordNumber.groupBy("number").count().show()
      }

      def createAppender(host: String, port: Int): GelfLogAppender = {
        val appender = new GelfLogAppender()
        appender.setHost(host)
        appender.setPort(port)
        appender.setExtractStackTrace("true")
        appender.setFilterStackTrace(false)
        //appender.setMaximumMessageSize(8192)
        appender.setIncludeFullMdc(true)
        appender.activateOptions()
        appender
      }
    }

    Main.main(Array("123", "23123"))
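
Since the failure below is a plain Connection refused, it can help to separate network reachability from appender configuration. Here is a minimal sketch of a raw TCP probe that could be run in the same notebook; probe is a hypothetical helper, the host/port values are copied from the question, and connecting to 0.0.0.0 targets whatever machine runs the code, not a remote host:

    import java.net.{InetSocketAddress, Socket}

    // Hypothetical helper: attempts a TCP connection to the appender's
    // target and reports whether the port is reachable from this JVM.
    def probe(host: String, port: Int, timeoutMs: Int = 3000): Unit = {
      val socket = new Socket()
      try {
        socket.connect(new InetSocketAddress(host, port), timeoutMs)
        println(s"reachable: $host:$port")
      } catch {
        case e: java.io.IOException =>
          println(s"cannot connect to $host:$port -> ${e.getMessage}")
      } finally {
        socket.close()
      }
    }

    // 0.0.0.0 is the value from the question; from a Databricks notebook
    // this probes the driver node itself, not your local machine.
    probe("0.0.0.0", 5000)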

When I run this cell, I get the following error:

    log4j:ERROR Connection refused
    java.io.IOException: Cannot send data to 0.0.0.0:5000
    at biz.paluch.logging.gelf.intern.sender.GelfTCPSender.sendMessage(GelfTCPSender.java:126)
    at biz.paluch.logging.gelf.log4j.GelfLogAppender.append(GelfLogAppender.java:95)
    at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
    at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
    at org.apache.log4j.Category.callAppenders(Category.java:206)
    at org.apache.log4j.Category.forcedLog(Category.java:391)
    at org.apache.log4j.Category.log(Category.java:856)
    at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:305)
    at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
    at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.logInfo(CodeGenerator.scala:1564)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$anon$2.load(CodeGenerator.scala:1779)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$anon$2.load(CodeGenerator.scala:1770)
    at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3522)
    at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2315)
    at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2278)
    at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2193)
    at com.google.common.cache.LocalCache.get(LocalCache.java:3932)
    at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3936)
    at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4806)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1625)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1642)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createFactory(GenerateUnsafeProjection.scala:420)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:426)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:373)
    at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1544)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:231)
    at org.apache.spark.sql.SparkSession.$anonfun$createDataset$2(SparkSession.scala:523)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.immutable.List.map(List.scala:298)
    at org.apache.spark.sql.SparkSession.$anonfun$createDataset$1(SparkSession.scala:523)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:841)
    at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:519)
    at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:372)
    at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw$iw$iw$Main$.main(command-659865:11)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw$iw$iw.<init>(command-659865:28)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw$iw.<init>(command-659865:88)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw.<init>(command-659865:90)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw.<init>(command-659865:92)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw.<init>(command-659865:94)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw.<init>(command-659865:96)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw.<init>(command-659865:98)
    at lined13a7a62c5734238b74217a391c8437735.$read$iw.<init>(command-659865:100)
    at lined13a7a62c5734238b74217a391c8437735.$read.<init>(command-659865:102)
    at lined13a7a62c5734238b74217a391c8437735.$read$.<init>(command-659865:106)
    at lined13a7a62c5734238b74217a391c8437735.$read$.<clinit>(command-659865)
    at lined13a7a62c5734238b74217a391c8437735.$eval$.$print$lzycompute(<notebook>:7)
    at lined13a7a62c5734238b74217a391c8437735.$eval$.$print(<notebook>:6)
    at lined13a7a62c5734238b74217a391c8437735.$eval.$print(<notebook>)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
    at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
    at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
    at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
    at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
    at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
    at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
    at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
    at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:233)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:793)
    at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:746)
    at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:233)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$11(DriverLocal.scala:451)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:277)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:270)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:428)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:690)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:682)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:523)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:635)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:428)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:371)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:223)
    at java.lang.Thread.run(Thread.java:748)
    Caused by: java.net.ConnectException: Connection refused
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at biz.paluch.logging.gelf.intern.sender.GelfTCPSender.connect(GelfTCPSender.java:164)
    at biz.paluch.logging.gelf.intern.sender.GelfTCPSender.sendMessage(GelfTCPSender.java:103)
    ... 91 more
    +------+-----+
    |number|count|
    +------+-----+
    |    20|    1|
    |    10|    3|
    +------+-----+

    import biz.paluch.logging.gelf.log4j.GelfLogAppender
    import org.apache.log4j.Logger
    import org.apache.spark.sql.SparkSession
    defined object Main

Am I missing some configuration on my Databricks cluster? Why can't it connect to the port? I verified with netstat -aon that this port is open.

Comments:

1. Is Logstash actually running on your machine? And where is Elasticsearch running?