#logstash #databricks
Question:
I need to ship Spark logs from Databricks to Logstash. To do this, I use GelfLogAppender to send the logs as GELF messages over TCP to a Logstash server running on my local machine.
Here is my Logstash configuration file:
input {
  tcp {
    port => 5000
  }
}
output {
  elasticsearch {
    hosts => ["http://localhost:9200"]
    index => "logspoc"
  }
  stdout {}
}
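Note that GelfLogAppender frames GELF over TCP as a JSON document terminated by a null byte, while a plain tcp input reads newline-delimited lines by default; depending on your Logstash version you may need a different codec or the gelf input plugin. As a quick end-to-end smoke test of the pipeline, here is a minimal sketch (not part of the original setup) that opens a raw TCP connection and sends one GELF-style message; the localhost:5000 target is an assumption matching the config above:

import java.net.Socket
import java.nio.charset.StandardCharsets

// Minimal GELF-over-TCP smoke test, assuming Logstash listens on localhost:5000.
object GelfSmokeTest {
  def main(args: Array[String]): Unit = {
    val message =
      """{"version":"1.1","host":"smoke-test","short_message":"hello from test","level":6}"""
    val socket = new Socket("localhost", 5000)
    try {
      val out = socket.getOutputStream
      out.write(message.getBytes(StandardCharsets.UTF_8))
      out.write(0) // GELF TCP messages are delimited by a single null byte
      out.flush()
    } finally socket.close()
  }
}

If this message shows up in stdout/Elasticsearch but the Databricks logs never do, the problem is reachability from the cluster rather than the Logstash pipeline itself.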
And here is my Databricks notebook code:
import biz.paluch.logging.gelf.log4j.GelfLogAppender
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession

object Main {
  def main(args: Array[String]): Unit = {
    val rootLogger = Logger.getRootLogger()
    rootLogger.addAppender(createAppender(host = "tcp:0.0.0.0", port = 5000))
    val wordNumber = Seq((10, "Apple"), (10, "Ball"), (10, "Cat"), (20, "Dog")).toDF("number", "word")
    wordNumber.groupBy("number").count().show()
  }

  def createAppender(host: String, port: Int): GelfLogAppender = {
    val appender = new GelfLogAppender()
    appender.setHost(host)
    appender.setPort(port)
    appender.setExtractStackTrace("true")
    appender.setFilterStackTrace(false)
    //appender.setMaximumMessageSize(8192)
    appender.setIncludeFullMdc(true)
    appender.activateOptions()
    appender
  }
}

Main.main(Array("123", "23123"))
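For reference, logstash-gelf chooses its transport from a prefix on the host string: "tcp:..." selects the TCP sender, while a bare hostname defaults to UDP. Also note that 0.0.0.0 is a wildcard bind address; used as a connect target it effectively resolves to the machine running the Spark driver, not your laptop. A hedged sketch of the appender wiring with a reachable address (the hostname below is a placeholder, not from the original code):

// Hypothetical example: point the appender at an address the Databricks driver
// can actually reach. "my-logstash.example.com" is a placeholder hostname.
val appender = createAppender(host = "tcp:my-logstash.example.com", port = 5000)
Logger.getRootLogger().addAppender(appender)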
When I run this cell, I get the following error:
log4j:ERROR Connection refused
java.io.IOException: Cannot send data to 0.0.0.0:5000
at biz.paluch.logging.gelf.intern.sender.GelfTCPSender.sendMessage(GelfTCPSender.java:126)
at biz.paluch.logging.gelf.log4j.GelfLogAppender.append(GelfLogAppender.java:95)
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251)
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66)
at org.apache.log4j.Category.callAppenders(Category.java:206)
at org.apache.log4j.Category.forcedLog(Category.java:391)
at org.apache.log4j.Category.log(Category.java:856)
at org.slf4j.impl.Log4jLoggerAdapter.info(Log4jLoggerAdapter.java:305)
at org.apache.spark.internal.Logging.logInfo(Logging.scala:57)
at org.apache.spark.internal.Logging.logInfo$(Logging.scala:56)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.logInfo(CodeGenerator.scala:1564)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$anon$2.load(CodeGenerator.scala:1779)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$anon$2.load(CodeGenerator.scala:1770)
at com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3522)
at com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2315)
at com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2278)
at com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2193)
at com.google.common.cache.LocalCache.get(LocalCache.java:3932)
at com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:3936)
at com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4806)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1625)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:1642)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.createFactory(GenerateUnsafeProjection.scala:420)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:426)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:373)
at org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection$.create(GenerateUnsafeProjection.scala:34)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator.generate(CodeGenerator.scala:1544)
at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$Serializer.apply(ExpressionEncoder.scala:231)
at org.apache.spark.sql.SparkSession.$anonfun$createDataset$2(SparkSession.scala:523)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike.map(TraversableLike.scala:238)
at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
at scala.collection.immutable.List.map(List.scala:298)
at org.apache.spark.sql.SparkSession.$anonfun$createDataset$1(SparkSession.scala:523)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:841)
at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:519)
at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:372)
at org.apache.spark.sql.SQLImplicits.localSeqToDatasetHolder(SQLImplicits.scala:231)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw$iw$iw$Main$.main(command-659865:11)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw$iw$iw.<init>(command-659865:28)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw$iw.<init>(command-659865:88)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw$iw.<init>(command-659865:90)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw$iw.<init>(command-659865:92)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw$iw.<init>(command-659865:94)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw$iw.<init>(command-659865:96)
at lined13a7a62c5734238b74217a391c8437735.$read$iw$iw.<init>(command-659865:98)
at lined13a7a62c5734238b74217a391c8437735.$read$iw.<init>(command-659865:100)
at lined13a7a62c5734238b74217a391c8437735.$read.<init>(command-659865:102)
at lined13a7a62c5734238b74217a391c8437735.$read$.<init>(command-659865:106)
at lined13a7a62c5734238b74217a391c8437735.$read$.<clinit>(command-659865)
at lined13a7a62c5734238b74217a391c8437735.$eval$.$print$lzycompute(<notebook>:7)
at lined13a7a62c5734238b74217a391c8437735.$eval$.$print(<notebook>:6)
at lined13a7a62c5734238b74217a391c8437735.$eval.$print(<notebook>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at scala.tools.nsc.interpreter.IMain$ReadEvalPrint.call(IMain.scala:745)
at scala.tools.nsc.interpreter.IMain$Request.loadAndRun(IMain.scala:1021)
at scala.tools.nsc.interpreter.IMain.$anonfun$interpret$1(IMain.scala:574)
at scala.reflect.internal.util.ScalaClassLoader.asContext(ScalaClassLoader.scala:41)
at scala.reflect.internal.util.ScalaClassLoader.asContext$(ScalaClassLoader.scala:37)
at scala.reflect.internal.util.AbstractFileClassLoader.asContext(AbstractFileClassLoader.scala:41)
at scala.tools.nsc.interpreter.IMain.loadAndRunReq$1(IMain.scala:573)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:600)
at scala.tools.nsc.interpreter.IMain.interpret(IMain.scala:570)
at com.databricks.backend.daemon.driver.DriverILoop.execute(DriverILoop.scala:219)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.$anonfun$repl$1(ScalaDriverLocal.scala:233)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExitInternal$.trapExit(DriverLocal.scala:793)
at com.databricks.backend.daemon.driver.DriverLocal$TrapExit$.apply(DriverLocal.scala:746)
at com.databricks.backend.daemon.driver.ScalaDriverLocal.repl(ScalaDriverLocal.scala:233)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$11(DriverLocal.scala:451)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:240)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:235)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:232)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:49)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:277)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:270)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:49)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:428)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:690)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:682)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:523)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:635)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:428)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:371)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:223)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
at biz.paluch.logging.gelf.intern.sender.GelfTCPSender.connect(GelfTCPSender.java:164)
at biz.paluch.logging.gelf.intern.sender.GelfTCPSender.sendMessage(GelfTCPSender.java:103)
... 91 more
+------+-----+
|number|count|
+------+-----+
|    20|    1|
|    10|    3|
+------+-----+
import biz.paluch.logging.gelf.log4j.GelfLogAppender
import org.apache.log4j.Logger
import org.apache.spark.sql.SparkSession
defined object Main
Am I missing some configuration in my Databricks cluster? Why can't it connect to the port? I verified with netstat -aon that the port is open.
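One thing worth checking here: netstat -aon on the local machine only proves the port is open locally, while the connection in the stack trace originates from the Databricks driver in the cloud, which generally cannot reach localhost or 0.0.0.0 on a laptop. A quick reachability probe that could be run in a notebook cell (the hostname is a placeholder for wherever Logstash actually listens):

import java.net.{InetSocketAddress, Socket}

// Check whether the driver can open a TCP connection to the Logstash host.
// Replace "my-logstash.example.com" with the real address (placeholder here).
val probe = new Socket()
try {
  probe.connect(new InetSocketAddress("my-logstash.example.com", 5000), 5000) // 5 s timeout
  println("reachable")
} catch {
  case e: java.io.IOException => println(s"not reachable: ${e.getMessage}")
} finally probe.close()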
Comments:
1. Is Logstash actually running on your machine? Where is Elasticsearch running?