брокер kafka новой версии 2.1.0 зависает без причины

#apache-kafka #kafka-consumer-api

#apache-kafka #kafka-consumer-api

Вопрос:

Сначала все брокеры в кластере могут запуститься и работать нормально, но иногда у одного из брокеров возникает проблема. И там появится какое-то явление:

  • весь кластер зависает, ни производитель, ни потребитель не работают, следовательно, сетевой поток с монитора равен нулю;
  • используйте kafka-topic.sh опишите тему сообщения, каждая реплика просто прекрасна, даже исключительный brokerid, и информация в zk также нормальная;
  • количество описаний файлов постепенно увеличивается на ненормальном брокере, который считывается из /proc/sys/fs/file-nr
  • netstat порт прослушивания брокера 9092 отображает множество статусов «CLOSE_WAIT»

Ниже приведен журнал ошибок, отображаемый у других брокеров, в то время как в журнале ненормальных брокеров не видно никаких исключений.

 [2019-04-15 18:10:25,243] INFO [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error sending fetch request (sessionId=289971597, epoch=1254343) t
o node 1: java.io.IOException: Connection to 1 was disconnected before the response was read. (org.apache.kafka.clients.FetchSessionHandler)
[2019-04-15 18:10:25,244] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=
0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={atm_error_intf-7=(offset=538244, logStartOffset=5319, maxBytes=1048576, currentLeaderEpoch=Op
tional[13])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=289971597, epoch=1254343)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
[2019-04-15 18:10:57,275] WARN [ReplicaFetcher replicaId=0, leaderId=1, fetcherId=0] Error in response for fetch request (type=FetchRequest, replicaId=
0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={ieg_qsm_guildstatechangereport4pandora-19=(offset=52347859, logStartOffset=38458463, maxBytes=1048576, currentLeaderEpoch=Optional[50]), ieg_qsm_playerreporthighfrequency4pandora-10=(offset=97212897, logStartOffset=65418413, maxBytes=1048576, currentLeaderEpoch=Optional[46]), ieg_qsmtest_guildstatechangereport4pandora-13=(offset=25771, logStartOffset=20917, maxBytes=1048576, currentLeaderEpoch=Optional[46]), __consumer_offsets-10=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[57]), ieg_qsmtest_playerreporthighfrequency4pandora-7=(offset=141317, logStartOffset=118323, maxBytes=1048576, currentLeaderEpoch=Optional[45]), __consumer_offsets-25=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[44]), ieg_qsmtest_playerlogin-15=(offset=59440, logStartOffset=52149, maxBytes=1048576, currentLeaderEpoch=Optional[55]), dm_pdl_wefeng_findfriend_topic-12=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[42]), dm_pdl_wefeng_findfriend_topic_test-0=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[47]), ieg_qsmtest_guildstatechangereport4pandora-18=(offset=21042, logStartOffset=16441, maxBytes=1048576, currentLeaderEpoch=Optional[56]), ieg_qsm_playerlogin-1=(offset=27414596, logStartOffset=17328842, maxBytes=1048576, currentLeaderEpoch=Optional[45]), atm_error_intf-7=(offset=538244, logStartOffset=5319, maxBytes=1048576, currentLeaderEpoch=Optional[13]), __consumer_offsets-30=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[51]), ieg_qsm_playerreporthighfrequency4pandora-15=(offset=87995984, logStartOffset=53470647, maxBytes=1048576, currentLeaderEpoch=Optional[55]), __consumer_offsets-45=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[47]), ieg_qsm_playerlogin-6=(offset=25070198, logStartOffset=16224757, maxBytes=1048576, currentLeaderEpoch=Optional[54]), ieg_qsmtest_playerreporthighfrequency4pandora-12=(offset=141878, logStartOffset=122257, maxBytes=1048576, currentLeaderEpoch=Optional[56]), dm_pdl_wefeng_findfriend_topic-17=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[56]), ieg_qsm_guildstatechangereport4pandora-14=(offset=45869398, logStartOffset=27847747, maxBytes=1048576, currentLeaderEpoch=Optional[47]), dm_pdl_wefeng_findfriend_topic_test-5=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[57]), atm_error_intf-27=(offset=539432, logStartOffset=5392, maxBytes=1048576, currentLeaderEpoch=Optional[13]), ieg_qsmtest_playerlogin-10=(offset=66712, logStartOffset=55774, maxBytes=1048576, currentLeaderEpoch=Optional[48]), __consumer_offsets-5=(offset=0, logStartOffset=0, maxBytes=1048576, currentLeaderEpoch=Optional[47])}, isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=289971597, epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
        at org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:97)
        at kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:97)
        at kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:190)
        at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:241)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:130)
        at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:129)
        at scala.Option.foreach(Option.scala:257)
        at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:129)
        at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:111)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
  

-Xmx8G -Xms8G, это размер кучи jvm. Я могу только убить -9 экземпляр и перезапустить, чтобы возобновить его.

  • версия kafka_2.12-2.1.0
  • java

Ответ №1:

Об этой проблеме сообщалось в kafka 2.1.0. Пожалуйста, проверьте https://issues.apache.org/jira/browse/KAFKA-7802 подробнее. Он помечен как дублированный https://issues.apache.org/jira/browse/KAFKA-7697 это исправлено в kafka 2.2.0, поэтому обновление версии kafka должно помочь.

Комментарии:

1. Спасибо, я попробовал обновить до 2.2.0 и исправил проблему.